rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
/// RPC server state shared (behind an `Arc`) by every connection handler.
pub struct Server {
    /// Peer used to add/disconnect connections and to send, forward and respond to messages.
    peer: Arc<Peer>,
    /// Connection/project bookkeeping, guarded by a read-write lock.
    store: RwLock<Store>,
    /// Application-wide state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Registered message handlers, keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives `()` after each handled message in `handle_connection`.
    notifications: Option<mpsc::Sender<()>>,
}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updating)
  76            .add_handler(Server::disk_based_diagnostics_updated)
  77            .add_handler(Server::get_definition)
  78            .add_handler(Server::open_buffer)
  79            .add_handler(Server::close_buffer)
  80            .add_handler(Server::update_buffer)
  81            .add_handler(Server::update_buffer_file)
  82            .add_handler(Server::buffer_reloaded)
  83            .add_handler(Server::buffer_saved)
  84            .add_handler(Server::save_buffer)
  85            .add_handler(Server::format_buffers)
  86            .add_handler(Server::get_completions)
  87            .add_handler(Server::apply_additional_edits_for_completion)
  88            .add_handler(Server::get_code_actions)
  89            .add_handler(Server::apply_code_action)
  90            .add_handler(Server::get_channels)
  91            .add_handler(Server::get_users)
  92            .add_handler(Server::join_channel)
  93            .add_handler(Server::leave_channel)
  94            .add_handler(Server::send_channel_message)
  95            .add_handler(Server::get_channel_messages);
  96
  97        Arc::new(server)
  98    }
  99
 100    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 101    where
 102        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 103        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 104        M: EnvelopedMessage,
 105    {
 106        let prev_handler = self.handlers.insert(
 107            TypeId::of::<M>(),
 108            Box::new(move |server, envelope| {
 109                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 110                (handler)(server, *envelope).boxed()
 111            }),
 112        );
 113        if prev_handler.is_some() {
 114            panic!("registered a handler for the same message twice");
 115        }
 116        self
 117    }
 118
 119    pub fn handle_connection(
 120        self: &Arc<Self>,
 121        connection: Connection,
 122        addr: String,
 123        user_id: UserId,
 124        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 125    ) -> impl Future<Output = ()> {
 126        let mut this = self.clone();
 127        async move {
 128            let (connection_id, handle_io, mut incoming_rx) =
 129                this.peer.add_connection(connection).await;
 130
 131            if let Some(send_connection_id) = send_connection_id.as_mut() {
 132                let _ = send_connection_id.send(connection_id).await;
 133            }
 134
 135            this.state_mut().add_connection(connection_id, user_id);
 136            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 137                log::error!("error updating contacts for {:?}: {}", user_id, err);
 138            }
 139
 140            let handle_io = handle_io.fuse();
 141            futures::pin_mut!(handle_io);
 142            loop {
 143                let next_message = incoming_rx.next().fuse();
 144                futures::pin_mut!(next_message);
 145                futures::select_biased! {
 146                    result = handle_io => {
 147                        if let Err(err) = result {
 148                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 149                        }
 150                        break;
 151                    }
 152                    message = next_message => {
 153                        if let Some(message) = message {
 154                            let start_time = Instant::now();
 155                            let type_name = message.payload_type_name();
 156                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 157                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 158                                if let Err(err) = (handler)(this.clone(), message).await {
 159                                    log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 160                                } else {
 161                                    log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 162                                }
 163
 164                                if let Some(mut notifications) = this.notifications.clone() {
 165                                    let _ = notifications.send(()).await;
 166                                }
 167                            } else {
 168                                log::warn!("unhandled message: {}", type_name);
 169                            }
 170                        } else {
 171                            log::info!("rpc connection closed {:?}", addr);
 172                            break;
 173                        }
 174                    }
 175                }
 176            }
 177
 178            if let Err(err) = this.sign_out(connection_id).await {
 179                log::error!("error signing out connection {:?} - {:?}", addr, err);
 180            }
 181        }
 182    }
 183
 184    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 185        self.peer.disconnect(connection_id);
 186        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 187
 188        for (project_id, project) in removed_connection.hosted_projects {
 189            if let Some(share) = project.share {
 190                broadcast(
 191                    connection_id,
 192                    share.guests.keys().copied().collect(),
 193                    |conn_id| {
 194                        self.peer
 195                            .send(conn_id, proto::UnshareProject { project_id })
 196                    },
 197                )?;
 198            }
 199        }
 200
 201        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 202            broadcast(connection_id, peer_ids, |conn_id| {
 203                self.peer.send(
 204                    conn_id,
 205                    proto::RemoveProjectCollaborator {
 206                        project_id,
 207                        peer_id: connection_id.0,
 208                    },
 209                )
 210            })?;
 211        }
 212
 213        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 214        Ok(())
 215    }
 216
 217    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 218        self.peer.respond(request.receipt(), proto::Ack {})?;
 219        Ok(())
 220    }
 221
 222    async fn register_project(
 223        mut self: Arc<Server>,
 224        request: TypedEnvelope<proto::RegisterProject>,
 225    ) -> tide::Result<()> {
 226        let project_id = {
 227            let mut state = self.state_mut();
 228            let user_id = state.user_id_for_connection(request.sender_id)?;
 229            state.register_project(request.sender_id, user_id)
 230        };
 231        self.peer.respond(
 232            request.receipt(),
 233            proto::RegisterProjectResponse { project_id },
 234        )?;
 235        Ok(())
 236    }
 237
 238    async fn unregister_project(
 239        mut self: Arc<Server>,
 240        request: TypedEnvelope<proto::UnregisterProject>,
 241    ) -> tide::Result<()> {
 242        let project = self
 243            .state_mut()
 244            .unregister_project(request.payload.project_id, request.sender_id)
 245            .ok_or_else(|| anyhow!("no such project"))?;
 246        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 247        Ok(())
 248    }
 249
 250    async fn share_project(
 251        mut self: Arc<Server>,
 252        request: TypedEnvelope<proto::ShareProject>,
 253    ) -> tide::Result<()> {
 254        self.state_mut()
 255            .share_project(request.payload.project_id, request.sender_id);
 256        self.peer.respond(request.receipt(), proto::Ack {})?;
 257        Ok(())
 258    }
 259
 260    async fn unshare_project(
 261        mut self: Arc<Server>,
 262        request: TypedEnvelope<proto::UnshareProject>,
 263    ) -> tide::Result<()> {
 264        let project_id = request.payload.project_id;
 265        let project = self
 266            .state_mut()
 267            .unshare_project(project_id, request.sender_id)?;
 268
 269        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 270            self.peer
 271                .send(conn_id, proto::UnshareProject { project_id })
 272        })?;
 273        self.update_contacts_for_users(&project.authorized_user_ids)?;
 274        Ok(())
 275    }
 276
 277    async fn join_project(
 278        mut self: Arc<Server>,
 279        request: TypedEnvelope<proto::JoinProject>,
 280    ) -> tide::Result<()> {
 281        let project_id = request.payload.project_id;
 282
 283        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 284        let response_data = self
 285            .state_mut()
 286            .join_project(request.sender_id, user_id, project_id)
 287            .and_then(|joined| {
 288                let share = joined.project.share()?;
 289                let peer_count = share.guests.len();
 290                let mut collaborators = Vec::with_capacity(peer_count);
 291                collaborators.push(proto::Collaborator {
 292                    peer_id: joined.project.host_connection_id.0,
 293                    replica_id: 0,
 294                    user_id: joined.project.host_user_id.to_proto(),
 295                });
 296                let worktrees = joined
 297                    .project
 298                    .worktrees
 299                    .iter()
 300                    .filter_map(|(id, worktree)| {
 301                        worktree.share.as_ref().map(|share| proto::Worktree {
 302                            id: *id,
 303                            root_name: worktree.root_name.clone(),
 304                            entries: share.entries.values().cloned().collect(),
 305                            diagnostic_summaries: share
 306                                .diagnostic_summaries
 307                                .values()
 308                                .cloned()
 309                                .collect(),
 310                            weak: worktree.weak,
 311                        })
 312                    })
 313                    .collect();
 314                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 315                    if *peer_conn_id != request.sender_id {
 316                        collaborators.push(proto::Collaborator {
 317                            peer_id: peer_conn_id.0,
 318                            replica_id: *peer_replica_id as u32,
 319                            user_id: peer_user_id.to_proto(),
 320                        });
 321                    }
 322                }
 323                let response = proto::JoinProjectResponse {
 324                    worktrees,
 325                    replica_id: joined.replica_id as u32,
 326                    collaborators,
 327                };
 328                let connection_ids = joined.project.connection_ids();
 329                let contact_user_ids = joined.project.authorized_user_ids();
 330                Ok((response, connection_ids, contact_user_ids))
 331            });
 332
 333        match response_data {
 334            Ok((response, connection_ids, contact_user_ids)) => {
 335                broadcast(request.sender_id, connection_ids, |conn_id| {
 336                    self.peer.send(
 337                        conn_id,
 338                        proto::AddProjectCollaborator {
 339                            project_id,
 340                            collaborator: Some(proto::Collaborator {
 341                                peer_id: request.sender_id.0,
 342                                replica_id: response.replica_id,
 343                                user_id: user_id.to_proto(),
 344                            }),
 345                        },
 346                    )
 347                })?;
 348                self.peer.respond(request.receipt(), response)?;
 349                self.update_contacts_for_users(&contact_user_ids)?;
 350            }
 351            Err(error) => {
 352                self.peer.respond_with_error(
 353                    request.receipt(),
 354                    proto::Error {
 355                        message: error.to_string(),
 356                    },
 357                )?;
 358            }
 359        }
 360
 361        Ok(())
 362    }
 363
 364    async fn leave_project(
 365        mut self: Arc<Server>,
 366        request: TypedEnvelope<proto::LeaveProject>,
 367    ) -> tide::Result<()> {
 368        let sender_id = request.sender_id;
 369        let project_id = request.payload.project_id;
 370        let worktree = self.state_mut().leave_project(sender_id, project_id);
 371        if let Some(worktree) = worktree {
 372            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 373                self.peer.send(
 374                    conn_id,
 375                    proto::RemoveProjectCollaborator {
 376                        project_id,
 377                        peer_id: sender_id.0,
 378                    },
 379                )
 380            })?;
 381            self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 382        }
 383        Ok(())
 384    }
 385
 386    async fn register_worktree(
 387        mut self: Arc<Server>,
 388        request: TypedEnvelope<proto::RegisterWorktree>,
 389    ) -> tide::Result<()> {
 390        let receipt = request.receipt();
 391        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 392
 393        let mut contact_user_ids = HashSet::default();
 394        contact_user_ids.insert(host_user_id);
 395        for github_login in request.payload.authorized_logins {
 396            match self.app_state.db.create_user(&github_login, false).await {
 397                Ok(contact_user_id) => {
 398                    contact_user_ids.insert(contact_user_id);
 399                }
 400                Err(err) => {
 401                    let message = err.to_string();
 402                    self.peer
 403                        .respond_with_error(receipt, proto::Error { message })?;
 404                    return Ok(());
 405                }
 406            }
 407        }
 408
 409        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 410        let ok = self.state_mut().register_worktree(
 411            request.payload.project_id,
 412            request.payload.worktree_id,
 413            Worktree {
 414                authorized_user_ids: contact_user_ids.clone(),
 415                root_name: request.payload.root_name,
 416                share: None,
 417                weak: false,
 418            },
 419        );
 420
 421        if ok {
 422            self.peer.respond(receipt, proto::Ack {})?;
 423            self.update_contacts_for_users(&contact_user_ids)?;
 424        } else {
 425            self.peer.respond_with_error(
 426                receipt,
 427                proto::Error {
 428                    message: NO_SUCH_PROJECT.to_string(),
 429                },
 430            )?;
 431        }
 432
 433        Ok(())
 434    }
 435
 436    async fn unregister_worktree(
 437        mut self: Arc<Server>,
 438        request: TypedEnvelope<proto::UnregisterWorktree>,
 439    ) -> tide::Result<()> {
 440        let project_id = request.payload.project_id;
 441        let worktree_id = request.payload.worktree_id;
 442        let (worktree, guest_connection_ids) =
 443            self.state_mut()
 444                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 445        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 446            self.peer.send(
 447                conn_id,
 448                proto::UnregisterWorktree {
 449                    project_id,
 450                    worktree_id,
 451                },
 452            )
 453        })?;
 454        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 455        Ok(())
 456    }
 457
 458    async fn share_worktree(
 459        mut self: Arc<Server>,
 460        mut request: TypedEnvelope<proto::ShareWorktree>,
 461    ) -> tide::Result<()> {
 462        let worktree = request
 463            .payload
 464            .worktree
 465            .as_mut()
 466            .ok_or_else(|| anyhow!("missing worktree"))?;
 467        let entries = worktree
 468            .entries
 469            .iter()
 470            .map(|entry| (entry.id, entry.clone()))
 471            .collect();
 472        let diagnostic_summaries = worktree
 473            .diagnostic_summaries
 474            .iter()
 475            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 476            .collect();
 477
 478        let shared_worktree = self.state_mut().share_worktree(
 479            request.payload.project_id,
 480            worktree.id,
 481            request.sender_id,
 482            entries,
 483            diagnostic_summaries,
 484        );
 485        if let Some(shared_worktree) = shared_worktree {
 486            broadcast(
 487                request.sender_id,
 488                shared_worktree.connection_ids,
 489                |connection_id| {
 490                    self.peer.forward_send(
 491                        request.sender_id,
 492                        connection_id,
 493                        request.payload.clone(),
 494                    )
 495                },
 496            )?;
 497            self.peer.respond(request.receipt(), proto::Ack {})?;
 498            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 499        } else {
 500            self.peer.respond_with_error(
 501                request.receipt(),
 502                proto::Error {
 503                    message: "no such worktree".to_string(),
 504                },
 505            )?;
 506        }
 507        Ok(())
 508    }
 509
 510    async fn update_worktree(
 511        mut self: Arc<Server>,
 512        request: TypedEnvelope<proto::UpdateWorktree>,
 513    ) -> tide::Result<()> {
 514        let connection_ids = self
 515            .state_mut()
 516            .update_worktree(
 517                request.sender_id,
 518                request.payload.project_id,
 519                request.payload.worktree_id,
 520                &request.payload.removed_entries,
 521                &request.payload.updated_entries,
 522            )
 523            .ok_or_else(|| anyhow!("no such worktree"))?;
 524
 525        broadcast(request.sender_id, connection_ids, |connection_id| {
 526            self.peer
 527                .forward_send(request.sender_id, connection_id, request.payload.clone())
 528        })?;
 529
 530        Ok(())
 531    }
 532
 533    async fn update_diagnostic_summary(
 534        mut self: Arc<Server>,
 535        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 536    ) -> tide::Result<()> {
 537        let receiver_ids = request
 538            .payload
 539            .summary
 540            .clone()
 541            .and_then(|summary| {
 542                self.state_mut().update_diagnostic_summary(
 543                    request.payload.project_id,
 544                    request.payload.worktree_id,
 545                    request.sender_id,
 546                    summary,
 547                )
 548            })
 549            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 550
 551        broadcast(request.sender_id, receiver_ids, |connection_id| {
 552            self.peer
 553                .forward_send(request.sender_id, connection_id, request.payload.clone())
 554        })?;
 555        Ok(())
 556    }
 557
 558    async fn disk_based_diagnostics_updating(
 559        self: Arc<Server>,
 560        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 561    ) -> tide::Result<()> {
 562        let receiver_ids = self
 563            .state()
 564            .project_connection_ids(request.payload.project_id, request.sender_id)
 565            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 566        broadcast(request.sender_id, receiver_ids, |connection_id| {
 567            self.peer
 568                .forward_send(request.sender_id, connection_id, request.payload.clone())
 569        })?;
 570        Ok(())
 571    }
 572
 573    async fn disk_based_diagnostics_updated(
 574        self: Arc<Server>,
 575        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 576    ) -> tide::Result<()> {
 577        let receiver_ids = self
 578            .state()
 579            .project_connection_ids(request.payload.project_id, request.sender_id)
 580            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 581        broadcast(request.sender_id, receiver_ids, |connection_id| {
 582            self.peer
 583                .forward_send(request.sender_id, connection_id, request.payload.clone())
 584        })?;
 585        Ok(())
 586    }
 587
 588    async fn get_definition(
 589        self: Arc<Server>,
 590        request: TypedEnvelope<proto::GetDefinition>,
 591    ) -> tide::Result<()> {
 592        let receipt = request.receipt();
 593        let host_connection_id = self
 594            .state()
 595            .read_project(request.payload.project_id, request.sender_id)
 596            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 597            .host_connection_id;
 598        let response = self
 599            .peer
 600            .forward_request(request.sender_id, host_connection_id, request.payload)
 601            .await?;
 602        self.peer.respond(receipt, response)?;
 603        Ok(())
 604    }
 605
 606    async fn open_buffer(
 607        self: Arc<Server>,
 608        request: TypedEnvelope<proto::OpenBuffer>,
 609    ) -> tide::Result<()> {
 610        let receipt = request.receipt();
 611        let host_connection_id = self
 612            .state()
 613            .read_project(request.payload.project_id, request.sender_id)
 614            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 615            .host_connection_id;
 616        let response = self
 617            .peer
 618            .forward_request(request.sender_id, host_connection_id, request.payload)
 619            .await?;
 620        self.peer.respond(receipt, response)?;
 621        Ok(())
 622    }
 623
 624    async fn close_buffer(
 625        self: Arc<Server>,
 626        request: TypedEnvelope<proto::CloseBuffer>,
 627    ) -> tide::Result<()> {
 628        let host_connection_id = self
 629            .state()
 630            .read_project(request.payload.project_id, request.sender_id)
 631            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 632            .host_connection_id;
 633        self.peer
 634            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 635        Ok(())
 636    }
 637
 638    async fn save_buffer(
 639        self: Arc<Server>,
 640        request: TypedEnvelope<proto::SaveBuffer>,
 641    ) -> tide::Result<()> {
 642        let host;
 643        let guests;
 644        {
 645            let state = self.state();
 646            let project = state
 647                .read_project(request.payload.project_id, request.sender_id)
 648                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 649            host = project.host_connection_id;
 650            guests = project.guest_connection_ids()
 651        }
 652
 653        let sender = request.sender_id;
 654        let receipt = request.receipt();
 655        let response = self
 656            .peer
 657            .forward_request(sender, host, request.payload.clone())
 658            .await?;
 659
 660        broadcast(host, guests, |conn_id| {
 661            let response = response.clone();
 662            if conn_id == sender {
 663                self.peer.respond(receipt, response)
 664            } else {
 665                self.peer.forward_send(host, conn_id, response)
 666            }
 667        })?;
 668
 669        Ok(())
 670    }
 671
 672    async fn format_buffers(
 673        self: Arc<Server>,
 674        request: TypedEnvelope<proto::FormatBuffers>,
 675    ) -> tide::Result<()> {
 676        let host;
 677        {
 678            let state = self.state();
 679            let project = state
 680                .read_project(request.payload.project_id, request.sender_id)
 681                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 682            host = project.host_connection_id;
 683        }
 684
 685        let sender = request.sender_id;
 686        let receipt = request.receipt();
 687        let response = self
 688            .peer
 689            .forward_request(sender, host, request.payload.clone())
 690            .await?;
 691        self.peer.respond(receipt, response)?;
 692
 693        Ok(())
 694    }
 695
 696    async fn get_completions(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::GetCompletions>,
 699    ) -> tide::Result<()> {
 700        let host;
 701        {
 702            let state = self.state();
 703            let project = state
 704                .read_project(request.payload.project_id, request.sender_id)
 705                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 706            host = project.host_connection_id;
 707        }
 708
 709        let sender = request.sender_id;
 710        let receipt = request.receipt();
 711        let response = self
 712            .peer
 713            .forward_request(sender, host, request.payload.clone())
 714            .await?;
 715        self.peer.respond(receipt, response)?;
 716        Ok(())
 717    }
 718
 719    async fn apply_additional_edits_for_completion(
 720        self: Arc<Server>,
 721        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 722    ) -> tide::Result<()> {
 723        let host;
 724        {
 725            let state = self.state();
 726            let project = state
 727                .read_project(request.payload.project_id, request.sender_id)
 728                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 729            host = project.host_connection_id;
 730        }
 731
 732        let sender = request.sender_id;
 733        let receipt = request.receipt();
 734        let response = self
 735            .peer
 736            .forward_request(sender, host, request.payload.clone())
 737            .await?;
 738        self.peer.respond(receipt, response)?;
 739        Ok(())
 740    }
 741
 742    async fn get_code_actions(
 743        self: Arc<Server>,
 744        request: TypedEnvelope<proto::GetCodeActions>,
 745    ) -> tide::Result<()> {
 746        let host;
 747        {
 748            let state = self.state();
 749            let project = state
 750                .read_project(request.payload.project_id, request.sender_id)
 751                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 752            host = project.host_connection_id;
 753        }
 754
 755        let sender = request.sender_id;
 756        let receipt = request.receipt();
 757        let response = self
 758            .peer
 759            .forward_request(sender, host, request.payload.clone())
 760            .await?;
 761        self.peer.respond(receipt, response)?;
 762        Ok(())
 763    }
 764
 765    async fn apply_code_action(
 766        self: Arc<Server>,
 767        request: TypedEnvelope<proto::ApplyCodeAction>,
 768    ) -> tide::Result<()> {
 769        let host;
 770        {
 771            let state = self.state();
 772            let project = state
 773                .read_project(request.payload.project_id, request.sender_id)
 774                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 775            host = project.host_connection_id;
 776        }
 777
 778        let sender = request.sender_id;
 779        let receipt = request.receipt();
 780        let response = self
 781            .peer
 782            .forward_request(sender, host, request.payload.clone())
 783            .await?;
 784        self.peer.respond(receipt, response)?;
 785        Ok(())
 786    }
 787
 788    async fn update_buffer(
 789        self: Arc<Server>,
 790        request: TypedEnvelope<proto::UpdateBuffer>,
 791    ) -> tide::Result<()> {
 792        let receiver_ids = self
 793            .state()
 794            .project_connection_ids(request.payload.project_id, request.sender_id)
 795            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 796        broadcast(request.sender_id, receiver_ids, |connection_id| {
 797            self.peer
 798                .forward_send(request.sender_id, connection_id, request.payload.clone())
 799        })?;
 800        self.peer.respond(request.receipt(), proto::Ack {})?;
 801        Ok(())
 802    }
 803
 804    async fn update_buffer_file(
 805        self: Arc<Server>,
 806        request: TypedEnvelope<proto::UpdateBufferFile>,
 807    ) -> tide::Result<()> {
 808        let receiver_ids = self
 809            .state()
 810            .project_connection_ids(request.payload.project_id, request.sender_id)
 811            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 812        broadcast(request.sender_id, receiver_ids, |connection_id| {
 813            self.peer
 814                .forward_send(request.sender_id, connection_id, request.payload.clone())
 815        })?;
 816        Ok(())
 817    }
 818
 819    async fn buffer_reloaded(
 820        self: Arc<Server>,
 821        request: TypedEnvelope<proto::BufferReloaded>,
 822    ) -> tide::Result<()> {
 823        let receiver_ids = self
 824            .state()
 825            .project_connection_ids(request.payload.project_id, request.sender_id)
 826            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 827        broadcast(request.sender_id, receiver_ids, |connection_id| {
 828            self.peer
 829                .forward_send(request.sender_id, connection_id, request.payload.clone())
 830        })?;
 831        Ok(())
 832    }
 833
 834    async fn buffer_saved(
 835        self: Arc<Server>,
 836        request: TypedEnvelope<proto::BufferSaved>,
 837    ) -> tide::Result<()> {
 838        let receiver_ids = self
 839            .state()
 840            .project_connection_ids(request.payload.project_id, request.sender_id)
 841            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 842        broadcast(request.sender_id, receiver_ids, |connection_id| {
 843            self.peer
 844                .forward_send(request.sender_id, connection_id, request.payload.clone())
 845        })?;
 846        Ok(())
 847    }
 848
 849    async fn get_channels(
 850        self: Arc<Server>,
 851        request: TypedEnvelope<proto::GetChannels>,
 852    ) -> tide::Result<()> {
 853        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 854        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 855        self.peer.respond(
 856            request.receipt(),
 857            proto::GetChannelsResponse {
 858                channels: channels
 859                    .into_iter()
 860                    .map(|chan| proto::Channel {
 861                        id: chan.id.to_proto(),
 862                        name: chan.name,
 863                    })
 864                    .collect(),
 865            },
 866        )?;
 867        Ok(())
 868    }
 869
 870    async fn get_users(
 871        self: Arc<Server>,
 872        request: TypedEnvelope<proto::GetUsers>,
 873    ) -> tide::Result<()> {
 874        let receipt = request.receipt();
 875        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 876        let users = self
 877            .app_state
 878            .db
 879            .get_users_by_ids(user_ids)
 880            .await?
 881            .into_iter()
 882            .map(|user| proto::User {
 883                id: user.id.to_proto(),
 884                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 885                github_login: user.github_login,
 886            })
 887            .collect();
 888        self.peer
 889            .respond(receipt, proto::GetUsersResponse { users })?;
 890        Ok(())
 891    }
 892
 893    fn update_contacts_for_users<'a>(
 894        self: &Arc<Server>,
 895        user_ids: impl IntoIterator<Item = &'a UserId>,
 896    ) -> anyhow::Result<()> {
 897        let mut result = Ok(());
 898        let state = self.state();
 899        for user_id in user_ids {
 900            let contacts = state.contacts_for_user(*user_id);
 901            for connection_id in state.connection_ids_for_user(*user_id) {
 902                if let Err(error) = self.peer.send(
 903                    connection_id,
 904                    proto::UpdateContacts {
 905                        contacts: contacts.clone(),
 906                    },
 907                ) {
 908                    result = Err(error);
 909                }
 910            }
 911        }
 912        result
 913    }
 914
 915    async fn join_channel(
 916        mut self: Arc<Self>,
 917        request: TypedEnvelope<proto::JoinChannel>,
 918    ) -> tide::Result<()> {
 919        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 920        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 921        if !self
 922            .app_state
 923            .db
 924            .can_user_access_channel(user_id, channel_id)
 925            .await?
 926        {
 927            Err(anyhow!("access denied"))?;
 928        }
 929
 930        self.state_mut().join_channel(request.sender_id, channel_id);
 931        let messages = self
 932            .app_state
 933            .db
 934            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 935            .await?
 936            .into_iter()
 937            .map(|msg| proto::ChannelMessage {
 938                id: msg.id.to_proto(),
 939                body: msg.body,
 940                timestamp: msg.sent_at.unix_timestamp() as u64,
 941                sender_id: msg.sender_id.to_proto(),
 942                nonce: Some(msg.nonce.as_u128().into()),
 943            })
 944            .collect::<Vec<_>>();
 945        self.peer.respond(
 946            request.receipt(),
 947            proto::JoinChannelResponse {
 948                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 949                messages,
 950            },
 951        )?;
 952        Ok(())
 953    }
 954
 955    async fn leave_channel(
 956        mut self: Arc<Self>,
 957        request: TypedEnvelope<proto::LeaveChannel>,
 958    ) -> tide::Result<()> {
 959        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 960        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 961        if !self
 962            .app_state
 963            .db
 964            .can_user_access_channel(user_id, channel_id)
 965            .await?
 966        {
 967            Err(anyhow!("access denied"))?;
 968        }
 969
 970        self.state_mut()
 971            .leave_channel(request.sender_id, channel_id);
 972
 973        Ok(())
 974    }
 975
 976    async fn send_channel_message(
 977        self: Arc<Self>,
 978        request: TypedEnvelope<proto::SendChannelMessage>,
 979    ) -> tide::Result<()> {
 980        let receipt = request.receipt();
 981        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 982        let user_id;
 983        let connection_ids;
 984        {
 985            let state = self.state();
 986            user_id = state.user_id_for_connection(request.sender_id)?;
 987            if let Some(ids) = state.channel_connection_ids(channel_id) {
 988                connection_ids = ids;
 989            } else {
 990                return Ok(());
 991            }
 992        }
 993
 994        // Validate the message body.
 995        let body = request.payload.body.trim().to_string();
 996        if body.len() > MAX_MESSAGE_LEN {
 997            self.peer.respond_with_error(
 998                receipt,
 999                proto::Error {
1000                    message: "message is too long".to_string(),
1001                },
1002            )?;
1003            return Ok(());
1004        }
1005        if body.is_empty() {
1006            self.peer.respond_with_error(
1007                receipt,
1008                proto::Error {
1009                    message: "message can't be blank".to_string(),
1010                },
1011            )?;
1012            return Ok(());
1013        }
1014
1015        let timestamp = OffsetDateTime::now_utc();
1016        let nonce = if let Some(nonce) = request.payload.nonce {
1017            nonce
1018        } else {
1019            self.peer.respond_with_error(
1020                receipt,
1021                proto::Error {
1022                    message: "nonce can't be blank".to_string(),
1023                },
1024            )?;
1025            return Ok(());
1026        };
1027
1028        let message_id = self
1029            .app_state
1030            .db
1031            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1032            .await?
1033            .to_proto();
1034        let message = proto::ChannelMessage {
1035            sender_id: user_id.to_proto(),
1036            id: message_id,
1037            body,
1038            timestamp: timestamp.unix_timestamp() as u64,
1039            nonce: Some(nonce),
1040        };
1041        broadcast(request.sender_id, connection_ids, |conn_id| {
1042            self.peer.send(
1043                conn_id,
1044                proto::ChannelMessageSent {
1045                    channel_id: channel_id.to_proto(),
1046                    message: Some(message.clone()),
1047                },
1048            )
1049        })?;
1050        self.peer.respond(
1051            receipt,
1052            proto::SendChannelMessageResponse {
1053                message: Some(message),
1054            },
1055        )?;
1056        Ok(())
1057    }
1058
1059    async fn get_channel_messages(
1060        self: Arc<Self>,
1061        request: TypedEnvelope<proto::GetChannelMessages>,
1062    ) -> tide::Result<()> {
1063        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1064        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1065        if !self
1066            .app_state
1067            .db
1068            .can_user_access_channel(user_id, channel_id)
1069            .await?
1070        {
1071            Err(anyhow!("access denied"))?;
1072        }
1073
1074        let messages = self
1075            .app_state
1076            .db
1077            .get_channel_messages(
1078                channel_id,
1079                MESSAGE_COUNT_PER_PAGE,
1080                Some(MessageId::from_proto(request.payload.before_message_id)),
1081            )
1082            .await?
1083            .into_iter()
1084            .map(|msg| proto::ChannelMessage {
1085                id: msg.id.to_proto(),
1086                body: msg.body,
1087                timestamp: msg.sent_at.unix_timestamp() as u64,
1088                sender_id: msg.sender_id.to_proto(),
1089                nonce: Some(msg.nonce.as_u128().into()),
1090            })
1091            .collect::<Vec<_>>();
1092        self.peer.respond(
1093            request.receipt(),
1094            proto::GetChannelMessagesResponse {
1095                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1096                messages,
1097            },
1098        )?;
1099        Ok(())
1100    }
1101
1102    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1103        self.store.read()
1104    }
1105
1106    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1107        self.store.write()
1108    }
1109}
1110
1111fn broadcast<F>(
1112    sender_id: ConnectionId,
1113    receiver_ids: Vec<ConnectionId>,
1114    mut f: F,
1115) -> anyhow::Result<()>
1116where
1117    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1118{
1119    let mut result = Ok(());
1120    for receiver_id in receiver_ids {
1121        if receiver_id != sender_id {
1122            if let Err(error) = f(receiver_id) {
1123                if result.is_ok() {
1124                    result = Err(error);
1125                }
1126            }
1127        }
1128    }
1129    result
1130}
1131
1132pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1133    let server = Server::new(app.state().clone(), rpc.clone(), None);
1134    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1135        let server = server.clone();
1136        async move {
1137            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1138
1139            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1140            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1141            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1142            let client_protocol_version: Option<u32> = request
1143                .header("X-Zed-Protocol-Version")
1144                .and_then(|v| v.as_str().parse().ok());
1145
1146            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1147                return Ok(Response::new(StatusCode::UpgradeRequired));
1148            }
1149
1150            let header = match request.header("Sec-Websocket-Key") {
1151                Some(h) => h.as_str(),
1152                None => return Err(anyhow!("expected sec-websocket-key"))?,
1153            };
1154
1155            let user_id = process_auth_header(&request).await?;
1156
1157            let mut response = Response::new(StatusCode::SwitchingProtocols);
1158            response.insert_header(UPGRADE, "websocket");
1159            response.insert_header(CONNECTION, "Upgrade");
1160            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1161            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1162            response.insert_header("Sec-Websocket-Version", "13");
1163
1164            let http_res: &mut tide::http::Response = response.as_mut();
1165            let upgrade_receiver = http_res.recv_upgrade().await;
1166            let addr = request.remote().unwrap_or("unknown").to_string();
1167            task::spawn(async move {
1168                if let Some(stream) = upgrade_receiver.await {
1169                    server
1170                        .handle_connection(
1171                            Connection::new(
1172                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1173                            ),
1174                            addr,
1175                            user_id,
1176                            None,
1177                        )
1178                        .await;
1179                }
1180            });
1181
1182            Ok(response)
1183        }
1184    });
1185}
1186
1187fn header_contains_ignore_case<T>(
1188    request: &tide::Request<T>,
1189    header_name: HeaderName,
1190    value: &str,
1191) -> bool {
1192    request
1193        .header(header_name)
1194        .map(|h| {
1195            h.as_str()
1196                .split(',')
1197                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1198        })
1199        .unwrap_or(false)
1200}
1201
1202#[cfg(test)]
1203mod tests {
1204    use super::*;
1205    use crate::{
1206        auth,
1207        db::{tests::TestDb, UserId},
1208        github, AppState, Config,
1209    };
1210    use ::rpc::Peer;
1211    use async_std::task;
1212    use gpui::{executor, ModelHandle, TestAppContext};
1213    use parking_lot::Mutex;
1214    use postage::{mpsc, watch};
1215    use rand::prelude::*;
1216    use rpc::PeerId;
1217    use serde_json::json;
1218    use sqlx::types::time::OffsetDateTime;
1219    use std::{
1220        ops::Deref,
1221        path::Path,
1222        rc::Rc,
1223        sync::{
1224            atomic::{AtomicBool, Ordering::SeqCst},
1225            Arc,
1226        },
1227        time::Duration,
1228    };
1229    use zed::{
1230        client::{
1231            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1232            EstablishConnectionError, UserStore,
1233        },
1234        editor::{ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer},
1235        fs::{FakeFs, Fs as _},
1236        language::{
1237            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1238            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1239        },
1240        lsp,
1241        project::{worktree::WorktreeHandle, DiagnosticSummary, Project, ProjectPath},
1242    };
1243
1244    #[cfg(test)]
1245    #[ctor::ctor]
1246    fn init_logger() {
1247        if std::env::var("RUST_LOG").is_ok() {
1248            env_logger::init();
1249        }
1250    }
1251
    // End-to-end check of sharing a project between two clients:
    // client A shares a local project, client B joins it remotely, both sides see each other
    // as collaborators, an edit typed by B shows up in A's buffer, dropping A's buffer handle
    // closes the buffer on A's project, and dropping B's project removes B from A's
    // collaborators.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        // Wait for the initial scan of "/a" before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must see A as a collaborator; remember B's replica id to check it from A's side.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A sees B as a collaborator with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // The host's project already reports the buffer as open here, before client A opens
        // it explicitly below.
        project_a.read_with(&cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        project_a
            .condition(&cx_a, |project, cx| {
                !project.has_open_buffer((worktree_id, "b.txt"), cx)
            })
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1390
    // Checks the unshare/re-share cycle: after client A unshares, client B's remote project
    // becomes read-only and A's worktree is no longer shared; after A shares again, client B
    // can join the same project id and open a buffer.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/a" before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest's project must turn read-only, and the host's worktree must stop being
        // shared.
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        drop(project_b);

        // Share the project again and ensure guests can still join.
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Client B joins a second time, under the same project id, and opens a buffer.
        let project_c = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_c
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1487
    // Three-client scenario (host A, guests B and C) covering:
    // - concurrent edits from both guests and the host converging to the same text,
    // - a save issued by guest B while the host edits, and the resulting file contents and
    //   dirty state on all three buffers,
    // - renames and a new file on the host's file system showing up in every worktree and in
    //   the path of the already-open buffers.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/a" before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Only append the host's text once both guest edits have arrived.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The saved file is expected to contain the host's concurrent edit as well.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Ensure worktree observes a/file1's change event *before* the rename occurs, otherwise
        // when interpreting the change event it will mistakenly think that the file has been
        // deleted (because its path has changed) and will subsequently fail to detect the rename.
        worktree_a.flush_fs_events(&cx_a).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // Four files are expected afterwards: .zed.toml, file1-renamed, file3 and file4.
        worktree_a
            .condition(&cx_a, |tree, _| tree.file_count() == 4)
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| tree.file_count() == 4)
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| tree.file_count() == 4)
            .await;
        worktree_a.read_with(&cx_a, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_c.read_with(&cx_c, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1682
1683    #[gpui::test(iterations = 10)]
1684    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1685        cx_a.foreground().forbid_parking();
1686        let lang_registry = Arc::new(LanguageRegistry::new());
1687        let fs = Arc::new(FakeFs::new(cx_a.background()));
1688
1689        // Connect to a server as 2 clients.
1690        let mut server = TestServer::start(cx_a.foreground()).await;
1691        let client_a = server.create_client(&mut cx_a, "user_a").await;
1692        let client_b = server.create_client(&mut cx_b, "user_b").await;
1693
1694        // Share a project as client A
1695        fs.insert_tree(
1696            "/dir",
1697            json!({
1698                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1699                "a.txt": "a-contents",
1700            }),
1701        )
1702        .await;
1703
1704        let project_a = cx_a.update(|cx| {
1705            Project::local(
1706                client_a.clone(),
1707                client_a.user_store.clone(),
1708                lang_registry.clone(),
1709                fs.clone(),
1710                cx,
1711            )
1712        });
1713        let (worktree_a, _) = project_a
1714            .update(&mut cx_a, |p, cx| {
1715                p.find_or_create_local_worktree("/dir", false, cx)
1716            })
1717            .await
1718            .unwrap();
1719        worktree_a
1720            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1721            .await;
1722        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1723        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1724        project_a
1725            .update(&mut cx_a, |p, cx| p.share(cx))
1726            .await
1727            .unwrap();
1728
1729        // Join that project as client B
1730        let project_b = Project::remote(
1731            project_id,
1732            client_b.clone(),
1733            client_b.user_store.clone(),
1734            lang_registry.clone(),
1735            fs.clone(),
1736            &mut cx_b.to_async(),
1737        )
1738        .await
1739        .unwrap();
1740        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1741
1742        // Open a buffer as client B
1743        let buffer_b = project_b
1744            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1745            .await
1746            .unwrap();
1747        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1748
1749        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1750        buffer_b.read_with(&cx_b, |buf, _| {
1751            assert!(buf.is_dirty());
1752            assert!(!buf.has_conflict());
1753        });
1754
1755        buffer_b
1756            .update(&mut cx_b, |buf, cx| buf.save(cx))
1757            .await
1758            .unwrap();
1759        worktree_b
1760            .condition(&cx_b, |_, cx| {
1761                buffer_b.read(cx).file().unwrap().mtime() != mtime
1762            })
1763            .await;
1764        buffer_b.read_with(&cx_b, |buf, _| {
1765            assert!(!buf.is_dirty());
1766            assert!(!buf.has_conflict());
1767        });
1768
1769        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1770        buffer_b.read_with(&cx_b, |buf, _| {
1771            assert!(buf.is_dirty());
1772            assert!(!buf.has_conflict());
1773        });
1774    }
1775
1776    #[gpui::test(iterations = 10)]
1777    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1778        cx_a.foreground().forbid_parking();
1779        let lang_registry = Arc::new(LanguageRegistry::new());
1780        let fs = Arc::new(FakeFs::new(cx_a.background()));
1781
1782        // Connect to a server as 2 clients.
1783        let mut server = TestServer::start(cx_a.foreground()).await;
1784        let client_a = server.create_client(&mut cx_a, "user_a").await;
1785        let client_b = server.create_client(&mut cx_b, "user_b").await;
1786
1787        // Share a project as client A
1788        fs.insert_tree(
1789            "/dir",
1790            json!({
1791                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1792                "a.txt": "a-contents",
1793            }),
1794        )
1795        .await;
1796
1797        let project_a = cx_a.update(|cx| {
1798            Project::local(
1799                client_a.clone(),
1800                client_a.user_store.clone(),
1801                lang_registry.clone(),
1802                fs.clone(),
1803                cx,
1804            )
1805        });
1806        let (worktree_a, _) = project_a
1807            .update(&mut cx_a, |p, cx| {
1808                p.find_or_create_local_worktree("/dir", false, cx)
1809            })
1810            .await
1811            .unwrap();
1812        worktree_a
1813            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1814            .await;
1815        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1816        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1817        project_a
1818            .update(&mut cx_a, |p, cx| p.share(cx))
1819            .await
1820            .unwrap();
1821
1822        // Join that project as client B
1823        let project_b = Project::remote(
1824            project_id,
1825            client_b.clone(),
1826            client_b.user_store.clone(),
1827            lang_registry.clone(),
1828            fs.clone(),
1829            &mut cx_b.to_async(),
1830        )
1831        .await
1832        .unwrap();
1833        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1834
1835        // Open a buffer as client B
1836        let buffer_b = project_b
1837            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1838            .await
1839            .unwrap();
1840        buffer_b.read_with(&cx_b, |buf, _| {
1841            assert!(!buf.is_dirty());
1842            assert!(!buf.has_conflict());
1843        });
1844
1845        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1846            .await
1847            .unwrap();
1848        buffer_b
1849            .condition(&cx_b, |buf, _| {
1850                buf.text() == "new contents" && !buf.is_dirty()
1851            })
1852            .await;
1853        buffer_b.read_with(&cx_b, |buf, _| {
1854            assert!(!buf.has_conflict());
1855        });
1856    }
1857
1858    #[gpui::test(iterations = 100)]
1859    async fn test_editing_while_guest_opens_buffer(
1860        mut cx_a: TestAppContext,
1861        mut cx_b: TestAppContext,
1862    ) {
1863        cx_a.foreground().forbid_parking();
1864        let lang_registry = Arc::new(LanguageRegistry::new());
1865        let fs = Arc::new(FakeFs::new(cx_a.background()));
1866
1867        // Connect to a server as 2 clients.
1868        let mut server = TestServer::start(cx_a.foreground()).await;
1869        let client_a = server.create_client(&mut cx_a, "user_a").await;
1870        let client_b = server.create_client(&mut cx_b, "user_b").await;
1871
1872        // Share a project as client A
1873        fs.insert_tree(
1874            "/dir",
1875            json!({
1876                ".zed.toml": r#"collaborators = ["user_b"]"#,
1877                "a.txt": "a-contents",
1878            }),
1879        )
1880        .await;
1881        let project_a = cx_a.update(|cx| {
1882            Project::local(
1883                client_a.clone(),
1884                client_a.user_store.clone(),
1885                lang_registry.clone(),
1886                fs.clone(),
1887                cx,
1888            )
1889        });
1890        let (worktree_a, _) = project_a
1891            .update(&mut cx_a, |p, cx| {
1892                p.find_or_create_local_worktree("/dir", false, cx)
1893            })
1894            .await
1895            .unwrap();
1896        worktree_a
1897            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1898            .await;
1899        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1900        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1901        project_a
1902            .update(&mut cx_a, |p, cx| p.share(cx))
1903            .await
1904            .unwrap();
1905
1906        // Join that project as client B
1907        let project_b = Project::remote(
1908            project_id,
1909            client_b.clone(),
1910            client_b.user_store.clone(),
1911            lang_registry.clone(),
1912            fs.clone(),
1913            &mut cx_b.to_async(),
1914        )
1915        .await
1916        .unwrap();
1917
1918        // Open a buffer as client A
1919        let buffer_a = project_a
1920            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1921            .await
1922            .unwrap();
1923
1924        // Start opening the same buffer as client B
1925        let buffer_b = cx_b
1926            .background()
1927            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1928        task::yield_now().await;
1929
1930        // Edit the buffer as client A while client B is still opening it.
1931        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1932
1933        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1934        let buffer_b = buffer_b.await.unwrap();
1935        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1936    }
1937
1938    #[gpui::test(iterations = 10)]
1939    async fn test_leaving_worktree_while_opening_buffer(
1940        mut cx_a: TestAppContext,
1941        mut cx_b: TestAppContext,
1942    ) {
1943        cx_a.foreground().forbid_parking();
1944        let lang_registry = Arc::new(LanguageRegistry::new());
1945        let fs = Arc::new(FakeFs::new(cx_a.background()));
1946
1947        // Connect to a server as 2 clients.
1948        let mut server = TestServer::start(cx_a.foreground()).await;
1949        let client_a = server.create_client(&mut cx_a, "user_a").await;
1950        let client_b = server.create_client(&mut cx_b, "user_b").await;
1951
1952        // Share a project as client A
1953        fs.insert_tree(
1954            "/dir",
1955            json!({
1956                ".zed.toml": r#"collaborators = ["user_b"]"#,
1957                "a.txt": "a-contents",
1958            }),
1959        )
1960        .await;
1961        let project_a = cx_a.update(|cx| {
1962            Project::local(
1963                client_a.clone(),
1964                client_a.user_store.clone(),
1965                lang_registry.clone(),
1966                fs.clone(),
1967                cx,
1968            )
1969        });
1970        let (worktree_a, _) = project_a
1971            .update(&mut cx_a, |p, cx| {
1972                p.find_or_create_local_worktree("/dir", false, cx)
1973            })
1974            .await
1975            .unwrap();
1976        worktree_a
1977            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1978            .await;
1979        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1980        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1981        project_a
1982            .update(&mut cx_a, |p, cx| p.share(cx))
1983            .await
1984            .unwrap();
1985
1986        // Join that project as client B
1987        let project_b = Project::remote(
1988            project_id,
1989            client_b.clone(),
1990            client_b.user_store.clone(),
1991            lang_registry.clone(),
1992            fs.clone(),
1993            &mut cx_b.to_async(),
1994        )
1995        .await
1996        .unwrap();
1997
1998        // See that a guest has joined as client A.
1999        project_a
2000            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2001            .await;
2002
2003        // Begin opening a buffer as client B, but leave the project before the open completes.
2004        let buffer_b = cx_b
2005            .background()
2006            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2007        cx_b.update(|_| drop(project_b));
2008        drop(buffer_b);
2009
2010        // See that the guest has left.
2011        project_a
2012            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2013            .await;
2014    }
2015
2016    #[gpui::test(iterations = 10)]
2017    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2018        cx_a.foreground().forbid_parking();
2019        let lang_registry = Arc::new(LanguageRegistry::new());
2020        let fs = Arc::new(FakeFs::new(cx_a.background()));
2021
2022        // Connect to a server as 2 clients.
2023        let mut server = TestServer::start(cx_a.foreground()).await;
2024        let client_a = server.create_client(&mut cx_a, "user_a").await;
2025        let client_b = server.create_client(&mut cx_b, "user_b").await;
2026
2027        // Share a project as client A
2028        fs.insert_tree(
2029            "/a",
2030            json!({
2031                ".zed.toml": r#"collaborators = ["user_b"]"#,
2032                "a.txt": "a-contents",
2033                "b.txt": "b-contents",
2034            }),
2035        )
2036        .await;
2037        let project_a = cx_a.update(|cx| {
2038            Project::local(
2039                client_a.clone(),
2040                client_a.user_store.clone(),
2041                lang_registry.clone(),
2042                fs.clone(),
2043                cx,
2044            )
2045        });
2046        let (worktree_a, _) = project_a
2047            .update(&mut cx_a, |p, cx| {
2048                p.find_or_create_local_worktree("/a", false, cx)
2049            })
2050            .await
2051            .unwrap();
2052        worktree_a
2053            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2054            .await;
2055        let project_id = project_a
2056            .update(&mut cx_a, |project, _| project.next_remote_id())
2057            .await;
2058        project_a
2059            .update(&mut cx_a, |project, cx| project.share(cx))
2060            .await
2061            .unwrap();
2062
2063        // Join that project as client B
2064        let _project_b = Project::remote(
2065            project_id,
2066            client_b.clone(),
2067            client_b.user_store.clone(),
2068            lang_registry.clone(),
2069            fs.clone(),
2070            &mut cx_b.to_async(),
2071        )
2072        .await
2073        .unwrap();
2074
2075        // See that a guest has joined as client A.
2076        project_a
2077            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2078            .await;
2079
2080        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2081        client_b.disconnect(&cx_b.to_async()).unwrap();
2082        project_a
2083            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2084            .await;
2085    }
2086
2087    #[gpui::test(iterations = 10)]
2088    async fn test_collaborating_with_diagnostics(
2089        mut cx_a: TestAppContext,
2090        mut cx_b: TestAppContext,
2091    ) {
2092        cx_a.foreground().forbid_parking();
2093        let mut lang_registry = Arc::new(LanguageRegistry::new());
2094        let fs = Arc::new(FakeFs::new(cx_a.background()));
2095
2096        // Set up a fake language server.
2097        let (language_server_config, mut fake_language_server) =
2098            LanguageServerConfig::fake(&cx_a).await;
2099        Arc::get_mut(&mut lang_registry)
2100            .unwrap()
2101            .add(Arc::new(Language::new(
2102                LanguageConfig {
2103                    name: "Rust".to_string(),
2104                    path_suffixes: vec!["rs".to_string()],
2105                    language_server: Some(language_server_config),
2106                    ..Default::default()
2107                },
2108                Some(tree_sitter_rust::language()),
2109            )));
2110
2111        // Connect to a server as 2 clients.
2112        let mut server = TestServer::start(cx_a.foreground()).await;
2113        let client_a = server.create_client(&mut cx_a, "user_a").await;
2114        let client_b = server.create_client(&mut cx_b, "user_b").await;
2115
2116        // Share a project as client A
2117        fs.insert_tree(
2118            "/a",
2119            json!({
2120                ".zed.toml": r#"collaborators = ["user_b"]"#,
2121                "a.rs": "let one = two",
2122                "other.rs": "",
2123            }),
2124        )
2125        .await;
2126        let project_a = cx_a.update(|cx| {
2127            Project::local(
2128                client_a.clone(),
2129                client_a.user_store.clone(),
2130                lang_registry.clone(),
2131                fs.clone(),
2132                cx,
2133            )
2134        });
2135        let (worktree_a, _) = project_a
2136            .update(&mut cx_a, |p, cx| {
2137                p.find_or_create_local_worktree("/a", false, cx)
2138            })
2139            .await
2140            .unwrap();
2141        worktree_a
2142            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2143            .await;
2144        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2145        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2146        project_a
2147            .update(&mut cx_a, |p, cx| p.share(cx))
2148            .await
2149            .unwrap();
2150
2151        // Cause the language server to start.
2152        let _ = cx_a
2153            .background()
2154            .spawn(project_a.update(&mut cx_a, |project, cx| {
2155                project.open_buffer(
2156                    ProjectPath {
2157                        worktree_id,
2158                        path: Path::new("other.rs").into(),
2159                    },
2160                    cx,
2161                )
2162            }))
2163            .await
2164            .unwrap();
2165
2166        // Simulate a language server reporting errors for a file.
2167        fake_language_server
2168            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2169                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2170                version: None,
2171                diagnostics: vec![lsp::Diagnostic {
2172                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2173                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2174                    message: "message 1".to_string(),
2175                    ..Default::default()
2176                }],
2177            })
2178            .await;
2179
2180        // Wait for server to see the diagnostics update.
2181        server
2182            .condition(|store| {
2183                let worktree = store
2184                    .project(project_id)
2185                    .unwrap()
2186                    .worktrees
2187                    .get(&worktree_id.to_proto())
2188                    .unwrap();
2189
2190                !worktree
2191                    .share
2192                    .as_ref()
2193                    .unwrap()
2194                    .diagnostic_summaries
2195                    .is_empty()
2196            })
2197            .await;
2198
2199        // Join the worktree as client B.
2200        let project_b = Project::remote(
2201            project_id,
2202            client_b.clone(),
2203            client_b.user_store.clone(),
2204            lang_registry.clone(),
2205            fs.clone(),
2206            &mut cx_b.to_async(),
2207        )
2208        .await
2209        .unwrap();
2210
2211        project_b.read_with(&cx_b, |project, cx| {
2212            assert_eq!(
2213                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2214                &[(
2215                    ProjectPath {
2216                        worktree_id,
2217                        path: Arc::from(Path::new("a.rs")),
2218                    },
2219                    DiagnosticSummary {
2220                        error_count: 1,
2221                        warning_count: 0,
2222                        ..Default::default()
2223                    },
2224                )]
2225            )
2226        });
2227
2228        // Simulate a language server reporting more errors for a file.
2229        fake_language_server
2230            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2231                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2232                version: None,
2233                diagnostics: vec![
2234                    lsp::Diagnostic {
2235                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2236                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2237                        message: "message 1".to_string(),
2238                        ..Default::default()
2239                    },
2240                    lsp::Diagnostic {
2241                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2242                        range: lsp::Range::new(
2243                            lsp::Position::new(0, 10),
2244                            lsp::Position::new(0, 13),
2245                        ),
2246                        message: "message 2".to_string(),
2247                        ..Default::default()
2248                    },
2249                ],
2250            })
2251            .await;
2252
2253        // Client b gets the updated summaries
2254        project_b
2255            .condition(&cx_b, |project, cx| {
2256                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2257                    == &[(
2258                        ProjectPath {
2259                            worktree_id,
2260                            path: Arc::from(Path::new("a.rs")),
2261                        },
2262                        DiagnosticSummary {
2263                            error_count: 1,
2264                            warning_count: 1,
2265                            ..Default::default()
2266                        },
2267                    )]
2268            })
2269            .await;
2270
2271        // Open the file with the errors on client B. They should be present.
2272        let buffer_b = cx_b
2273            .background()
2274            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2275            .await
2276            .unwrap();
2277
2278        buffer_b.read_with(&cx_b, |buffer, _| {
2279            assert_eq!(
2280                buffer
2281                    .snapshot()
2282                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2283                    .map(|entry| entry)
2284                    .collect::<Vec<_>>(),
2285                &[
2286                    DiagnosticEntry {
2287                        range: Point::new(0, 4)..Point::new(0, 7),
2288                        diagnostic: Diagnostic {
2289                            group_id: 0,
2290                            message: "message 1".to_string(),
2291                            severity: lsp::DiagnosticSeverity::ERROR,
2292                            is_primary: true,
2293                            ..Default::default()
2294                        }
2295                    },
2296                    DiagnosticEntry {
2297                        range: Point::new(0, 10)..Point::new(0, 13),
2298                        diagnostic: Diagnostic {
2299                            group_id: 1,
2300                            severity: lsp::DiagnosticSeverity::WARNING,
2301                            message: "message 2".to_string(),
2302                            is_primary: true,
2303                            ..Default::default()
2304                        }
2305                    }
2306                ]
2307            );
2308        });
2309    }
2310
2311    #[gpui::test(iterations = 10)]
2312    async fn test_collaborating_with_completion(
2313        mut cx_a: TestAppContext,
2314        mut cx_b: TestAppContext,
2315    ) {
2316        cx_a.foreground().forbid_parking();
2317        let mut lang_registry = Arc::new(LanguageRegistry::new());
2318        let fs = Arc::new(FakeFs::new(cx_a.background()));
2319
2320        // Set up a fake language server.
2321        let (language_server_config, mut fake_language_server) =
2322            LanguageServerConfig::fake_with_capabilities(
2323                lsp::ServerCapabilities {
2324                    completion_provider: Some(lsp::CompletionOptions {
2325                        trigger_characters: Some(vec![".".to_string()]),
2326                        ..Default::default()
2327                    }),
2328                    ..Default::default()
2329                },
2330                &cx_a,
2331            )
2332            .await;
2333        Arc::get_mut(&mut lang_registry)
2334            .unwrap()
2335            .add(Arc::new(Language::new(
2336                LanguageConfig {
2337                    name: "Rust".to_string(),
2338                    path_suffixes: vec!["rs".to_string()],
2339                    language_server: Some(language_server_config),
2340                    ..Default::default()
2341                },
2342                Some(tree_sitter_rust::language()),
2343            )));
2344
2345        // Connect to a server as 2 clients.
2346        let mut server = TestServer::start(cx_a.foreground()).await;
2347        let client_a = server.create_client(&mut cx_a, "user_a").await;
2348        let client_b = server.create_client(&mut cx_b, "user_b").await;
2349
2350        // Share a project as client A
2351        fs.insert_tree(
2352            "/a",
2353            json!({
2354                ".zed.toml": r#"collaborators = ["user_b"]"#,
2355                "main.rs": "fn main() { a }",
2356                "other.rs": "",
2357            }),
2358        )
2359        .await;
2360        let project_a = cx_a.update(|cx| {
2361            Project::local(
2362                client_a.clone(),
2363                client_a.user_store.clone(),
2364                lang_registry.clone(),
2365                fs.clone(),
2366                cx,
2367            )
2368        });
2369        let (worktree_a, _) = project_a
2370            .update(&mut cx_a, |p, cx| {
2371                p.find_or_create_local_worktree("/a", false, cx)
2372            })
2373            .await
2374            .unwrap();
2375        worktree_a
2376            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2377            .await;
2378        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2379        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2380        project_a
2381            .update(&mut cx_a, |p, cx| p.share(cx))
2382            .await
2383            .unwrap();
2384
2385        // Join the worktree as client B.
2386        let project_b = Project::remote(
2387            project_id,
2388            client_b.clone(),
2389            client_b.user_store.clone(),
2390            lang_registry.clone(),
2391            fs.clone(),
2392            &mut cx_b.to_async(),
2393        )
2394        .await
2395        .unwrap();
2396
2397        // Open a file in an editor as the guest.
2398        let buffer_b = project_b
2399            .update(&mut cx_b, |p, cx| {
2400                p.open_buffer((worktree_id, "main.rs"), cx)
2401            })
2402            .await
2403            .unwrap();
2404        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2405        let editor_b = cx_b.add_view(window_b, |cx| {
2406            Editor::for_buffer(
2407                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2408                Arc::new(|cx| EditorSettings::test(cx)),
2409                Some(project_b.clone()),
2410                cx,
2411            )
2412        });
2413
2414        // Type a completion trigger character as the guest.
2415        editor_b.update(&mut cx_b, |editor, cx| {
2416            editor.select_ranges([13..13], None, cx);
2417            editor.handle_input(&Input(".".into()), cx);
2418            cx.focus(&editor_b);
2419        });
2420
2421        // Receive a completion request as the host's language server.
2422        // Return some completions from the host's language server.
2423        fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
2424            assert_eq!(
2425                params.text_document_position.text_document.uri,
2426                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2427            );
2428            assert_eq!(
2429                params.text_document_position.position,
2430                lsp::Position::new(0, 14),
2431            );
2432
2433            Some(lsp::CompletionResponse::Array(vec![
2434                lsp::CompletionItem {
2435                    label: "first_method(…)".into(),
2436                    detail: Some("fn(&mut self, B) -> C".into()),
2437                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2438                        new_text: "first_method($1)".to_string(),
2439                        range: lsp::Range::new(
2440                            lsp::Position::new(0, 14),
2441                            lsp::Position::new(0, 14),
2442                        ),
2443                    })),
2444                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2445                    ..Default::default()
2446                },
2447                lsp::CompletionItem {
2448                    label: "second_method(…)".into(),
2449                    detail: Some("fn(&mut self, C) -> D<E>".into()),
2450                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2451                        new_text: "second_method()".to_string(),
2452                        range: lsp::Range::new(
2453                            lsp::Position::new(0, 14),
2454                            lsp::Position::new(0, 14),
2455                        ),
2456                    })),
2457                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2458                    ..Default::default()
2459                },
2460            ]))
2461        });
2462
2463        // Open the buffer on the host.
2464        let buffer_a = project_a
2465            .update(&mut cx_a, |p, cx| {
2466                p.open_buffer((worktree_id, "main.rs"), cx)
2467            })
2468            .await
2469            .unwrap();
2470        buffer_a
2471            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2472            .await;
2473
2474        // Confirm a completion on the guest.
2475        editor_b.next_notification(&cx_b).await;
2476        editor_b.update(&mut cx_b, |editor, cx| {
2477            assert!(editor.context_menu_visible());
2478            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2479            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2480        });
2481
2482        // Return a resolved completion from the host's language server.
2483        // The resolved completion has an additional text edit.
2484        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2485            assert_eq!(params.label, "first_method(…)");
2486            lsp::CompletionItem {
2487                label: "first_method(…)".into(),
2488                detail: Some("fn(&mut self, B) -> C".into()),
2489                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2490                    new_text: "first_method($1)".to_string(),
2491                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2492                })),
2493                additional_text_edits: Some(vec![lsp::TextEdit {
2494                    new_text: "use d::SomeTrait;\n".to_string(),
2495                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2496                }]),
2497                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2498                ..Default::default()
2499            }
2500        });
2501
2502        buffer_a
2503            .condition(&cx_a, |buffer, _| {
2504                buffer.text() == "fn main() { a.first_method() }"
2505            })
2506            .await;
2507
2508        // The additional edit is applied.
2509        buffer_b
2510            .condition(&cx_b, |buffer, _| {
2511                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2512            })
2513            .await;
2514        assert_eq!(
2515            buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2516            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2517        );
2518    }
2519
2520    #[gpui::test(iterations = 10)]
        // Checks that a guest (client B) can format a buffer belonging to a project shared by
        // client A: the edits returned by the fake language server must show up in the guest's
        // copy of the buffer.
2521    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2522        cx_a.foreground().forbid_parking();
2523        let mut lang_registry = Arc::new(LanguageRegistry::new());
2524        let fs = Arc::new(FakeFs::new(cx_a.background()));
2525
2526        // Set up a fake language server.
            // It is created with client A's context and registered for files ending in "rs".
2527        let (language_server_config, mut fake_language_server) =
2528            LanguageServerConfig::fake(&cx_a).await;
2529        Arc::get_mut(&mut lang_registry)
2530            .unwrap()
2531            .add(Arc::new(Language::new(
2532                LanguageConfig {
2533                    name: "Rust".to_string(),
2534                    path_suffixes: vec!["rs".to_string()],
2535                    language_server: Some(language_server_config),
2536                    ..Default::default()
2537                },
2538                Some(tree_sitter_rust::language()),
2539            )));
2540
2541        // Connect to a server as 2 clients.
2542        let mut server = TestServer::start(cx_a.foreground()).await;
2543        let client_a = server.create_client(&mut cx_a, "user_a").await;
2544        let client_b = server.create_client(&mut cx_b, "user_b").await;
2545
2546        // Share a project as client A
            // The fake file system gets a `.zed.toml` naming "user_b" as a collaborator and the
            // single source file that will be formatted.
2547        fs.insert_tree(
2548            "/a",
2549            json!({
2550                ".zed.toml": r#"collaborators = ["user_b"]"#,
2551                "a.rs": "let one = two",
2552            }),
2553        )
2554        .await;
2555        let project_a = cx_a.update(|cx| {
2556            Project::local(
2557                client_a.clone(),
2558                client_a.user_store.clone(),
2559                lang_registry.clone(),
2560                fs.clone(),
2561                cx,
2562            )
2563        });
2564        let (worktree_a, _) = project_a
2565            .update(&mut cx_a, |p, cx| {
2566                p.find_or_create_local_worktree("/a", false, cx)
2567            })
2568            .await
2569            .unwrap();
            // Wait for the initial scan of the worktree before sharing the project.
2570        worktree_a
2571            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2572            .await;
2573        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2574        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2575        project_a
2576            .update(&mut cx_a, |p, cx| p.share(cx))
2577            .await
2578            .unwrap();
2579
2580        // Join the shared project as client B.
2581        let project_b = Project::remote(
2582            project_id,
2583            client_b.clone(),
2584            client_b.user_store.clone(),
2585            lang_registry.clone(),
2586            fs.clone(),
2587            &mut cx_b.to_async(),
2588        )
2589        .await
2590        .unwrap();
2591
            // Open the file on client B.
2592        let buffer_b = cx_b
2593            .background()
2594            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2595            .await
2596            .unwrap();
2597
            // Ask for the buffer to be formatted as the guest. The result is only awaited
            // further down, after the fake server's response handler has been installed.
2598        let format = project_b.update(&mut cx_b, |project, cx| {
2599            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2600        });
2601
            // Answer the formatting request with two insertions on line 0: "h" at column 4
            // and "y" at column 7.
2602        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2603            Some(vec![
2604                lsp::TextEdit {
2605                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2606                    new_text: "h".to_string(),
2607                },
2608                lsp::TextEdit {
2609                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2610                    new_text: "y".to_string(),
2611                },
2612            ])
2613        });
2614
            // "let one = two" with "h" inserted before "one" and "y" inserted after it reads
            // "let honey = two"; that text must be visible in the guest's buffer.
2615        format.await.unwrap();
2616        assert_eq!(
2617            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2618            "let honey = two"
2619        );
2620    }
2621
2622    #[gpui::test(iterations = 10)]
        // Checks go-to-definition requested by a guest (client B) on a project shared by
        // client A. The fake language server answers with locations in `/root-2/b.rs`, which is
        // outside the `/root-1` worktree that was added to the project. The test therefore also
        // checks that the guest's project reports a second worktree while the results are alive,
        // that the target buffer is reused across requests, and that the worktree count drops
        // back to one once the results are dropped.
2623    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2624        cx_a.foreground().forbid_parking();
2625        let mut lang_registry = Arc::new(LanguageRegistry::new());
2626        let fs = Arc::new(FakeFs::new(cx_a.background()));
            // `/root-1` holds the file the guest opens; `/root-2` holds the definition targets.
2627        fs.insert_tree(
2628            "/root-1",
2629            json!({
2630                ".zed.toml": r#"collaborators = ["user_b"]"#,
2631                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2632            }),
2633        )
2634        .await;
2635        fs.insert_tree(
2636            "/root-2",
2637            json!({
2638                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2639            }),
2640        )
2641        .await;
2642
2643        // Set up a fake language server.
2644        let (language_server_config, mut fake_language_server) =
2645            LanguageServerConfig::fake(&cx_a).await;
2646        Arc::get_mut(&mut lang_registry)
2647            .unwrap()
2648            .add(Arc::new(Language::new(
2649                LanguageConfig {
2650                    name: "Rust".to_string(),
2651                    path_suffixes: vec!["rs".to_string()],
2652                    language_server: Some(language_server_config),
2653                    ..Default::default()
2654                },
2655                Some(tree_sitter_rust::language()),
2656            )));
2657
2658        // Connect to a server as 2 clients.
2659        let mut server = TestServer::start(cx_a.foreground()).await;
2660        let client_a = server.create_client(&mut cx_a, "user_a").await;
2661        let client_b = server.create_client(&mut cx_b, "user_b").await;
2662
2663        // Share a project as client A
            // Only `/root-1` is added as a worktree here.
2664        let project_a = cx_a.update(|cx| {
2665            Project::local(
2666                client_a.clone(),
2667                client_a.user_store.clone(),
2668                lang_registry.clone(),
2669                fs.clone(),
2670                cx,
2671            )
2672        });
2673        let (worktree_a, _) = project_a
2674            .update(&mut cx_a, |p, cx| {
2675                p.find_or_create_local_worktree("/root-1", false, cx)
2676            })
2677            .await
2678            .unwrap();
2679        worktree_a
2680            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2681            .await;
2682        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2683        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2684        project_a
2685            .update(&mut cx_a, |p, cx| p.share(cx))
2686            .await
2687            .unwrap();
2688
2689        // Join the shared project as client B.
2690        let project_b = Project::remote(
2691            project_id,
2692            client_b.clone(),
2693            client_b.user_store.clone(),
2694            lang_registry.clone(),
2695            fs.clone(),
2696            &mut cx_b.to_async(),
2697        )
2698        .await
2699        .unwrap();
2700
2701        // Open the file on client B.
2702        let buffer_b = cx_b
2703            .background()
2704            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2705            .await
2706            .unwrap();
2707
2708        // Request the definition of a symbol as the guest.
            // NOTE(review): 23 lands inside `TWO` if it is a character offset into a.rs; confirm
            // against the signature of `Project::definition`.
            // The fake server's reply covers columns 6..9 of line 0 in b.rs, i.e. `TWO`.
2709        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2710        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2711            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2712                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2713                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2714            )))
2715        });
2716
            // One definition is expected, pointing into a buffer with b.rs's contents, and the
            // guest's project should now report two worktrees.
2717        let definitions_1 = definitions_1.await.unwrap();
2718        cx_b.read(|cx| {
2719            assert_eq!(definitions_1.len(), 1);
2720            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2721            let target_buffer = definitions_1[0].target_buffer.read(cx);
2722            assert_eq!(
2723                target_buffer.text(),
2724                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2725            );
2726            assert_eq!(
2727                definitions_1[0].target_range.to_point(target_buffer),
2728                Point::new(0, 6)..Point::new(0, 9)
2729            );
2730        });
2731
2732        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2733        // the previous call to `definition`.
            // This time the reply covers columns 6..11 of line 1 in b.rs, i.e. `THREE`.
2734        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2735        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2736            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2737                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2738                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2739            )))
2740        });
2741
2742        let definitions_2 = definitions_2.await.unwrap();
2743        cx_b.read(|cx| {
2744            assert_eq!(definitions_2.len(), 1);
                // Still two worktrees: the second request must not add another one.
2745            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2746            let target_buffer = definitions_2[0].target_buffer.read(cx);
2747            assert_eq!(
2748                target_buffer.text(),
2749                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2750            );
2751            assert_eq!(
2752                definitions_2[0].target_range.to_point(target_buffer),
2753                Point::new(1, 6)..Point::new(1, 11)
2754            );
2755        });
            // Both results must refer to the same buffer handle.
2756        assert_eq!(
2757            definitions_1[0].target_buffer,
2758            definitions_2[0].target_buffer
2759        );
2760
            // Drop both results, then wait until the guest's project reports a single worktree
            // again.
2761        cx_b.update(|_| {
2762            drop(definitions_1);
2763            drop(definitions_2);
2764        });
2765        project_b
2766            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2767            .await;
2768    }
2769
2770    #[gpui::test(iterations = 10)]
        // Checks that, on a guest, opening `b.rs` and requesting a definition that points at
        // `b.rs` can be in flight at the same time, in either order, and still yield the same
        // buffer. `rng` decides which of the two operations is issued first.
2771    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2772        mut cx_a: TestAppContext,
2773        mut cx_b: TestAppContext,
2774        mut rng: StdRng,
2775    ) {
2776        cx_a.foreground().forbid_parking();
2777        let mut lang_registry = Arc::new(LanguageRegistry::new());
2778        let fs = Arc::new(FakeFs::new(cx_a.background()));
            // Both the requesting file and the definition target live in the same root.
2779        fs.insert_tree(
2780            "/root",
2781            json!({
2782                ".zed.toml": r#"collaborators = ["user_b"]"#,
2783                "a.rs": "const ONE: usize = b::TWO;",
2784                "b.rs": "const TWO: usize = 2",
2785            }),
2786        )
2787        .await;
2788
2789        // Set up a fake language server.
2790        let (language_server_config, mut fake_language_server) =
2791            LanguageServerConfig::fake(&cx_a).await;
2792        Arc::get_mut(&mut lang_registry)
2793            .unwrap()
2794            .add(Arc::new(Language::new(
2795                LanguageConfig {
2796                    name: "Rust".to_string(),
2797                    path_suffixes: vec!["rs".to_string()],
2798                    language_server: Some(language_server_config),
2799                    ..Default::default()
2800                },
2801                Some(tree_sitter_rust::language()),
2802            )));
2803
2804        // Connect to a server as 2 clients.
2805        let mut server = TestServer::start(cx_a.foreground()).await;
2806        let client_a = server.create_client(&mut cx_a, "user_a").await;
2807        let client_b = server.create_client(&mut cx_b, "user_b").await;
2808
2809        // Share a project as client A
2810        let project_a = cx_a.update(|cx| {
2811            Project::local(
2812                client_a.clone(),
2813                client_a.user_store.clone(),
2814                lang_registry.clone(),
2815                fs.clone(),
2816                cx,
2817            )
2818        });
2819        let (worktree_a, _) = project_a
2820            .update(&mut cx_a, |p, cx| {
2821                p.find_or_create_local_worktree("/root", false, cx)
2822            })
2823            .await
2824            .unwrap();
2825        worktree_a
2826            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2827            .await;
2828        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2829        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2830        project_a
2831            .update(&mut cx_a, |p, cx| p.share(cx))
2832            .await
2833            .unwrap();
2834
2835        // Join the shared project as client B.
2836        let project_b = Project::remote(
2837            project_id,
2838            client_b.clone(),
2839            client_b.user_store.clone(),
2840            lang_registry.clone(),
2841            fs.clone(),
2842            &mut cx_b.to_async(),
2843        )
2844        .await
2845        .unwrap();
2846
            // Open a.rs on client B; the definition request below is made from this buffer.
2847        let buffer_b1 = cx_b
2848            .background()
2849            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2850            .await
2851            .unwrap();
2852
            // Start the definition request and the open of b.rs without awaiting either one.
            // The random boolean only changes which of the two is started first.
2853        let definitions;
2854        let buffer_b2;
2855        if rng.gen() {
2856            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2857            buffer_b2 =
2858                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2859        } else {
2860            buffer_b2 =
2861                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2862            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2863        }
2864
            // The fake server resolves the definition to columns 6..9 of line 0 in /root/b.rs.
2865        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2866            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2867                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2868                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2869            )))
2870        });
2871
            // Whichever operation was started first, the definition's target must be the very
            // buffer returned by the explicit open.
2872        let buffer_b2 = buffer_b2.await.unwrap();
2873        let definitions = definitions.await.unwrap();
2874        assert_eq!(definitions.len(), 1);
2875        assert_eq!(definitions[0].target_buffer, buffer_b2);
2876    }
2877
2878    #[gpui::test(iterations = 10)]
        // Exercises the basic chat flow between two clients that are members of the same
        // channel: both load a message that already exists in the database, client A sends two
        // new messages, client B receives them, and the server's per-channel connection list
        // shrinks as each client drops its channel handle.
        //
        // NOTE(review): `channel_messages` is defined outside this function. The boolean in each
        // tuple is `true` for messages the local client has just sent and `false` otherwise, so
        // it is presumably a "pending" flag; confirm against the helper.
2879    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2880        cx_a.foreground().forbid_parking();
2881
2882        // Connect to a server as 2 clients.
2883        let mut server = TestServer::start(cx_a.foreground()).await;
2884        let client_a = server.create_client(&mut cx_a, "user_a").await;
2885        let client_b = server.create_client(&mut cx_b, "user_b").await;
2886
2887        // Create an org that includes these 2 users.
2888        let db = &server.app_state.db;
2889        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2890        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2891            .await
2892            .unwrap();
2893        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2894            .await
2895            .unwrap();
2896
2897        // Create a channel that includes all the users.
2898        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2899        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2900            .await
2901            .unwrap();
2902        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2903            .await
2904            .unwrap();
            // Seed the channel with one message from user B, written directly to the database.
2905        db.create_channel_message(
2906            channel_id,
2907            client_b.current_user_id(&cx_b),
2908            "hello A, it's B.",
2909            OffsetDateTime::now_utc(),
2910            1,
2911        )
2912        .await
2913        .unwrap();
2914
            // Client A waits for its channel list to load and checks that it contains exactly
            // the test channel.
2915        let channels_a = cx_a
2916            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2917        channels_a
2918            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2919            .await;
2920        channels_a.read_with(&cx_a, |list, _| {
2921            assert_eq!(
2922                list.available_channels().unwrap(),
2923                &[ChannelDetails {
2924                    id: channel_id.to_proto(),
2925                    name: "test-channel".to_string()
2926                }]
2927            )
2928        });
            // Right after the channel is obtained its message list is empty; the seeded message
            // shows up later.
2929        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2930            this.get_channel(channel_id.to_proto(), cx).unwrap()
2931        });
2932        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2933        channel_a
2934            .condition(&cx_a, |channel, _| {
2935                channel_messages(channel)
2936                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2937            })
2938            .await;
2939
            // Client B goes through the same steps and ends up with the same view.
2940        let channels_b = cx_b
2941            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2942        channels_b
2943            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2944            .await;
2945        channels_b.read_with(&cx_b, |list, _| {
2946            assert_eq!(
2947                list.available_channels().unwrap(),
2948                &[ChannelDetails {
2949                    id: channel_id.to_proto(),
2950                    name: "test-channel".to_string()
2951                }]
2952            )
2953        });
2954
2955        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2956            this.get_channel(channel_id.to_proto(), cx).unwrap()
2957        });
2958        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2959        channel_b
2960            .condition(&cx_b, |channel, _| {
2961                channel_messages(channel)
2962                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2963            })
2964            .await;
2965
            // Client A sends two messages back to back. The first send is detached and only the
            // second one is awaited. Both must appear in A's own message list immediately,
            // flagged `true`, before either send has completed.
2966        channel_a
2967            .update(&mut cx_a, |channel, cx| {
2968                channel
2969                    .send_message("oh, hi B.".to_string(), cx)
2970                    .unwrap()
2971                    .detach();
2972                let task = channel.send_message("sup".to_string(), cx).unwrap();
2973                assert_eq!(
2974                    channel_messages(channel),
2975                    &[
2976                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2977                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2978                        ("user_a".to_string(), "sup".to_string(), true)
2979                    ]
2980                );
2981                task
2982            })
2983            .await
2984            .unwrap();
2985
            // Client B eventually sees both of A's messages, in order, flagged `false`.
2986        channel_b
2987            .condition(&cx_b, |channel, _| {
2988                channel_messages(channel)
2989                    == [
2990                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2991                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2992                        ("user_a".to_string(), "sup".to_string(), false),
2993                    ]
2994            })
2995            .await;
2996
            // While both clients hold the channel, the server state lists two connection ids
            // for it.
2997        assert_eq!(
2998            server
2999                .state()
3000                .await
3001                .channel(channel_id)
3002                .unwrap()
3003                .connection_ids
3004                .len(),
3005            2
3006        );
            // Dropping B's channel handle leaves one connection id on the server.
3007        cx_b.update(|_| drop(channel_b));
3008        server
3009            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3010            .await;
3011
            // Dropping A's handle as well removes the channel from the server state entirely.
3012        cx_a.update(|_| drop(channel_a));
3013        server
3014            .condition(|state| state.channel(channel_id).is_none())
3015            .await;
3016    }
3017
3018    #[gpui::test(iterations = 10)]
        // Checks three validation rules for chat messages sent through a channel:
        //   * an over-long body is rejected (the error surfaces when the send task is awaited),
        //   * an empty body is rejected (`send_message` itself returns an error),
        //   * surrounding whitespace is trimmed before the message is stored.
3019    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3020        cx_a.foreground().forbid_parking();
3021
3022        let mut server = TestServer::start(cx_a.foreground()).await;
3023        let client_a = server.create_client(&mut cx_a, "user_a").await;
3024
            // Create an org with one channel and make user A a member of both.
3025        let db = &server.app_state.db;
3026        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3027        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3028        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3029            .await
3030            .unwrap();
3031        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3032            .await
3033            .unwrap();
3034
            // Wait for the channel list to load, then get a handle to the test channel.
3035        let channels_a = cx_a
3036            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3037        channels_a
3038            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3039            .await;
3040        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3041            this.get_channel(channel_id.to_proto(), cx).unwrap()
3042        });
3043
3044        // Messages aren't allowed to be too long.
            // The body is a 14-byte line repeated 1024 times. `send_message` accepts it and
            // returns a task; the failure only appears when that task is awaited.
            // NOTE(review): presumably rejected for exceeding MAX_MESSAGE_LEN (1024); confirm
            // where the limit is enforced.
3045        channel_a
3046            .update(&mut cx_a, |channel, cx| {
3047                let long_body = "this is long.\n".repeat(1024);
3048                channel.send_message(long_body, cx).unwrap()
3049            })
3050            .await
3051            .unwrap_err();
3052
3053        // Messages aren't allowed to be blank.
            // Unlike the case above, `send_message` returns the error directly, so there is
            // no task to await.
3054        channel_a.update(&mut cx_a, |channel, cx| {
3055            channel.send_message(String::new(), cx).unwrap_err()
3056        });
3057
3058        // Leading and trailing whitespace are trimmed.
3059        channel_a
3060            .update(&mut cx_a, |channel, cx| {
3061                channel
3062                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3063                    .unwrap()
3064            })
3065            .await
3066            .unwrap();
            // Only the trimmed message is in the database; the two rejected sends stored
            // nothing.
3067        assert_eq!(
3068            db.get_channel_messages(channel_id, 10, None)
3069                .await
3070                .unwrap()
3071                .iter()
3072                .map(|m| &m.body)
3073                .collect::<Vec<_>>(),
3074            &["surrounded by whitespace"]
3075        );
3076    }
3077
3078    #[gpui::test(iterations = 10)]
3079    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3080        cx_a.foreground().forbid_parking();
3081
3082        // Connect to a server as 2 clients.
3083        let mut server = TestServer::start(cx_a.foreground()).await;
3084        let client_a = server.create_client(&mut cx_a, "user_a").await;
3085        let client_b = server.create_client(&mut cx_b, "user_b").await;
3086        let mut status_b = client_b.status();
3087
3088        // Create an org that includes these 2 users.
3089        let db = &server.app_state.db;
3090        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3091        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3092            .await
3093            .unwrap();
3094        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3095            .await
3096            .unwrap();
3097
3098        // Create a channel that includes all the users.
3099        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3100        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3101            .await
3102            .unwrap();
3103        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3104            .await
3105            .unwrap();
3106        db.create_channel_message(
3107            channel_id,
3108            client_b.current_user_id(&cx_b),
3109            "hello A, it's B.",
3110            OffsetDateTime::now_utc(),
3111            2,
3112        )
3113        .await
3114        .unwrap();
3115
3116        let channels_a = cx_a
3117            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3118        channels_a
3119            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3120            .await;
3121
3122        channels_a.read_with(&cx_a, |list, _| {
3123            assert_eq!(
3124                list.available_channels().unwrap(),
3125                &[ChannelDetails {
3126                    id: channel_id.to_proto(),
3127                    name: "test-channel".to_string()
3128                }]
3129            )
3130        });
3131        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3132            this.get_channel(channel_id.to_proto(), cx).unwrap()
3133        });
3134        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3135        channel_a
3136            .condition(&cx_a, |channel, _| {
3137                channel_messages(channel)
3138                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3139            })
3140            .await;
3141
3142        let channels_b = cx_b
3143            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3144        channels_b
3145            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3146            .await;
3147        channels_b.read_with(&cx_b, |list, _| {
3148            assert_eq!(
3149                list.available_channels().unwrap(),
3150                &[ChannelDetails {
3151                    id: channel_id.to_proto(),
3152                    name: "test-channel".to_string()
3153                }]
3154            )
3155        });
3156
3157        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3158            this.get_channel(channel_id.to_proto(), cx).unwrap()
3159        });
3160        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3161        channel_b
3162            .condition(&cx_b, |channel, _| {
3163                channel_messages(channel)
3164                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3165            })
3166            .await;
3167
3168        // Disconnect client B, ensuring we can still access its cached channel data.
3169        server.forbid_connections();
3170        server.disconnect_client(client_b.current_user_id(&cx_b));
3171        while !matches!(
3172            status_b.next().await,
3173            Some(client::Status::ReconnectionError { .. })
3174        ) {}
3175
3176        channels_b.read_with(&cx_b, |channels, _| {
3177            assert_eq!(
3178                channels.available_channels().unwrap(),
3179                [ChannelDetails {
3180                    id: channel_id.to_proto(),
3181                    name: "test-channel".to_string()
3182                }]
3183            )
3184        });
3185        channel_b.read_with(&cx_b, |channel, _| {
3186            assert_eq!(
3187                channel_messages(channel),
3188                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3189            )
3190        });
3191
3192        // Send a message from client B while it is disconnected.
3193        channel_b
3194            .update(&mut cx_b, |channel, cx| {
3195                let task = channel
3196                    .send_message("can you see this?".to_string(), cx)
3197                    .unwrap();
3198                assert_eq!(
3199                    channel_messages(channel),
3200                    &[
3201                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3202                        ("user_b".to_string(), "can you see this?".to_string(), true)
3203                    ]
3204                );
3205                task
3206            })
3207            .await
3208            .unwrap_err();
3209
3210        // Send a message from client A while B is disconnected.
3211        channel_a
3212            .update(&mut cx_a, |channel, cx| {
3213                channel
3214                    .send_message("oh, hi B.".to_string(), cx)
3215                    .unwrap()
3216                    .detach();
3217                let task = channel.send_message("sup".to_string(), cx).unwrap();
3218                assert_eq!(
3219                    channel_messages(channel),
3220                    &[
3221                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3222                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3223                        ("user_a".to_string(), "sup".to_string(), true)
3224                    ]
3225                );
3226                task
3227            })
3228            .await
3229            .unwrap();
3230
3231        // Give client B a chance to reconnect.
3232        server.allow_connections();
3233        cx_b.foreground().advance_clock(Duration::from_secs(10));
3234
3235        // Verify that B sees the new messages upon reconnection, as well as the message client B
3236        // sent while offline.
3237        channel_b
3238            .condition(&cx_b, |channel, _| {
3239                channel_messages(channel)
3240                    == [
3241                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3242                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3243                        ("user_a".to_string(), "sup".to_string(), false),
3244                        ("user_b".to_string(), "can you see this?".to_string(), false),
3245                    ]
3246            })
3247            .await;
3248
3249        // Ensure client A and B can communicate normally after reconnection.
3250        channel_a
3251            .update(&mut cx_a, |channel, cx| {
3252                channel.send_message("you online?".to_string(), cx).unwrap()
3253            })
3254            .await
3255            .unwrap();
3256        channel_b
3257            .condition(&cx_b, |channel, _| {
3258                channel_messages(channel)
3259                    == [
3260                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3261                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3262                        ("user_a".to_string(), "sup".to_string(), false),
3263                        ("user_b".to_string(), "can you see this?".to_string(), false),
3264                        ("user_a".to_string(), "you online?".to_string(), false),
3265                    ]
3266            })
3267            .await;
3268
3269        channel_b
3270            .update(&mut cx_b, |channel, cx| {
3271                channel.send_message("yep".to_string(), cx).unwrap()
3272            })
3273            .await
3274            .unwrap();
3275        channel_a
3276            .condition(&cx_a, |channel, _| {
3277                channel_messages(channel)
3278                    == [
3279                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3280                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3281                        ("user_a".to_string(), "sup".to_string(), false),
3282                        ("user_b".to_string(), "can you see this?".to_string(), false),
3283                        ("user_a".to_string(), "you online?".to_string(), false),
3284                        ("user_b".to_string(), "yep".to_string(), false),
3285                    ]
3286            })
3287            .await;
3288    }
3289
3290    #[gpui::test(iterations = 10)]
3291    async fn test_contacts(
3292        mut cx_a: TestAppContext,
3293        mut cx_b: TestAppContext,
3294        mut cx_c: TestAppContext,
3295    ) {
3296        cx_a.foreground().forbid_parking();
3297        let lang_registry = Arc::new(LanguageRegistry::new());
3298        let fs = Arc::new(FakeFs::new(cx_a.background()));
3299
3300        // Connect to a server as 3 clients.
3301        let mut server = TestServer::start(cx_a.foreground()).await;
3302        let client_a = server.create_client(&mut cx_a, "user_a").await;
3303        let client_b = server.create_client(&mut cx_b, "user_b").await;
3304        let client_c = server.create_client(&mut cx_c, "user_c").await;
3305
3306        // Share a worktree as client A.
3307        fs.insert_tree(
3308            "/a",
3309            json!({
3310                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3311            }),
3312        )
3313        .await;
3314
3315        let project_a = cx_a.update(|cx| {
3316            Project::local(
3317                client_a.clone(),
3318                client_a.user_store.clone(),
3319                lang_registry.clone(),
3320                fs.clone(),
3321                cx,
3322            )
3323        });
3324        let (worktree_a, _) = project_a
3325            .update(&mut cx_a, |p, cx| {
3326                p.find_or_create_local_worktree("/a", false, cx)
3327            })
3328            .await
3329            .unwrap();
3330        worktree_a
3331            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3332            .await;
3333
3334        client_a
3335            .user_store
3336            .condition(&cx_a, |user_store, _| {
3337                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3338            })
3339            .await;
3340        client_b
3341            .user_store
3342            .condition(&cx_b, |user_store, _| {
3343                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3344            })
3345            .await;
3346        client_c
3347            .user_store
3348            .condition(&cx_c, |user_store, _| {
3349                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3350            })
3351            .await;
3352
3353        let project_id = project_a
3354            .update(&mut cx_a, |project, _| project.next_remote_id())
3355            .await;
3356        project_a
3357            .update(&mut cx_a, |project, cx| project.share(cx))
3358            .await
3359            .unwrap();
3360
3361        let _project_b = Project::remote(
3362            project_id,
3363            client_b.clone(),
3364            client_b.user_store.clone(),
3365            lang_registry.clone(),
3366            fs.clone(),
3367            &mut cx_b.to_async(),
3368        )
3369        .await
3370        .unwrap();
3371
3372        client_a
3373            .user_store
3374            .condition(&cx_a, |user_store, _| {
3375                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3376            })
3377            .await;
3378        client_b
3379            .user_store
3380            .condition(&cx_b, |user_store, _| {
3381                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3382            })
3383            .await;
3384        client_c
3385            .user_store
3386            .condition(&cx_c, |user_store, _| {
3387                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3388            })
3389            .await;
3390
3391        project_a
3392            .condition(&cx_a, |project, _| {
3393                project.collaborators().contains_key(&client_b.peer_id)
3394            })
3395            .await;
3396
3397        cx_a.update(move |_| drop(project_a));
3398        client_a
3399            .user_store
3400            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3401            .await;
3402        client_b
3403            .user_store
3404            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3405            .await;
3406        client_c
3407            .user_store
3408            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3409            .await;
3410
3411        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3412            user_store
3413                .contacts()
3414                .iter()
3415                .map(|contact| {
3416                    let worktrees = contact
3417                        .projects
3418                        .iter()
3419                        .map(|p| {
3420                            (
3421                                p.worktree_root_names[0].as_str(),
3422                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3423                            )
3424                        })
3425                        .collect();
3426                    (contact.user.github_login.as_str(), worktrees)
3427                })
3428                .collect()
3429        }
3430    }
3431
    /// In-process server harness used by the tests in this module.
    struct TestServer {
        /// RPC peer handed to `Server::new`; `reset` is called on it when the harness is dropped.
        peer: Arc<Peer>,
        /// Application state built by `build_app_state` from the test database.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor; `condition` brackets its waits with `start_waiting` /
        /// `finish_waiting` on it.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is passed to `Server::new`.
        notifications: mpsc::Receiver<()>,
        /// Per-user senders that `disconnect_client` uses to signal a connection kill.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// While `true`, the overridden connection routine of test clients returns an error.
        forbid_connections: Arc<AtomicBool>,
        /// Test database backing `app_state`; presumably held here so it outlives the server
        /// (TODO confirm what `TestDb` does on drop).
        _test_db: TestDb,
    }
3442
3443    impl TestServer {
3444        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3445            let test_db = TestDb::new();
3446            let app_state = Self::build_app_state(&test_db).await;
3447            let peer = Peer::new();
3448            let notifications = mpsc::channel(128);
3449            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3450            Self {
3451                peer,
3452                app_state,
3453                server,
3454                foreground,
3455                notifications: notifications.1,
3456                connection_killers: Default::default(),
3457                forbid_connections: Default::default(),
3458                _test_db: test_db,
3459            }
3460        }
3461
3462        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3463            let http = FakeHttpClient::with_404_response();
3464            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3465            let client_name = name.to_string();
3466            let mut client = Client::new(http.clone());
3467            let server = self.server.clone();
3468            let connection_killers = self.connection_killers.clone();
3469            let forbid_connections = self.forbid_connections.clone();
3470            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3471
3472            Arc::get_mut(&mut client)
3473                .unwrap()
3474                .override_authenticate(move |cx| {
3475                    cx.spawn(|_| async move {
3476                        let access_token = "the-token".to_string();
3477                        Ok(Credentials {
3478                            user_id: user_id.0 as u64,
3479                            access_token,
3480                        })
3481                    })
3482                })
3483                .override_establish_connection(move |credentials, cx| {
3484                    assert_eq!(credentials.user_id, user_id.0 as u64);
3485                    assert_eq!(credentials.access_token, "the-token");
3486
3487                    let server = server.clone();
3488                    let connection_killers = connection_killers.clone();
3489                    let forbid_connections = forbid_connections.clone();
3490                    let client_name = client_name.clone();
3491                    let connection_id_tx = connection_id_tx.clone();
3492                    cx.spawn(move |cx| async move {
3493                        if forbid_connections.load(SeqCst) {
3494                            Err(EstablishConnectionError::other(anyhow!(
3495                                "server is forbidding connections"
3496                            )))
3497                        } else {
3498                            let (client_conn, server_conn, kill_conn) =
3499                                Connection::in_memory(cx.background());
3500                            connection_killers.lock().insert(user_id, kill_conn);
3501                            cx.background()
3502                                .spawn(server.handle_connection(
3503                                    server_conn,
3504                                    client_name,
3505                                    user_id,
3506                                    Some(connection_id_tx),
3507                                ))
3508                                .detach();
3509                            Ok(client_conn)
3510                        }
3511                    })
3512                });
3513
3514            client
3515                .authenticate_and_connect(&cx.to_async())
3516                .await
3517                .unwrap();
3518
3519            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3520            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3521            let mut authed_user =
3522                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3523            while authed_user.next().await.unwrap().is_none() {}
3524
3525            TestClient {
3526                client,
3527                peer_id,
3528                user_store,
3529            }
3530        }
3531
3532        fn disconnect_client(&self, user_id: UserId) {
3533            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3534                let _ = kill_conn.try_send(Some(()));
3535            }
3536        }
3537
3538        fn forbid_connections(&self) {
3539            self.forbid_connections.store(true, SeqCst);
3540        }
3541
3542        fn allow_connections(&self) {
3543            self.forbid_connections.store(false, SeqCst);
3544        }
3545
3546        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3547            let mut config = Config::default();
3548            config.session_secret = "a".repeat(32);
3549            config.database_url = test_db.url.clone();
3550            let github_client = github::AppClient::test();
3551            Arc::new(AppState {
3552                db: test_db.db().clone(),
3553                handlebars: Default::default(),
3554                auth_client: auth::build_client("", ""),
3555                repo_client: github::RepoClient::test(&github_client),
3556                github_client,
3557                config,
3558            })
3559        }
3560
3561        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3562            self.server.store.read()
3563        }
3564
3565        async fn condition<F>(&mut self, mut predicate: F)
3566        where
3567            F: FnMut(&Store) -> bool,
3568        {
3569            async_std::future::timeout(Duration::from_millis(500), async {
3570                while !(predicate)(&*self.server.store.read()) {
3571                    self.foreground.start_waiting();
3572                    self.notifications.next().await;
3573                    self.foreground.finish_waiting();
3574                }
3575            })
3576            .await
3577            .expect("condition timed out");
3578        }
3579    }
3580
    /// Resets the RPC peer when the test server goes away.
    impl Drop for TestServer {
        fn drop(&mut self) {
            // NOTE(review): presumably this tears down any connections still registered with
            // the peer so they do not outlive the test — confirm against `Peer::reset`.
            self.peer.reset();
        }
    }
3586
    /// A client connected to a `TestServer`, bundled with the handles tests need most often.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        /// User store created on top of `client`.
        pub user_store: ModelHandle<UserStore>,
    }
3592
3593    impl Deref for TestClient {
3594        type Target = Arc<Client>;
3595
3596        fn deref(&self) -> &Self::Target {
3597            &self.client
3598        }
3599    }
3600
3601    impl TestClient {
3602        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3603            UserId::from_proto(
3604                self.user_store
3605                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3606            )
3607        }
3608    }
3609
3610    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3611        channel
3612            .messages()
3613            .cursor::<()>()
3614            .map(|m| {
3615                (
3616                    m.sender.github_login.clone(),
3617                    m.body.clone(),
3618                    m.is_pending(),
3619                )
3620            })
3621            .collect()
3622    }
3623
    /// Stateless placeholder view for tests; it renders an empty element.
    struct EmptyView;
3625
    impl gpui::Entity for EmptyView {
        // The view's event type is the unit type, so events carry no payload.
        type Event = ();
    }
3629
    impl gpui::View for EmptyView {
        /// Name under which this view type identifies itself.
        fn ui_name() -> &'static str {
            "empty view"
        }

        /// Renders a boxed `gpui::elements::Empty` element.
        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
            gpui::Element::boxed(gpui::elements::Empty)
        }
    }
3639}