rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updating)
  76            .add_handler(Server::disk_based_diagnostics_updated)
  77            .add_handler(Server::get_definition)
  78            .add_handler(Server::open_buffer)
  79            .add_handler(Server::close_buffer)
  80            .add_handler(Server::update_buffer)
  81            .add_handler(Server::update_buffer_file)
  82            .add_handler(Server::buffer_reloaded)
  83            .add_handler(Server::buffer_saved)
  84            .add_handler(Server::save_buffer)
  85            .add_handler(Server::format_buffer)
  86            .add_handler(Server::get_completions)
  87            .add_handler(Server::get_channels)
  88            .add_handler(Server::get_users)
  89            .add_handler(Server::join_channel)
  90            .add_handler(Server::leave_channel)
  91            .add_handler(Server::send_channel_message)
  92            .add_handler(Server::get_channel_messages);
  93
  94        Arc::new(server)
  95    }
  96
  97    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  98    where
  99        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 100        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 101        M: EnvelopedMessage,
 102    {
 103        let prev_handler = self.handlers.insert(
 104            TypeId::of::<M>(),
 105            Box::new(move |server, envelope| {
 106                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 107                (handler)(server, *envelope).boxed()
 108            }),
 109        );
 110        if prev_handler.is_some() {
 111            panic!("registered a handler for the same message twice");
 112        }
 113        self
 114    }
 115
 116    pub fn handle_connection(
 117        self: &Arc<Self>,
 118        connection: Connection,
 119        addr: String,
 120        user_id: UserId,
 121        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 122    ) -> impl Future<Output = ()> {
 123        let mut this = self.clone();
 124        async move {
 125            let (connection_id, handle_io, mut incoming_rx) =
 126                this.peer.add_connection(connection).await;
 127
 128            if let Some(send_connection_id) = send_connection_id.as_mut() {
 129                let _ = send_connection_id.send(connection_id).await;
 130            }
 131
 132            this.state_mut().add_connection(connection_id, user_id);
 133            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 134                log::error!("error updating contacts for {:?}: {}", user_id, err);
 135            }
 136
 137            let handle_io = handle_io.fuse();
 138            futures::pin_mut!(handle_io);
 139            loop {
 140                let next_message = incoming_rx.next().fuse();
 141                futures::pin_mut!(next_message);
 142                futures::select_biased! {
 143                    message = next_message => {
 144                        if let Some(message) = message {
 145                            let start_time = Instant::now();
 146                            log::info!("RPC message received: {}", message.payload_type_name());
 147                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 148                                if let Err(err) = (handler)(this.clone(), message).await {
 149                                    log::error!("error handling message: {:?}", err);
 150                                } else {
 151                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 152                                }
 153
 154                                if let Some(mut notifications) = this.notifications.clone() {
 155                                    let _ = notifications.send(()).await;
 156                                }
 157                            } else {
 158                                log::warn!("unhandled message: {}", message.payload_type_name());
 159                            }
 160                        } else {
 161                            log::info!("rpc connection closed {:?}", addr);
 162                            break;
 163                        }
 164                    }
 165                    handle_io = handle_io => {
 166                        if let Err(err) = handle_io {
 167                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 168                        }
 169                        break;
 170                    }
 171                }
 172            }
 173
 174            if let Err(err) = this.sign_out(connection_id).await {
 175                log::error!("error signing out connection {:?} - {:?}", addr, err);
 176            }
 177        }
 178    }
 179
 180    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 181        self.peer.disconnect(connection_id);
 182        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 183
 184        for (project_id, project) in removed_connection.hosted_projects {
 185            if let Some(share) = project.share {
 186                broadcast(
 187                    connection_id,
 188                    share.guests.keys().copied().collect(),
 189                    |conn_id| {
 190                        self.peer
 191                            .send(conn_id, proto::UnshareProject { project_id })
 192                    },
 193                )
 194                .await?;
 195            }
 196        }
 197
 198        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 199            broadcast(connection_id, peer_ids, |conn_id| {
 200                self.peer.send(
 201                    conn_id,
 202                    proto::RemoveProjectCollaborator {
 203                        project_id,
 204                        peer_id: connection_id.0,
 205                    },
 206                )
 207            })
 208            .await?;
 209        }
 210
 211        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 212            .await?;
 213
 214        Ok(())
 215    }
 216
 217    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 218        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 219        Ok(())
 220    }
 221
 222    async fn register_project(
 223        mut self: Arc<Server>,
 224        request: TypedEnvelope<proto::RegisterProject>,
 225    ) -> tide::Result<()> {
 226        let project_id = {
 227            let mut state = self.state_mut();
 228            let user_id = state.user_id_for_connection(request.sender_id)?;
 229            state.register_project(request.sender_id, user_id)
 230        };
 231        self.peer
 232            .respond(
 233                request.receipt(),
 234                proto::RegisterProjectResponse { project_id },
 235            )
 236            .await?;
 237        Ok(())
 238    }
 239
 240    async fn unregister_project(
 241        mut self: Arc<Server>,
 242        request: TypedEnvelope<proto::UnregisterProject>,
 243    ) -> tide::Result<()> {
 244        let project = self
 245            .state_mut()
 246            .unregister_project(request.payload.project_id, request.sender_id)
 247            .ok_or_else(|| anyhow!("no such project"))?;
 248        self.update_contacts_for_users(project.authorized_user_ids().iter())
 249            .await?;
 250        Ok(())
 251    }
 252
 253    async fn share_project(
 254        mut self: Arc<Server>,
 255        request: TypedEnvelope<proto::ShareProject>,
 256    ) -> tide::Result<()> {
 257        self.state_mut()
 258            .share_project(request.payload.project_id, request.sender_id);
 259        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 260        Ok(())
 261    }
 262
 263    async fn unshare_project(
 264        mut self: Arc<Server>,
 265        request: TypedEnvelope<proto::UnshareProject>,
 266    ) -> tide::Result<()> {
 267        let project_id = request.payload.project_id;
 268        let project = self
 269            .state_mut()
 270            .unshare_project(project_id, request.sender_id)?;
 271
 272        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 273            self.peer
 274                .send(conn_id, proto::UnshareProject { project_id })
 275        })
 276        .await?;
 277        self.update_contacts_for_users(&project.authorized_user_ids)
 278            .await?;
 279
 280        Ok(())
 281    }
 282
 283    async fn join_project(
 284        mut self: Arc<Server>,
 285        request: TypedEnvelope<proto::JoinProject>,
 286    ) -> tide::Result<()> {
 287        let project_id = request.payload.project_id;
 288
 289        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 290        let response_data = self
 291            .state_mut()
 292            .join_project(request.sender_id, user_id, project_id)
 293            .and_then(|joined| {
 294                let share = joined.project.share()?;
 295                let peer_count = share.guests.len();
 296                let mut collaborators = Vec::with_capacity(peer_count);
 297                collaborators.push(proto::Collaborator {
 298                    peer_id: joined.project.host_connection_id.0,
 299                    replica_id: 0,
 300                    user_id: joined.project.host_user_id.to_proto(),
 301                });
 302                let worktrees = joined
 303                    .project
 304                    .worktrees
 305                    .iter()
 306                    .filter_map(|(id, worktree)| {
 307                        worktree.share.as_ref().map(|share| proto::Worktree {
 308                            id: *id,
 309                            root_name: worktree.root_name.clone(),
 310                            entries: share.entries.values().cloned().collect(),
 311                            diagnostic_summaries: share
 312                                .diagnostic_summaries
 313                                .values()
 314                                .cloned()
 315                                .collect(),
 316                            weak: worktree.weak,
 317                        })
 318                    })
 319                    .collect();
 320                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 321                    if *peer_conn_id != request.sender_id {
 322                        collaborators.push(proto::Collaborator {
 323                            peer_id: peer_conn_id.0,
 324                            replica_id: *peer_replica_id as u32,
 325                            user_id: peer_user_id.to_proto(),
 326                        });
 327                    }
 328                }
 329                let response = proto::JoinProjectResponse {
 330                    worktrees,
 331                    replica_id: joined.replica_id as u32,
 332                    collaborators,
 333                };
 334                let connection_ids = joined.project.connection_ids();
 335                let contact_user_ids = joined.project.authorized_user_ids();
 336                Ok((response, connection_ids, contact_user_ids))
 337            });
 338
 339        match response_data {
 340            Ok((response, connection_ids, contact_user_ids)) => {
 341                broadcast(request.sender_id, connection_ids, |conn_id| {
 342                    self.peer.send(
 343                        conn_id,
 344                        proto::AddProjectCollaborator {
 345                            project_id: project_id,
 346                            collaborator: Some(proto::Collaborator {
 347                                peer_id: request.sender_id.0,
 348                                replica_id: response.replica_id,
 349                                user_id: user_id.to_proto(),
 350                            }),
 351                        },
 352                    )
 353                })
 354                .await?;
 355                self.peer.respond(request.receipt(), response).await?;
 356                self.update_contacts_for_users(&contact_user_ids).await?;
 357            }
 358            Err(error) => {
 359                self.peer
 360                    .respond_with_error(
 361                        request.receipt(),
 362                        proto::Error {
 363                            message: error.to_string(),
 364                        },
 365                    )
 366                    .await?;
 367            }
 368        }
 369
 370        Ok(())
 371    }
 372
 373    async fn leave_project(
 374        mut self: Arc<Server>,
 375        request: TypedEnvelope<proto::LeaveProject>,
 376    ) -> tide::Result<()> {
 377        let sender_id = request.sender_id;
 378        let project_id = request.payload.project_id;
 379        let worktree = self.state_mut().leave_project(sender_id, project_id);
 380        if let Some(worktree) = worktree {
 381            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 382                self.peer.send(
 383                    conn_id,
 384                    proto::RemoveProjectCollaborator {
 385                        project_id,
 386                        peer_id: sender_id.0,
 387                    },
 388                )
 389            })
 390            .await?;
 391            self.update_contacts_for_users(&worktree.authorized_user_ids)
 392                .await?;
 393        }
 394        Ok(())
 395    }
 396
 397    async fn register_worktree(
 398        mut self: Arc<Server>,
 399        request: TypedEnvelope<proto::RegisterWorktree>,
 400    ) -> tide::Result<()> {
 401        let receipt = request.receipt();
 402        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 403
 404        let mut contact_user_ids = HashSet::default();
 405        contact_user_ids.insert(host_user_id);
 406        for github_login in request.payload.authorized_logins {
 407            match self.app_state.db.create_user(&github_login, false).await {
 408                Ok(contact_user_id) => {
 409                    contact_user_ids.insert(contact_user_id);
 410                }
 411                Err(err) => {
 412                    let message = err.to_string();
 413                    self.peer
 414                        .respond_with_error(receipt, proto::Error { message })
 415                        .await?;
 416                    return Ok(());
 417                }
 418            }
 419        }
 420
 421        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 422        let ok = self.state_mut().register_worktree(
 423            request.payload.project_id,
 424            request.payload.worktree_id,
 425            Worktree {
 426                authorized_user_ids: contact_user_ids.clone(),
 427                root_name: request.payload.root_name,
 428                share: None,
 429                weak: false,
 430            },
 431        );
 432
 433        if ok {
 434            self.peer.respond(receipt, proto::Ack {}).await?;
 435            self.update_contacts_for_users(&contact_user_ids).await?;
 436        } else {
 437            self.peer
 438                .respond_with_error(
 439                    receipt,
 440                    proto::Error {
 441                        message: NO_SUCH_PROJECT.to_string(),
 442                    },
 443                )
 444                .await?;
 445        }
 446
 447        Ok(())
 448    }
 449
 450    async fn unregister_worktree(
 451        mut self: Arc<Server>,
 452        request: TypedEnvelope<proto::UnregisterWorktree>,
 453    ) -> tide::Result<()> {
 454        let project_id = request.payload.project_id;
 455        let worktree_id = request.payload.worktree_id;
 456        let (worktree, guest_connection_ids) =
 457            self.state_mut()
 458                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 459
 460        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 461            self.peer.send(
 462                conn_id,
 463                proto::UnregisterWorktree {
 464                    project_id,
 465                    worktree_id,
 466                },
 467            )
 468        })
 469        .await?;
 470        self.update_contacts_for_users(&worktree.authorized_user_ids)
 471            .await?;
 472        Ok(())
 473    }
 474
 475    async fn share_worktree(
 476        mut self: Arc<Server>,
 477        mut request: TypedEnvelope<proto::ShareWorktree>,
 478    ) -> tide::Result<()> {
 479        let worktree = request
 480            .payload
 481            .worktree
 482            .as_mut()
 483            .ok_or_else(|| anyhow!("missing worktree"))?;
 484        let entries = worktree
 485            .entries
 486            .iter()
 487            .map(|entry| (entry.id, entry.clone()))
 488            .collect();
 489        let diagnostic_summaries = worktree
 490            .diagnostic_summaries
 491            .iter()
 492            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 493            .collect();
 494
 495        let shared_worktree = self.state_mut().share_worktree(
 496            request.payload.project_id,
 497            worktree.id,
 498            request.sender_id,
 499            entries,
 500            diagnostic_summaries,
 501        );
 502        if let Some(shared_worktree) = shared_worktree {
 503            broadcast(
 504                request.sender_id,
 505                shared_worktree.connection_ids,
 506                |connection_id| {
 507                    self.peer.forward_send(
 508                        request.sender_id,
 509                        connection_id,
 510                        request.payload.clone(),
 511                    )
 512                },
 513            )
 514            .await?;
 515            self.peer.respond(request.receipt(), proto::Ack {}).await?;
 516            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)
 517                .await?;
 518        } else {
 519            self.peer
 520                .respond_with_error(
 521                    request.receipt(),
 522                    proto::Error {
 523                        message: "no such worktree".to_string(),
 524                    },
 525                )
 526                .await?;
 527        }
 528        Ok(())
 529    }
 530
 531    async fn update_worktree(
 532        mut self: Arc<Server>,
 533        request: TypedEnvelope<proto::UpdateWorktree>,
 534    ) -> tide::Result<()> {
 535        let connection_ids = self
 536            .state_mut()
 537            .update_worktree(
 538                request.sender_id,
 539                request.payload.project_id,
 540                request.payload.worktree_id,
 541                &request.payload.removed_entries,
 542                &request.payload.updated_entries,
 543            )
 544            .ok_or_else(|| anyhow!("no such worktree"))?;
 545
 546        broadcast(request.sender_id, connection_ids, |connection_id| {
 547            self.peer
 548                .forward_send(request.sender_id, connection_id, request.payload.clone())
 549        })
 550        .await?;
 551
 552        Ok(())
 553    }
 554
 555    async fn update_diagnostic_summary(
 556        mut self: Arc<Server>,
 557        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 558    ) -> tide::Result<()> {
 559        let receiver_ids = request
 560            .payload
 561            .summary
 562            .clone()
 563            .and_then(|summary| {
 564                self.state_mut().update_diagnostic_summary(
 565                    request.payload.project_id,
 566                    request.payload.worktree_id,
 567                    request.sender_id,
 568                    summary,
 569                )
 570            })
 571            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 572
 573        broadcast(request.sender_id, receiver_ids, |connection_id| {
 574            self.peer
 575                .forward_send(request.sender_id, connection_id, request.payload.clone())
 576        })
 577        .await?;
 578        Ok(())
 579    }
 580
 581    async fn disk_based_diagnostics_updating(
 582        self: Arc<Server>,
 583        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 584    ) -> tide::Result<()> {
 585        let receiver_ids = self
 586            .state()
 587            .project_connection_ids(request.payload.project_id, request.sender_id)
 588            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 589        broadcast(request.sender_id, receiver_ids, |connection_id| {
 590            self.peer
 591                .forward_send(request.sender_id, connection_id, request.payload.clone())
 592        })
 593        .await?;
 594        Ok(())
 595    }
 596
 597    async fn disk_based_diagnostics_updated(
 598        self: Arc<Server>,
 599        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 600    ) -> tide::Result<()> {
 601        let receiver_ids = self
 602            .state()
 603            .project_connection_ids(request.payload.project_id, request.sender_id)
 604            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 605        broadcast(request.sender_id, receiver_ids, |connection_id| {
 606            self.peer
 607                .forward_send(request.sender_id, connection_id, request.payload.clone())
 608        })
 609        .await?;
 610        Ok(())
 611    }
 612
 613    async fn get_definition(
 614        self: Arc<Server>,
 615        request: TypedEnvelope<proto::GetDefinition>,
 616    ) -> tide::Result<()> {
 617        let receipt = request.receipt();
 618        let host_connection_id = self
 619            .state()
 620            .read_project(request.payload.project_id, request.sender_id)
 621            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 622            .host_connection_id;
 623        let response = self
 624            .peer
 625            .forward_request(request.sender_id, host_connection_id, request.payload)
 626            .await?;
 627        self.peer.respond(receipt, response).await?;
 628        Ok(())
 629    }
 630
 631    async fn open_buffer(
 632        self: Arc<Server>,
 633        request: TypedEnvelope<proto::OpenBuffer>,
 634    ) -> tide::Result<()> {
 635        let receipt = request.receipt();
 636        let host_connection_id = self
 637            .state()
 638            .read_project(request.payload.project_id, request.sender_id)
 639            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 640            .host_connection_id;
 641        let response = self
 642            .peer
 643            .forward_request(request.sender_id, host_connection_id, request.payload)
 644            .await?;
 645        self.peer.respond(receipt, response).await?;
 646        Ok(())
 647    }
 648
 649    async fn close_buffer(
 650        self: Arc<Server>,
 651        request: TypedEnvelope<proto::CloseBuffer>,
 652    ) -> tide::Result<()> {
 653        let host_connection_id = self
 654            .state()
 655            .read_project(request.payload.project_id, request.sender_id)
 656            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 657            .host_connection_id;
 658        self.peer
 659            .forward_send(request.sender_id, host_connection_id, request.payload)
 660            .await?;
 661        Ok(())
 662    }
 663
 664    async fn save_buffer(
 665        self: Arc<Server>,
 666        request: TypedEnvelope<proto::SaveBuffer>,
 667    ) -> tide::Result<()> {
 668        let host;
 669        let guests;
 670        {
 671            let state = self.state();
 672            let project = state
 673                .read_project(request.payload.project_id, request.sender_id)
 674                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 675            host = project.host_connection_id;
 676            guests = project.guest_connection_ids()
 677        }
 678
 679        let sender = request.sender_id;
 680        let receipt = request.receipt();
 681        let response = self
 682            .peer
 683            .forward_request(sender, host, request.payload.clone())
 684            .await?;
 685
 686        broadcast(host, guests, |conn_id| {
 687            let response = response.clone();
 688            let peer = &self.peer;
 689            async move {
 690                if conn_id == sender {
 691                    peer.respond(receipt, response).await
 692                } else {
 693                    peer.forward_send(host, conn_id, response).await
 694                }
 695            }
 696        })
 697        .await?;
 698
 699        Ok(())
 700    }
 701
 702    async fn format_buffer(
 703        self: Arc<Server>,
 704        request: TypedEnvelope<proto::FormatBuffer>,
 705    ) -> tide::Result<()> {
 706        let host;
 707        {
 708            let state = self.state();
 709            let project = state
 710                .read_project(request.payload.project_id, request.sender_id)
 711                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 712            host = project.host_connection_id;
 713        }
 714
 715        let sender = request.sender_id;
 716        let receipt = request.receipt();
 717        let response = self
 718            .peer
 719            .forward_request(sender, host, request.payload.clone())
 720            .await?;
 721        self.peer.respond(receipt, response).await?;
 722
 723        Ok(())
 724    }
 725
 726    async fn get_completions(
 727        self: Arc<Server>,
 728        request: TypedEnvelope<proto::GetCompletions>,
 729    ) -> tide::Result<()> {
 730        let host;
 731        {
 732            let state = self.state();
 733            let project = state
 734                .read_project(request.payload.project_id, request.sender_id)
 735                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 736            host = project.host_connection_id;
 737        }
 738
 739        let sender = request.sender_id;
 740        let receipt = request.receipt();
 741        let response = self
 742            .peer
 743            .forward_request(sender, host, request.payload.clone())
 744            .await?;
 745        self.peer.respond(receipt, response).await?;
 746
 747        Ok(())
 748    }
 749
 750    async fn update_buffer(
 751        self: Arc<Server>,
 752        request: TypedEnvelope<proto::UpdateBuffer>,
 753    ) -> tide::Result<()> {
 754        let receiver_ids = self
 755            .state()
 756            .project_connection_ids(request.payload.project_id, request.sender_id)
 757            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 758        broadcast(request.sender_id, receiver_ids, |connection_id| {
 759            self.peer
 760                .forward_send(request.sender_id, connection_id, request.payload.clone())
 761        })
 762        .await?;
 763        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 764        Ok(())
 765    }
 766
 767    async fn update_buffer_file(
 768        self: Arc<Server>,
 769        request: TypedEnvelope<proto::UpdateBufferFile>,
 770    ) -> tide::Result<()> {
 771        let receiver_ids = self
 772            .state()
 773            .project_connection_ids(request.payload.project_id, request.sender_id)
 774            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 775        broadcast(request.sender_id, receiver_ids, |connection_id| {
 776            self.peer
 777                .forward_send(request.sender_id, connection_id, request.payload.clone())
 778        })
 779        .await?;
 780        Ok(())
 781    }
 782
 783    async fn buffer_reloaded(
 784        self: Arc<Server>,
 785        request: TypedEnvelope<proto::BufferReloaded>,
 786    ) -> tide::Result<()> {
 787        let receiver_ids = self
 788            .state()
 789            .project_connection_ids(request.payload.project_id, request.sender_id)
 790            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 791        broadcast(request.sender_id, receiver_ids, |connection_id| {
 792            self.peer
 793                .forward_send(request.sender_id, connection_id, request.payload.clone())
 794        })
 795        .await?;
 796        Ok(())
 797    }
 798
 799    async fn buffer_saved(
 800        self: Arc<Server>,
 801        request: TypedEnvelope<proto::BufferSaved>,
 802    ) -> tide::Result<()> {
 803        let receiver_ids = self
 804            .state()
 805            .project_connection_ids(request.payload.project_id, request.sender_id)
 806            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 807        broadcast(request.sender_id, receiver_ids, |connection_id| {
 808            self.peer
 809                .forward_send(request.sender_id, connection_id, request.payload.clone())
 810        })
 811        .await?;
 812        Ok(())
 813    }
 814
 815    async fn get_channels(
 816        self: Arc<Server>,
 817        request: TypedEnvelope<proto::GetChannels>,
 818    ) -> tide::Result<()> {
 819        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 820        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 821        self.peer
 822            .respond(
 823                request.receipt(),
 824                proto::GetChannelsResponse {
 825                    channels: channels
 826                        .into_iter()
 827                        .map(|chan| proto::Channel {
 828                            id: chan.id.to_proto(),
 829                            name: chan.name,
 830                        })
 831                        .collect(),
 832                },
 833            )
 834            .await?;
 835        Ok(())
 836    }
 837
 838    async fn get_users(
 839        self: Arc<Server>,
 840        request: TypedEnvelope<proto::GetUsers>,
 841    ) -> tide::Result<()> {
 842        let receipt = request.receipt();
 843        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 844        let users = self
 845            .app_state
 846            .db
 847            .get_users_by_ids(user_ids)
 848            .await?
 849            .into_iter()
 850            .map(|user| proto::User {
 851                id: user.id.to_proto(),
 852                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 853                github_login: user.github_login,
 854            })
 855            .collect();
 856        self.peer
 857            .respond(receipt, proto::GetUsersResponse { users })
 858            .await?;
 859        Ok(())
 860    }
 861
 862    async fn update_contacts_for_users<'a>(
 863        self: &Arc<Server>,
 864        user_ids: impl IntoIterator<Item = &'a UserId>,
 865    ) -> tide::Result<()> {
 866        let mut send_futures = Vec::new();
 867
 868        {
 869            let state = self.state();
 870            for user_id in user_ids {
 871                let contacts = state.contacts_for_user(*user_id);
 872                for connection_id in state.connection_ids_for_user(*user_id) {
 873                    send_futures.push(self.peer.send(
 874                        connection_id,
 875                        proto::UpdateContacts {
 876                            contacts: contacts.clone(),
 877                        },
 878                    ));
 879                }
 880            }
 881        }
 882        futures::future::try_join_all(send_futures).await?;
 883
 884        Ok(())
 885    }
 886
 887    async fn join_channel(
 888        mut self: Arc<Self>,
 889        request: TypedEnvelope<proto::JoinChannel>,
 890    ) -> tide::Result<()> {
 891        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 892        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 893        if !self
 894            .app_state
 895            .db
 896            .can_user_access_channel(user_id, channel_id)
 897            .await?
 898        {
 899            Err(anyhow!("access denied"))?;
 900        }
 901
 902        self.state_mut().join_channel(request.sender_id, channel_id);
 903        let messages = self
 904            .app_state
 905            .db
 906            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 907            .await?
 908            .into_iter()
 909            .map(|msg| proto::ChannelMessage {
 910                id: msg.id.to_proto(),
 911                body: msg.body,
 912                timestamp: msg.sent_at.unix_timestamp() as u64,
 913                sender_id: msg.sender_id.to_proto(),
 914                nonce: Some(msg.nonce.as_u128().into()),
 915            })
 916            .collect::<Vec<_>>();
 917        self.peer
 918            .respond(
 919                request.receipt(),
 920                proto::JoinChannelResponse {
 921                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 922                    messages,
 923                },
 924            )
 925            .await?;
 926        Ok(())
 927    }
 928
 929    async fn leave_channel(
 930        mut self: Arc<Self>,
 931        request: TypedEnvelope<proto::LeaveChannel>,
 932    ) -> tide::Result<()> {
 933        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 934        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 935        if !self
 936            .app_state
 937            .db
 938            .can_user_access_channel(user_id, channel_id)
 939            .await?
 940        {
 941            Err(anyhow!("access denied"))?;
 942        }
 943
 944        self.state_mut()
 945            .leave_channel(request.sender_id, channel_id);
 946
 947        Ok(())
 948    }
 949
 950    async fn send_channel_message(
 951        self: Arc<Self>,
 952        request: TypedEnvelope<proto::SendChannelMessage>,
 953    ) -> tide::Result<()> {
 954        let receipt = request.receipt();
 955        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 956        let user_id;
 957        let connection_ids;
 958        {
 959            let state = self.state();
 960            user_id = state.user_id_for_connection(request.sender_id)?;
 961            if let Some(ids) = state.channel_connection_ids(channel_id) {
 962                connection_ids = ids;
 963            } else {
 964                return Ok(());
 965            }
 966        }
 967
 968        // Validate the message body.
 969        let body = request.payload.body.trim().to_string();
 970        if body.len() > MAX_MESSAGE_LEN {
 971            self.peer
 972                .respond_with_error(
 973                    receipt,
 974                    proto::Error {
 975                        message: "message is too long".to_string(),
 976                    },
 977                )
 978                .await?;
 979            return Ok(());
 980        }
 981        if body.is_empty() {
 982            self.peer
 983                .respond_with_error(
 984                    receipt,
 985                    proto::Error {
 986                        message: "message can't be blank".to_string(),
 987                    },
 988                )
 989                .await?;
 990            return Ok(());
 991        }
 992
 993        let timestamp = OffsetDateTime::now_utc();
 994        let nonce = if let Some(nonce) = request.payload.nonce {
 995            nonce
 996        } else {
 997            self.peer
 998                .respond_with_error(
 999                    receipt,
1000                    proto::Error {
1001                        message: "nonce can't be blank".to_string(),
1002                    },
1003                )
1004                .await?;
1005            return Ok(());
1006        };
1007
1008        let message_id = self
1009            .app_state
1010            .db
1011            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1012            .await?
1013            .to_proto();
1014        let message = proto::ChannelMessage {
1015            sender_id: user_id.to_proto(),
1016            id: message_id,
1017            body,
1018            timestamp: timestamp.unix_timestamp() as u64,
1019            nonce: Some(nonce),
1020        };
1021        broadcast(request.sender_id, connection_ids, |conn_id| {
1022            self.peer.send(
1023                conn_id,
1024                proto::ChannelMessageSent {
1025                    channel_id: channel_id.to_proto(),
1026                    message: Some(message.clone()),
1027                },
1028            )
1029        })
1030        .await?;
1031        self.peer
1032            .respond(
1033                receipt,
1034                proto::SendChannelMessageResponse {
1035                    message: Some(message),
1036                },
1037            )
1038            .await?;
1039        Ok(())
1040    }
1041
1042    async fn get_channel_messages(
1043        self: Arc<Self>,
1044        request: TypedEnvelope<proto::GetChannelMessages>,
1045    ) -> tide::Result<()> {
1046        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1047        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1048        if !self
1049            .app_state
1050            .db
1051            .can_user_access_channel(user_id, channel_id)
1052            .await?
1053        {
1054            Err(anyhow!("access denied"))?;
1055        }
1056
1057        let messages = self
1058            .app_state
1059            .db
1060            .get_channel_messages(
1061                channel_id,
1062                MESSAGE_COUNT_PER_PAGE,
1063                Some(MessageId::from_proto(request.payload.before_message_id)),
1064            )
1065            .await?
1066            .into_iter()
1067            .map(|msg| proto::ChannelMessage {
1068                id: msg.id.to_proto(),
1069                body: msg.body,
1070                timestamp: msg.sent_at.unix_timestamp() as u64,
1071                sender_id: msg.sender_id.to_proto(),
1072                nonce: Some(msg.nonce.as_u128().into()),
1073            })
1074            .collect::<Vec<_>>();
1075        self.peer
1076            .respond(
1077                request.receipt(),
1078                proto::GetChannelMessagesResponse {
1079                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1080                    messages,
1081                },
1082            )
1083            .await?;
1084        Ok(())
1085    }
1086
1087    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1088        self.store.read()
1089    }
1090
1091    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1092        self.store.write()
1093    }
1094}
1095
1096pub async fn broadcast<F, T>(
1097    sender_id: ConnectionId,
1098    receiver_ids: Vec<ConnectionId>,
1099    mut f: F,
1100) -> anyhow::Result<()>
1101where
1102    F: FnMut(ConnectionId) -> T,
1103    T: Future<Output = anyhow::Result<()>>,
1104{
1105    let futures = receiver_ids
1106        .into_iter()
1107        .filter(|id| *id != sender_id)
1108        .map(|id| f(id));
1109    futures::future::try_join_all(futures).await?;
1110    Ok(())
1111}
1112
1113pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1114    let server = Server::new(app.state().clone(), rpc.clone(), None);
1115    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1116        let server = server.clone();
1117        async move {
1118            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1119
1120            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1121            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1122            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1123            let client_protocol_version: Option<u32> = request
1124                .header("X-Zed-Protocol-Version")
1125                .and_then(|v| v.as_str().parse().ok());
1126
1127            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1128                return Ok(Response::new(StatusCode::UpgradeRequired));
1129            }
1130
1131            let header = match request.header("Sec-Websocket-Key") {
1132                Some(h) => h.as_str(),
1133                None => return Err(anyhow!("expected sec-websocket-key"))?,
1134            };
1135
1136            let user_id = process_auth_header(&request).await?;
1137
1138            let mut response = Response::new(StatusCode::SwitchingProtocols);
1139            response.insert_header(UPGRADE, "websocket");
1140            response.insert_header(CONNECTION, "Upgrade");
1141            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1142            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1143            response.insert_header("Sec-Websocket-Version", "13");
1144
1145            let http_res: &mut tide::http::Response = response.as_mut();
1146            let upgrade_receiver = http_res.recv_upgrade().await;
1147            let addr = request.remote().unwrap_or("unknown").to_string();
1148            task::spawn(async move {
1149                if let Some(stream) = upgrade_receiver.await {
1150                    server
1151                        .handle_connection(
1152                            Connection::new(
1153                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1154                            ),
1155                            addr,
1156                            user_id,
1157                            None,
1158                        )
1159                        .await;
1160                }
1161            });
1162
1163            Ok(response)
1164        }
1165    });
1166}
1167
1168fn header_contains_ignore_case<T>(
1169    request: &tide::Request<T>,
1170    header_name: HeaderName,
1171    value: &str,
1172) -> bool {
1173    request
1174        .header(header_name)
1175        .map(|h| {
1176            h.as_str()
1177                .split(',')
1178                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1179        })
1180        .unwrap_or(false)
1181}
1182
1183#[cfg(test)]
1184mod tests {
1185    use super::*;
1186    use crate::{
1187        auth,
1188        db::{tests::TestDb, UserId},
1189        github, AppState, Config,
1190    };
1191    use ::rpc::Peer;
1192    use async_std::task;
1193    use gpui::{executor, ModelHandle, TestAppContext};
1194    use parking_lot::Mutex;
1195    use postage::{mpsc, watch};
1196    use rand::prelude::*;
1197    use rpc::PeerId;
1198    use serde_json::json;
1199    use sqlx::types::time::OffsetDateTime;
1200    use std::{
1201        ops::Deref,
1202        path::Path,
1203        rc::Rc,
1204        sync::{
1205            atomic::{AtomicBool, Ordering::SeqCst},
1206            Arc,
1207        },
1208        time::Duration,
1209    };
1210    use zed::{
1211        client::{
1212            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1213            EstablishConnectionError, UserStore,
1214        },
1215        editor::{Editor, EditorSettings, Input, MultiBuffer},
1216        fs::{FakeFs, Fs as _},
1217        language::{
1218            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1219            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1220        },
1221        lsp,
1222        project::{DiagnosticSummary, Project, ProjectPath},
1223    };
1224
1225    #[gpui::test]
1226    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1227        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1228        let lang_registry = Arc::new(LanguageRegistry::new());
1229        let fs = Arc::new(FakeFs::new(cx_a.background()));
1230        cx_a.foreground().forbid_parking();
1231
1232        // Connect to a server as 2 clients.
1233        let mut server = TestServer::start(cx_a.foreground()).await;
1234        let client_a = server.create_client(&mut cx_a, "user_a").await;
1235        let client_b = server.create_client(&mut cx_b, "user_b").await;
1236
1237        // Share a project as client A
1238        fs.insert_tree(
1239            "/a",
1240            json!({
1241                ".zed.toml": r#"collaborators = ["user_b"]"#,
1242                "a.txt": "a-contents",
1243                "b.txt": "b-contents",
1244            }),
1245        )
1246        .await;
1247        let project_a = cx_a.update(|cx| {
1248            Project::local(
1249                client_a.clone(),
1250                client_a.user_store.clone(),
1251                lang_registry.clone(),
1252                fs.clone(),
1253                cx,
1254            )
1255        });
1256        let (worktree_a, _) = project_a
1257            .update(&mut cx_a, |p, cx| {
1258                p.find_or_create_local_worktree("/a", false, cx)
1259            })
1260            .await
1261            .unwrap();
1262        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1263        worktree_a
1264            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1265            .await;
1266        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1267        project_a
1268            .update(&mut cx_a, |p, cx| p.share(cx))
1269            .await
1270            .unwrap();
1271
1272        // Join that project as client B
1273        let project_b = Project::remote(
1274            project_id,
1275            client_b.clone(),
1276            client_b.user_store.clone(),
1277            lang_registry.clone(),
1278            fs.clone(),
1279            &mut cx_b.to_async(),
1280        )
1281        .await
1282        .unwrap();
1283
1284        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1285            assert_eq!(
1286                project
1287                    .collaborators()
1288                    .get(&client_a.peer_id)
1289                    .unwrap()
1290                    .user
1291                    .github_login,
1292                "user_a"
1293            );
1294            project.replica_id()
1295        });
1296        project_a
1297            .condition(&cx_a, |tree, _| {
1298                tree.collaborators()
1299                    .get(&client_b.peer_id)
1300                    .map_or(false, |collaborator| {
1301                        collaborator.replica_id == replica_id_b
1302                            && collaborator.user.github_login == "user_b"
1303                    })
1304            })
1305            .await;
1306
1307        // Open the same file as client B and client A.
1308        let buffer_b = project_b
1309            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1310            .await
1311            .unwrap();
1312        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1313        buffer_b.read_with(&cx_b, |buf, cx| {
1314            assert_eq!(buf.read(cx).text(), "b-contents")
1315        });
1316        project_a.read_with(&cx_a, |project, cx| {
1317            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1318        });
1319        let buffer_a = project_a
1320            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1321            .await
1322            .unwrap();
1323
1324        let editor_b = cx_b.add_view(window_b, |cx| {
1325            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1326        });
1327
1328        // TODO
1329        // // Create a selection set as client B and see that selection set as client A.
1330        // buffer_a
1331        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1332        //     .await;
1333
1334        // Edit the buffer as client B and see that edit as client A.
1335        editor_b.update(&mut cx_b, |editor, cx| {
1336            editor.handle_input(&Input("ok, ".into()), cx)
1337        });
1338        buffer_a
1339            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1340            .await;
1341
1342        // TODO
1343        // // Remove the selection set as client B, see those selections disappear as client A.
1344        cx_b.update(move |_| drop(editor_b));
1345        // buffer_a
1346        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1347        //     .await;
1348
1349        // Close the buffer as client A, see that the buffer is closed.
1350        cx_a.update(move |_| drop(buffer_a));
1351        project_a
1352            .condition(&cx_a, |project, cx| {
1353                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1354            })
1355            .await;
1356
1357        // Dropping the client B's project removes client B from client A's collaborators.
1358        cx_b.update(move |_| drop(project_b));
1359        project_a
1360            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1361            .await;
1362    }
1363
1364    #[gpui::test]
1365    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1366        let lang_registry = Arc::new(LanguageRegistry::new());
1367        let fs = Arc::new(FakeFs::new(cx_a.background()));
1368        cx_a.foreground().forbid_parking();
1369
1370        // Connect to a server as 2 clients.
1371        let mut server = TestServer::start(cx_a.foreground()).await;
1372        let client_a = server.create_client(&mut cx_a, "user_a").await;
1373        let client_b = server.create_client(&mut cx_b, "user_b").await;
1374
1375        // Share a project as client A
1376        fs.insert_tree(
1377            "/a",
1378            json!({
1379                ".zed.toml": r#"collaborators = ["user_b"]"#,
1380                "a.txt": "a-contents",
1381                "b.txt": "b-contents",
1382            }),
1383        )
1384        .await;
1385        let project_a = cx_a.update(|cx| {
1386            Project::local(
1387                client_a.clone(),
1388                client_a.user_store.clone(),
1389                lang_registry.clone(),
1390                fs.clone(),
1391                cx,
1392            )
1393        });
1394        let (worktree_a, _) = project_a
1395            .update(&mut cx_a, |p, cx| {
1396                p.find_or_create_local_worktree("/a", false, cx)
1397            })
1398            .await
1399            .unwrap();
1400        worktree_a
1401            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1402            .await;
1403        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1404        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1405        project_a
1406            .update(&mut cx_a, |p, cx| p.share(cx))
1407            .await
1408            .unwrap();
1409        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1410
1411        // Join that project as client B
1412        let project_b = Project::remote(
1413            project_id,
1414            client_b.clone(),
1415            client_b.user_store.clone(),
1416            lang_registry.clone(),
1417            fs.clone(),
1418            &mut cx_b.to_async(),
1419        )
1420        .await
1421        .unwrap();
1422        project_b
1423            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1424            .await
1425            .unwrap();
1426
1427        // Unshare the project as client A
1428        project_a
1429            .update(&mut cx_a, |project, cx| project.unshare(cx))
1430            .await
1431            .unwrap();
1432        project_b
1433            .condition(&mut cx_b, |project, _| project.is_read_only())
1434            .await;
1435        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1436        drop(project_b);
1437
1438        // Share the project again and ensure guests can still join.
1439        project_a
1440            .update(&mut cx_a, |project, cx| project.share(cx))
1441            .await
1442            .unwrap();
1443        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1444
1445        let project_c = Project::remote(
1446            project_id,
1447            client_b.clone(),
1448            client_b.user_store.clone(),
1449            lang_registry.clone(),
1450            fs.clone(),
1451            &mut cx_b.to_async(),
1452        )
1453        .await
1454        .unwrap();
1455        project_c
1456            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1457            .await
1458            .unwrap();
1459    }
1460
1461    #[gpui::test]
1462    async fn test_propagate_saves_and_fs_changes(
1463        mut cx_a: TestAppContext,
1464        mut cx_b: TestAppContext,
1465        mut cx_c: TestAppContext,
1466    ) {
1467        let lang_registry = Arc::new(LanguageRegistry::new());
1468        let fs = Arc::new(FakeFs::new(cx_a.background()));
1469        cx_a.foreground().forbid_parking();
1470
1471        // Connect to a server as 3 clients.
1472        let mut server = TestServer::start(cx_a.foreground()).await;
1473        let client_a = server.create_client(&mut cx_a, "user_a").await;
1474        let client_b = server.create_client(&mut cx_b, "user_b").await;
1475        let client_c = server.create_client(&mut cx_c, "user_c").await;
1476
1477        // Share a worktree as client A.
1478        fs.insert_tree(
1479            "/a",
1480            json!({
1481                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1482                "file1": "",
1483                "file2": ""
1484            }),
1485        )
1486        .await;
1487        let project_a = cx_a.update(|cx| {
1488            Project::local(
1489                client_a.clone(),
1490                client_a.user_store.clone(),
1491                lang_registry.clone(),
1492                fs.clone(),
1493                cx,
1494            )
1495        });
1496        let (worktree_a, _) = project_a
1497            .update(&mut cx_a, |p, cx| {
1498                p.find_or_create_local_worktree("/a", false, cx)
1499            })
1500            .await
1501            .unwrap();
1502        worktree_a
1503            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1504            .await;
1505        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1506        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1507        project_a
1508            .update(&mut cx_a, |p, cx| p.share(cx))
1509            .await
1510            .unwrap();
1511
1512        // Join that worktree as clients B and C.
1513        let project_b = Project::remote(
1514            project_id,
1515            client_b.clone(),
1516            client_b.user_store.clone(),
1517            lang_registry.clone(),
1518            fs.clone(),
1519            &mut cx_b.to_async(),
1520        )
1521        .await
1522        .unwrap();
1523        let project_c = Project::remote(
1524            project_id,
1525            client_c.clone(),
1526            client_c.user_store.clone(),
1527            lang_registry.clone(),
1528            fs.clone(),
1529            &mut cx_c.to_async(),
1530        )
1531        .await
1532        .unwrap();
1533        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1534        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1535
1536        // Open and edit a buffer as both guests B and C.
1537        let buffer_b = project_b
1538            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1539            .await
1540            .unwrap();
1541        let buffer_c = project_c
1542            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1543            .await
1544            .unwrap();
1545        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1546        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1547
1548        // Open and edit that buffer as the host.
1549        let buffer_a = project_a
1550            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1551            .await
1552            .unwrap();
1553
1554        buffer_a
1555            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1556            .await;
1557        buffer_a.update(&mut cx_a, |buf, cx| {
1558            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1559        });
1560
1561        // Wait for edits to propagate
1562        buffer_a
1563            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1564            .await;
1565        buffer_b
1566            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1567            .await;
1568        buffer_c
1569            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1570            .await;
1571
1572        // Edit the buffer as the host and concurrently save as guest B.
1573        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1574        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1575        save_b.await.unwrap();
1576        assert_eq!(
1577            fs.load("/a/file1".as_ref()).await.unwrap(),
1578            "hi-a, i-am-c, i-am-b, i-am-a"
1579        );
1580        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1581        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1582        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1583
1584        // Make changes on host's file system, see those changes on guest worktrees.
1585        fs.rename("/a/file1".as_ref(), "/a/file1-renamed".as_ref())
1586            .await
1587            .unwrap();
1588        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1589            .await
1590            .unwrap();
1591        fs.insert_file(Path::new("/a/file4"), "4".into())
1592            .await
1593            .unwrap();
1594
1595        worktree_a
1596            .condition(&cx_a, |tree, _| tree.file_count() == 4)
1597            .await;
1598        worktree_b
1599            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1600            .await;
1601        worktree_c
1602            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1603            .await;
1604        worktree_a.read_with(&cx_a, |tree, _| {
1605            assert_eq!(
1606                tree.paths()
1607                    .map(|p| p.to_string_lossy())
1608                    .collect::<Vec<_>>(),
1609                &[".zed.toml", "file1-renamed", "file3", "file4"]
1610            )
1611        });
1612        worktree_b.read_with(&cx_b, |tree, _| {
1613            assert_eq!(
1614                tree.paths()
1615                    .map(|p| p.to_string_lossy())
1616                    .collect::<Vec<_>>(),
1617                &[".zed.toml", "file1-renamed", "file3", "file4"]
1618            )
1619        });
1620        worktree_c.read_with(&cx_c, |tree, _| {
1621            assert_eq!(
1622                tree.paths()
1623                    .map(|p| p.to_string_lossy())
1624                    .collect::<Vec<_>>(),
1625                &[".zed.toml", "file1-renamed", "file3", "file4"]
1626            )
1627        });
1628
1629        // Ensure buffer files are updated as well.
1630        buffer_a
1631            .condition(&cx_a, |buf, _| {
1632                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1633            })
1634            .await;
1635        buffer_b
1636            .condition(&cx_b, |buf, _| {
1637                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1638            })
1639            .await;
1640        buffer_c
1641            .condition(&cx_c, |buf, _| {
1642                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1643            })
1644            .await;
1645    }
1646
1647    #[gpui::test]
1648    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1649        cx_a.foreground().forbid_parking();
1650        let lang_registry = Arc::new(LanguageRegistry::new());
1651        let fs = Arc::new(FakeFs::new(cx_a.background()));
1652
1653        // Connect to a server as 2 clients.
1654        let mut server = TestServer::start(cx_a.foreground()).await;
1655        let client_a = server.create_client(&mut cx_a, "user_a").await;
1656        let client_b = server.create_client(&mut cx_b, "user_b").await;
1657
1658        // Share a project as client A
1659        fs.insert_tree(
1660            "/dir",
1661            json!({
1662                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1663                "a.txt": "a-contents",
1664            }),
1665        )
1666        .await;
1667
1668        let project_a = cx_a.update(|cx| {
1669            Project::local(
1670                client_a.clone(),
1671                client_a.user_store.clone(),
1672                lang_registry.clone(),
1673                fs.clone(),
1674                cx,
1675            )
1676        });
1677        let (worktree_a, _) = project_a
1678            .update(&mut cx_a, |p, cx| {
1679                p.find_or_create_local_worktree("/dir", false, cx)
1680            })
1681            .await
1682            .unwrap();
1683        worktree_a
1684            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1685            .await;
1686        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1687        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1688        project_a
1689            .update(&mut cx_a, |p, cx| p.share(cx))
1690            .await
1691            .unwrap();
1692
1693        // Join that project as client B
1694        let project_b = Project::remote(
1695            project_id,
1696            client_b.clone(),
1697            client_b.user_store.clone(),
1698            lang_registry.clone(),
1699            fs.clone(),
1700            &mut cx_b.to_async(),
1701        )
1702        .await
1703        .unwrap();
1704        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1705
1706        // Open a buffer as client B
1707        let buffer_b = project_b
1708            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1709            .await
1710            .unwrap();
1711        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1712
1713        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1714        buffer_b.read_with(&cx_b, |buf, _| {
1715            assert!(buf.is_dirty());
1716            assert!(!buf.has_conflict());
1717        });
1718
1719        buffer_b
1720            .update(&mut cx_b, |buf, cx| buf.save(cx))
1721            .await
1722            .unwrap();
1723        worktree_b
1724            .condition(&cx_b, |_, cx| {
1725                buffer_b.read(cx).file().unwrap().mtime() != mtime
1726            })
1727            .await;
1728        buffer_b.read_with(&cx_b, |buf, _| {
1729            assert!(!buf.is_dirty());
1730            assert!(!buf.has_conflict());
1731        });
1732
1733        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1734        buffer_b.read_with(&cx_b, |buf, _| {
1735            assert!(buf.is_dirty());
1736            assert!(!buf.has_conflict());
1737        });
1738    }
1739
1740    #[gpui::test]
1741    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1742        cx_a.foreground().forbid_parking();
1743        let lang_registry = Arc::new(LanguageRegistry::new());
1744        let fs = Arc::new(FakeFs::new(cx_a.background()));
1745
1746        // Connect to a server as 2 clients.
1747        let mut server = TestServer::start(cx_a.foreground()).await;
1748        let client_a = server.create_client(&mut cx_a, "user_a").await;
1749        let client_b = server.create_client(&mut cx_b, "user_b").await;
1750
1751        // Share a project as client A
1752        fs.insert_tree(
1753            "/dir",
1754            json!({
1755                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1756                "a.txt": "a-contents",
1757            }),
1758        )
1759        .await;
1760
1761        let project_a = cx_a.update(|cx| {
1762            Project::local(
1763                client_a.clone(),
1764                client_a.user_store.clone(),
1765                lang_registry.clone(),
1766                fs.clone(),
1767                cx,
1768            )
1769        });
1770        let (worktree_a, _) = project_a
1771            .update(&mut cx_a, |p, cx| {
1772                p.find_or_create_local_worktree("/dir", false, cx)
1773            })
1774            .await
1775            .unwrap();
1776        worktree_a
1777            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1778            .await;
1779        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1780        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1781        project_a
1782            .update(&mut cx_a, |p, cx| p.share(cx))
1783            .await
1784            .unwrap();
1785
1786        // Join that project as client B
1787        let project_b = Project::remote(
1788            project_id,
1789            client_b.clone(),
1790            client_b.user_store.clone(),
1791            lang_registry.clone(),
1792            fs.clone(),
1793            &mut cx_b.to_async(),
1794        )
1795        .await
1796        .unwrap();
1797        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1798
1799        // Open a buffer as client B
1800        let buffer_b = project_b
1801            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1802            .await
1803            .unwrap();
1804        buffer_b.read_with(&cx_b, |buf, _| {
1805            assert!(!buf.is_dirty());
1806            assert!(!buf.has_conflict());
1807        });
1808
1809        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1810            .await
1811            .unwrap();
1812        buffer_b
1813            .condition(&cx_b, |buf, _| {
1814                buf.text() == "new contents" && !buf.is_dirty()
1815            })
1816            .await;
1817        buffer_b.read_with(&cx_b, |buf, _| {
1818            assert!(!buf.has_conflict());
1819        });
1820    }
1821
1822    #[gpui::test(iterations = 100)]
1823    async fn test_editing_while_guest_opens_buffer(
1824        mut cx_a: TestAppContext,
1825        mut cx_b: TestAppContext,
1826    ) {
1827        cx_a.foreground().forbid_parking();
1828        let lang_registry = Arc::new(LanguageRegistry::new());
1829        let fs = Arc::new(FakeFs::new(cx_a.background()));
1830
1831        // Connect to a server as 2 clients.
1832        let mut server = TestServer::start(cx_a.foreground()).await;
1833        let client_a = server.create_client(&mut cx_a, "user_a").await;
1834        let client_b = server.create_client(&mut cx_b, "user_b").await;
1835
1836        // Share a project as client A
1837        fs.insert_tree(
1838            "/dir",
1839            json!({
1840                ".zed.toml": r#"collaborators = ["user_b"]"#,
1841                "a.txt": "a-contents",
1842            }),
1843        )
1844        .await;
1845        let project_a = cx_a.update(|cx| {
1846            Project::local(
1847                client_a.clone(),
1848                client_a.user_store.clone(),
1849                lang_registry.clone(),
1850                fs.clone(),
1851                cx,
1852            )
1853        });
1854        let (worktree_a, _) = project_a
1855            .update(&mut cx_a, |p, cx| {
1856                p.find_or_create_local_worktree("/dir", false, cx)
1857            })
1858            .await
1859            .unwrap();
1860        worktree_a
1861            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1862            .await;
1863        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1864        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1865        project_a
1866            .update(&mut cx_a, |p, cx| p.share(cx))
1867            .await
1868            .unwrap();
1869
1870        // Join that project as client B
1871        let project_b = Project::remote(
1872            project_id,
1873            client_b.clone(),
1874            client_b.user_store.clone(),
1875            lang_registry.clone(),
1876            fs.clone(),
1877            &mut cx_b.to_async(),
1878        )
1879        .await
1880        .unwrap();
1881
1882        // Open a buffer as client A
1883        let buffer_a = project_a
1884            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1885            .await
1886            .unwrap();
1887
1888        // Start opening the same buffer as client B
1889        let buffer_b = cx_b
1890            .background()
1891            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1892        task::yield_now().await;
1893
1894        // Edit the buffer as client A while client B is still opening it.
1895        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1896
1897        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1898        let buffer_b = buffer_b.await.unwrap();
1899        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1900    }
1901
1902    #[gpui::test]
1903    async fn test_leaving_worktree_while_opening_buffer(
1904        mut cx_a: TestAppContext,
1905        mut cx_b: TestAppContext,
1906    ) {
1907        cx_a.foreground().forbid_parking();
1908        let lang_registry = Arc::new(LanguageRegistry::new());
1909        let fs = Arc::new(FakeFs::new(cx_a.background()));
1910
1911        // Connect to a server as 2 clients.
1912        let mut server = TestServer::start(cx_a.foreground()).await;
1913        let client_a = server.create_client(&mut cx_a, "user_a").await;
1914        let client_b = server.create_client(&mut cx_b, "user_b").await;
1915
1916        // Share a project as client A
1917        fs.insert_tree(
1918            "/dir",
1919            json!({
1920                ".zed.toml": r#"collaborators = ["user_b"]"#,
1921                "a.txt": "a-contents",
1922            }),
1923        )
1924        .await;
1925        let project_a = cx_a.update(|cx| {
1926            Project::local(
1927                client_a.clone(),
1928                client_a.user_store.clone(),
1929                lang_registry.clone(),
1930                fs.clone(),
1931                cx,
1932            )
1933        });
1934        let (worktree_a, _) = project_a
1935            .update(&mut cx_a, |p, cx| {
1936                p.find_or_create_local_worktree("/dir", false, cx)
1937            })
1938            .await
1939            .unwrap();
1940        worktree_a
1941            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1942            .await;
1943        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1944        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1945        project_a
1946            .update(&mut cx_a, |p, cx| p.share(cx))
1947            .await
1948            .unwrap();
1949
1950        // Join that project as client B
1951        let project_b = Project::remote(
1952            project_id,
1953            client_b.clone(),
1954            client_b.user_store.clone(),
1955            lang_registry.clone(),
1956            fs.clone(),
1957            &mut cx_b.to_async(),
1958        )
1959        .await
1960        .unwrap();
1961
1962        // See that a guest has joined as client A.
1963        project_a
1964            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1965            .await;
1966
1967        // Begin opening a buffer as client B, but leave the project before the open completes.
1968        let buffer_b = cx_b
1969            .background()
1970            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1971        cx_b.update(|_| drop(project_b));
1972        drop(buffer_b);
1973
1974        // See that the guest has left.
1975        project_a
1976            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1977            .await;
1978    }
1979
1980    #[gpui::test]
1981    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1982        cx_a.foreground().forbid_parking();
1983        let lang_registry = Arc::new(LanguageRegistry::new());
1984        let fs = Arc::new(FakeFs::new(cx_a.background()));
1985
1986        // Connect to a server as 2 clients.
1987        let mut server = TestServer::start(cx_a.foreground()).await;
1988        let client_a = server.create_client(&mut cx_a, "user_a").await;
1989        let client_b = server.create_client(&mut cx_b, "user_b").await;
1990
1991        // Share a project as client A
1992        fs.insert_tree(
1993            "/a",
1994            json!({
1995                ".zed.toml": r#"collaborators = ["user_b"]"#,
1996                "a.txt": "a-contents",
1997                "b.txt": "b-contents",
1998            }),
1999        )
2000        .await;
2001        let project_a = cx_a.update(|cx| {
2002            Project::local(
2003                client_a.clone(),
2004                client_a.user_store.clone(),
2005                lang_registry.clone(),
2006                fs.clone(),
2007                cx,
2008            )
2009        });
2010        let (worktree_a, _) = project_a
2011            .update(&mut cx_a, |p, cx| {
2012                p.find_or_create_local_worktree("/a", false, cx)
2013            })
2014            .await
2015            .unwrap();
2016        worktree_a
2017            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2018            .await;
2019        let project_id = project_a
2020            .update(&mut cx_a, |project, _| project.next_remote_id())
2021            .await;
2022        project_a
2023            .update(&mut cx_a, |project, cx| project.share(cx))
2024            .await
2025            .unwrap();
2026
2027        // Join that project as client B
2028        let _project_b = Project::remote(
2029            project_id,
2030            client_b.clone(),
2031            client_b.user_store.clone(),
2032            lang_registry.clone(),
2033            fs.clone(),
2034            &mut cx_b.to_async(),
2035        )
2036        .await
2037        .unwrap();
2038
2039        // See that a guest has joined as client A.
2040        project_a
2041            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2042            .await;
2043
2044        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2045        client_b.disconnect(&cx_b.to_async()).unwrap();
2046        project_a
2047            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2048            .await;
2049    }
2050
2051    #[gpui::test]
2052    async fn test_collaborating_with_diagnostics(
2053        mut cx_a: TestAppContext,
2054        mut cx_b: TestAppContext,
2055    ) {
2056        cx_a.foreground().forbid_parking();
2057        let mut lang_registry = Arc::new(LanguageRegistry::new());
2058        let fs = Arc::new(FakeFs::new(cx_a.background()));
2059
2060        // Set up a fake language server.
2061        let (language_server_config, mut fake_language_server) =
2062            LanguageServerConfig::fake(cx_a.background()).await;
2063        Arc::get_mut(&mut lang_registry)
2064            .unwrap()
2065            .add(Arc::new(Language::new(
2066                LanguageConfig {
2067                    name: "Rust".to_string(),
2068                    path_suffixes: vec!["rs".to_string()],
2069                    language_server: Some(language_server_config),
2070                    ..Default::default()
2071                },
2072                Some(tree_sitter_rust::language()),
2073            )));
2074
2075        // Connect to a server as 2 clients.
2076        let mut server = TestServer::start(cx_a.foreground()).await;
2077        let client_a = server.create_client(&mut cx_a, "user_a").await;
2078        let client_b = server.create_client(&mut cx_b, "user_b").await;
2079
2080        // Share a project as client A
2081        fs.insert_tree(
2082            "/a",
2083            json!({
2084                ".zed.toml": r#"collaborators = ["user_b"]"#,
2085                "a.rs": "let one = two",
2086                "other.rs": "",
2087            }),
2088        )
2089        .await;
2090        let project_a = cx_a.update(|cx| {
2091            Project::local(
2092                client_a.clone(),
2093                client_a.user_store.clone(),
2094                lang_registry.clone(),
2095                fs.clone(),
2096                cx,
2097            )
2098        });
2099        let (worktree_a, _) = project_a
2100            .update(&mut cx_a, |p, cx| {
2101                p.find_or_create_local_worktree("/a", false, cx)
2102            })
2103            .await
2104            .unwrap();
2105        worktree_a
2106            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2107            .await;
2108        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2109        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2110        project_a
2111            .update(&mut cx_a, |p, cx| p.share(cx))
2112            .await
2113            .unwrap();
2114
2115        // Cause the language server to start.
2116        let _ = cx_a
2117            .background()
2118            .spawn(project_a.update(&mut cx_a, |project, cx| {
2119                project.open_buffer(
2120                    ProjectPath {
2121                        worktree_id,
2122                        path: Path::new("other.rs").into(),
2123                    },
2124                    cx,
2125                )
2126            }))
2127            .await
2128            .unwrap();
2129
2130        // Simulate a language server reporting errors for a file.
2131        fake_language_server
2132            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2133                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2134                version: None,
2135                diagnostics: vec![lsp::Diagnostic {
2136                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2137                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2138                    message: "message 1".to_string(),
2139                    ..Default::default()
2140                }],
2141            })
2142            .await;
2143
2144        // Wait for server to see the diagnostics update.
2145        server
2146            .condition(|store| {
2147                let worktree = store
2148                    .project(project_id)
2149                    .unwrap()
2150                    .worktrees
2151                    .get(&worktree_id.to_proto())
2152                    .unwrap();
2153
2154                !worktree
2155                    .share
2156                    .as_ref()
2157                    .unwrap()
2158                    .diagnostic_summaries
2159                    .is_empty()
2160            })
2161            .await;
2162
2163        // Join the worktree as client B.
2164        let project_b = Project::remote(
2165            project_id,
2166            client_b.clone(),
2167            client_b.user_store.clone(),
2168            lang_registry.clone(),
2169            fs.clone(),
2170            &mut cx_b.to_async(),
2171        )
2172        .await
2173        .unwrap();
2174
2175        project_b.read_with(&cx_b, |project, cx| {
2176            assert_eq!(
2177                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2178                &[(
2179                    ProjectPath {
2180                        worktree_id,
2181                        path: Arc::from(Path::new("a.rs")),
2182                    },
2183                    DiagnosticSummary {
2184                        error_count: 1,
2185                        warning_count: 0,
2186                        ..Default::default()
2187                    },
2188                )]
2189            )
2190        });
2191
2192        // Simulate a language server reporting more errors for a file.
2193        fake_language_server
2194            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2195                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2196                version: None,
2197                diagnostics: vec![
2198                    lsp::Diagnostic {
2199                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2200                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2201                        message: "message 1".to_string(),
2202                        ..Default::default()
2203                    },
2204                    lsp::Diagnostic {
2205                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2206                        range: lsp::Range::new(
2207                            lsp::Position::new(0, 10),
2208                            lsp::Position::new(0, 13),
2209                        ),
2210                        message: "message 2".to_string(),
2211                        ..Default::default()
2212                    },
2213                ],
2214            })
2215            .await;
2216
2217        // Client b gets the updated summaries
2218        project_b
2219            .condition(&cx_b, |project, cx| {
2220                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2221                    == &[(
2222                        ProjectPath {
2223                            worktree_id,
2224                            path: Arc::from(Path::new("a.rs")),
2225                        },
2226                        DiagnosticSummary {
2227                            error_count: 1,
2228                            warning_count: 1,
2229                            ..Default::default()
2230                        },
2231                    )]
2232            })
2233            .await;
2234
2235        // Open the file with the errors on client B. They should be present.
2236        let buffer_b = cx_b
2237            .background()
2238            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2239            .await
2240            .unwrap();
2241
2242        buffer_b.read_with(&cx_b, |buffer, _| {
2243            assert_eq!(
2244                buffer
2245                    .snapshot()
2246                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2247                    .map(|entry| entry)
2248                    .collect::<Vec<_>>(),
2249                &[
2250                    DiagnosticEntry {
2251                        range: Point::new(0, 4)..Point::new(0, 7),
2252                        diagnostic: Diagnostic {
2253                            group_id: 0,
2254                            message: "message 1".to_string(),
2255                            severity: lsp::DiagnosticSeverity::ERROR,
2256                            is_primary: true,
2257                            ..Default::default()
2258                        }
2259                    },
2260                    DiagnosticEntry {
2261                        range: Point::new(0, 10)..Point::new(0, 13),
2262                        diagnostic: Diagnostic {
2263                            group_id: 1,
2264                            severity: lsp::DiagnosticSeverity::WARNING,
2265                            message: "message 2".to_string(),
2266                            is_primary: true,
2267                            ..Default::default()
2268                        }
2269                    }
2270                ]
2271            );
2272        });
2273    }
2274
2275    #[gpui::test]
2276    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2277        cx_a.foreground().forbid_parking();
2278        let mut lang_registry = Arc::new(LanguageRegistry::new());
2279        let fs = Arc::new(FakeFs::new(cx_a.background()));
2280
2281        // Set up a fake language server.
2282        let (language_server_config, mut fake_language_server) =
2283            LanguageServerConfig::fake(cx_a.background()).await;
2284        Arc::get_mut(&mut lang_registry)
2285            .unwrap()
2286            .add(Arc::new(Language::new(
2287                LanguageConfig {
2288                    name: "Rust".to_string(),
2289                    path_suffixes: vec!["rs".to_string()],
2290                    language_server: Some(language_server_config),
2291                    ..Default::default()
2292                },
2293                Some(tree_sitter_rust::language()),
2294            )));
2295
2296        // Connect to a server as 2 clients.
2297        let mut server = TestServer::start(cx_a.foreground()).await;
2298        let client_a = server.create_client(&mut cx_a, "user_a").await;
2299        let client_b = server.create_client(&mut cx_b, "user_b").await;
2300
2301        // Share a project as client A
2302        fs.insert_tree(
2303            "/a",
2304            json!({
2305                ".zed.toml": r#"collaborators = ["user_b"]"#,
2306                "a.rs": "let one = two",
2307            }),
2308        )
2309        .await;
2310        let project_a = cx_a.update(|cx| {
2311            Project::local(
2312                client_a.clone(),
2313                client_a.user_store.clone(),
2314                lang_registry.clone(),
2315                fs.clone(),
2316                cx,
2317            )
2318        });
2319        let (worktree_a, _) = project_a
2320            .update(&mut cx_a, |p, cx| {
2321                p.find_or_create_local_worktree("/a", false, cx)
2322            })
2323            .await
2324            .unwrap();
2325        worktree_a
2326            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2327            .await;
2328        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2329        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2330        project_a
2331            .update(&mut cx_a, |p, cx| p.share(cx))
2332            .await
2333            .unwrap();
2334
2335        // Join the worktree as client B.
2336        let project_b = Project::remote(
2337            project_id,
2338            client_b.clone(),
2339            client_b.user_store.clone(),
2340            lang_registry.clone(),
2341            fs.clone(),
2342            &mut cx_b.to_async(),
2343        )
2344        .await
2345        .unwrap();
2346
2347        let buffer_b = cx_b
2348            .background()
2349            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2350            .await
2351            .unwrap();
2352
2353        let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2354        let (request_id, _) = fake_language_server
2355            .receive_request::<lsp::request::Formatting>()
2356            .await;
2357        fake_language_server
2358            .respond(
2359                request_id,
2360                Some(vec![
2361                    lsp::TextEdit {
2362                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2363                        new_text: "h".to_string(),
2364                    },
2365                    lsp::TextEdit {
2366                        range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2367                        new_text: "y".to_string(),
2368                    },
2369                ]),
2370            )
2371            .await;
2372        format.await.unwrap();
2373        assert_eq!(
2374            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2375            "let honey = two"
2376        );
2377    }
2378
2379    #[gpui::test]
2380    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2381        cx_a.foreground().forbid_parking();
2382        let mut lang_registry = Arc::new(LanguageRegistry::new());
2383        let fs = Arc::new(FakeFs::new(cx_a.background()));
2384        fs.insert_tree(
2385            "/root-1",
2386            json!({
2387                ".zed.toml": r#"collaborators = ["user_b"]"#,
2388                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2389            }),
2390        )
2391        .await;
2392        fs.insert_tree(
2393            "/root-2",
2394            json!({
2395                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2396            }),
2397        )
2398        .await;
2399
2400        // Set up a fake language server.
2401        let (language_server_config, mut fake_language_server) =
2402            LanguageServerConfig::fake(cx_a.background()).await;
2403        Arc::get_mut(&mut lang_registry)
2404            .unwrap()
2405            .add(Arc::new(Language::new(
2406                LanguageConfig {
2407                    name: "Rust".to_string(),
2408                    path_suffixes: vec!["rs".to_string()],
2409                    language_server: Some(language_server_config),
2410                    ..Default::default()
2411                },
2412                Some(tree_sitter_rust::language()),
2413            )));
2414
2415        // Connect to a server as 2 clients.
2416        let mut server = TestServer::start(cx_a.foreground()).await;
2417        let client_a = server.create_client(&mut cx_a, "user_a").await;
2418        let client_b = server.create_client(&mut cx_b, "user_b").await;
2419
2420        // Share a project as client A
2421        let project_a = cx_a.update(|cx| {
2422            Project::local(
2423                client_a.clone(),
2424                client_a.user_store.clone(),
2425                lang_registry.clone(),
2426                fs.clone(),
2427                cx,
2428            )
2429        });
2430        let (worktree_a, _) = project_a
2431            .update(&mut cx_a, |p, cx| {
2432                p.find_or_create_local_worktree("/root-1", false, cx)
2433            })
2434            .await
2435            .unwrap();
2436        worktree_a
2437            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2438            .await;
2439        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2440        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2441        project_a
2442            .update(&mut cx_a, |p, cx| p.share(cx))
2443            .await
2444            .unwrap();
2445
2446        // Join the worktree as client B.
2447        let project_b = Project::remote(
2448            project_id,
2449            client_b.clone(),
2450            client_b.user_store.clone(),
2451            lang_registry.clone(),
2452            fs.clone(),
2453            &mut cx_b.to_async(),
2454        )
2455        .await
2456        .unwrap();
2457
2458        // Open the file to be formatted on client B.
2459        let buffer_b = cx_b
2460            .background()
2461            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2462            .await
2463            .unwrap();
2464
2465        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2466        let (request_id, _) = fake_language_server
2467            .receive_request::<lsp::request::GotoDefinition>()
2468            .await;
2469        fake_language_server
2470            .respond(
2471                request_id,
2472                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2473                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2474                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2475                ))),
2476            )
2477            .await;
2478        let definitions_1 = definitions_1.await.unwrap();
2479        cx_b.read(|cx| {
2480            assert_eq!(definitions_1.len(), 1);
2481            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2482            let target_buffer = definitions_1[0].target_buffer.read(cx);
2483            assert_eq!(
2484                target_buffer.text(),
2485                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2486            );
2487            assert_eq!(
2488                definitions_1[0].target_range.to_point(target_buffer),
2489                Point::new(0, 6)..Point::new(0, 9)
2490            );
2491        });
2492
2493        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2494        // the previous call to `definition`.
2495        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2496        let (request_id, _) = fake_language_server
2497            .receive_request::<lsp::request::GotoDefinition>()
2498            .await;
2499        fake_language_server
2500            .respond(
2501                request_id,
2502                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2503                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2504                    lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2505                ))),
2506            )
2507            .await;
2508        let definitions_2 = definitions_2.await.unwrap();
2509        cx_b.read(|cx| {
2510            assert_eq!(definitions_2.len(), 1);
2511            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2512            let target_buffer = definitions_2[0].target_buffer.read(cx);
2513            assert_eq!(
2514                target_buffer.text(),
2515                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2516            );
2517            assert_eq!(
2518                definitions_2[0].target_range.to_point(target_buffer),
2519                Point::new(1, 6)..Point::new(1, 11)
2520            );
2521        });
2522        assert_eq!(
2523            definitions_1[0].target_buffer,
2524            definitions_2[0].target_buffer
2525        );
2526
2527        cx_b.update(|_| {
2528            drop(definitions_1);
2529            drop(definitions_2);
2530        });
2531        project_b
2532            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2533            .await;
2534    }
2535
2536    #[gpui::test]
2537    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2538        mut cx_a: TestAppContext,
2539        mut cx_b: TestAppContext,
2540        mut rng: StdRng,
2541    ) {
2542        cx_a.foreground().forbid_parking();
2543        let mut lang_registry = Arc::new(LanguageRegistry::new());
2544        let fs = Arc::new(FakeFs::new(cx_a.background()));
2545        fs.insert_tree(
2546            "/root",
2547            json!({
2548                ".zed.toml": r#"collaborators = ["user_b"]"#,
2549                "a.rs": "const ONE: usize = b::TWO;",
2550                "b.rs": "const TWO: usize = 2",
2551            }),
2552        )
2553        .await;
2554
2555        // Set up a fake language server.
2556        let (language_server_config, mut fake_language_server) =
2557            LanguageServerConfig::fake(cx_a.background()).await;
2558        Arc::get_mut(&mut lang_registry)
2559            .unwrap()
2560            .add(Arc::new(Language::new(
2561                LanguageConfig {
2562                    name: "Rust".to_string(),
2563                    path_suffixes: vec!["rs".to_string()],
2564                    language_server: Some(language_server_config),
2565                    ..Default::default()
2566                },
2567                Some(tree_sitter_rust::language()),
2568            )));
2569
2570        // Connect to a server as 2 clients.
2571        let mut server = TestServer::start(cx_a.foreground()).await;
2572        let client_a = server.create_client(&mut cx_a, "user_a").await;
2573        let client_b = server.create_client(&mut cx_b, "user_b").await;
2574
2575        // Share a project as client A
2576        let project_a = cx_a.update(|cx| {
2577            Project::local(
2578                client_a.clone(),
2579                client_a.user_store.clone(),
2580                lang_registry.clone(),
2581                fs.clone(),
2582                cx,
2583            )
2584        });
2585        let (worktree_a, _) = project_a
2586            .update(&mut cx_a, |p, cx| {
2587                p.find_or_create_local_worktree("/root", false, cx)
2588            })
2589            .await
2590            .unwrap();
2591        worktree_a
2592            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2593            .await;
2594        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2595        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2596        project_a
2597            .update(&mut cx_a, |p, cx| p.share(cx))
2598            .await
2599            .unwrap();
2600
2601        // Join the worktree as client B.
2602        let project_b = Project::remote(
2603            project_id,
2604            client_b.clone(),
2605            client_b.user_store.clone(),
2606            lang_registry.clone(),
2607            fs.clone(),
2608            &mut cx_b.to_async(),
2609        )
2610        .await
2611        .unwrap();
2612
2613        let buffer_b1 = cx_b
2614            .background()
2615            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2616            .await
2617            .unwrap();
2618
2619        let definitions;
2620        let buffer_b2;
2621        if rng.gen() {
2622            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2623            buffer_b2 =
2624                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2625        } else {
2626            buffer_b2 =
2627                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2628            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2629        }
2630
2631        let (request_id, _) = fake_language_server
2632            .receive_request::<lsp::request::GotoDefinition>()
2633            .await;
2634        fake_language_server
2635            .respond(
2636                request_id,
2637                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2638                    lsp::Url::from_file_path("/root/b.rs").unwrap(),
2639                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2640                ))),
2641            )
2642            .await;
2643
2644        let buffer_b2 = buffer_b2.await.unwrap();
2645        let definitions = definitions.await.unwrap();
2646        assert_eq!(definitions.len(), 1);
2647        assert_eq!(definitions[0].target_buffer, buffer_b2);
2648    }
2649
2650    #[gpui::test]
2651    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2652        cx_a.foreground().forbid_parking();
2653
2654        // Connect to a server as 2 clients.
2655        let mut server = TestServer::start(cx_a.foreground()).await;
2656        let client_a = server.create_client(&mut cx_a, "user_a").await;
2657        let client_b = server.create_client(&mut cx_b, "user_b").await;
2658
2659        // Create an org that includes these 2 users.
2660        let db = &server.app_state.db;
2661        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2662        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2663            .await
2664            .unwrap();
2665        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2666            .await
2667            .unwrap();
2668
2669        // Create a channel that includes all the users.
2670        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2671        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2672            .await
2673            .unwrap();
2674        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2675            .await
2676            .unwrap();
2677        db.create_channel_message(
2678            channel_id,
2679            client_b.current_user_id(&cx_b),
2680            "hello A, it's B.",
2681            OffsetDateTime::now_utc(),
2682            1,
2683        )
2684        .await
2685        .unwrap();
2686
2687        let channels_a = cx_a
2688            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2689        channels_a
2690            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2691            .await;
2692        channels_a.read_with(&cx_a, |list, _| {
2693            assert_eq!(
2694                list.available_channels().unwrap(),
2695                &[ChannelDetails {
2696                    id: channel_id.to_proto(),
2697                    name: "test-channel".to_string()
2698                }]
2699            )
2700        });
2701        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2702            this.get_channel(channel_id.to_proto(), cx).unwrap()
2703        });
2704        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2705        channel_a
2706            .condition(&cx_a, |channel, _| {
2707                channel_messages(channel)
2708                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2709            })
2710            .await;
2711
2712        let channels_b = cx_b
2713            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2714        channels_b
2715            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2716            .await;
2717        channels_b.read_with(&cx_b, |list, _| {
2718            assert_eq!(
2719                list.available_channels().unwrap(),
2720                &[ChannelDetails {
2721                    id: channel_id.to_proto(),
2722                    name: "test-channel".to_string()
2723                }]
2724            )
2725        });
2726
2727        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2728            this.get_channel(channel_id.to_proto(), cx).unwrap()
2729        });
2730        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2731        channel_b
2732            .condition(&cx_b, |channel, _| {
2733                channel_messages(channel)
2734                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2735            })
2736            .await;
2737
2738        channel_a
2739            .update(&mut cx_a, |channel, cx| {
2740                channel
2741                    .send_message("oh, hi B.".to_string(), cx)
2742                    .unwrap()
2743                    .detach();
2744                let task = channel.send_message("sup".to_string(), cx).unwrap();
2745                assert_eq!(
2746                    channel_messages(channel),
2747                    &[
2748                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2749                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2750                        ("user_a".to_string(), "sup".to_string(), true)
2751                    ]
2752                );
2753                task
2754            })
2755            .await
2756            .unwrap();
2757
2758        channel_b
2759            .condition(&cx_b, |channel, _| {
2760                channel_messages(channel)
2761                    == [
2762                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2763                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2764                        ("user_a".to_string(), "sup".to_string(), false),
2765                    ]
2766            })
2767            .await;
2768
2769        assert_eq!(
2770            server
2771                .state()
2772                .await
2773                .channel(channel_id)
2774                .unwrap()
2775                .connection_ids
2776                .len(),
2777            2
2778        );
2779        cx_b.update(|_| drop(channel_b));
2780        server
2781            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2782            .await;
2783
2784        cx_a.update(|_| drop(channel_a));
2785        server
2786            .condition(|state| state.channel(channel_id).is_none())
2787            .await;
2788    }
2789
2790    #[gpui::test]
2791    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2792        cx_a.foreground().forbid_parking();
2793
2794        let mut server = TestServer::start(cx_a.foreground()).await;
2795        let client_a = server.create_client(&mut cx_a, "user_a").await;
2796
2797        let db = &server.app_state.db;
2798        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2799        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2800        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2801            .await
2802            .unwrap();
2803        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2804            .await
2805            .unwrap();
2806
2807        let channels_a = cx_a
2808            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2809        channels_a
2810            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2811            .await;
2812        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2813            this.get_channel(channel_id.to_proto(), cx).unwrap()
2814        });
2815
2816        // Messages aren't allowed to be too long.
2817        channel_a
2818            .update(&mut cx_a, |channel, cx| {
2819                let long_body = "this is long.\n".repeat(1024);
2820                channel.send_message(long_body, cx).unwrap()
2821            })
2822            .await
2823            .unwrap_err();
2824
2825        // Messages aren't allowed to be blank.
2826        channel_a.update(&mut cx_a, |channel, cx| {
2827            channel.send_message(String::new(), cx).unwrap_err()
2828        });
2829
2830        // Leading and trailing whitespace are trimmed.
2831        channel_a
2832            .update(&mut cx_a, |channel, cx| {
2833                channel
2834                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
2835                    .unwrap()
2836            })
2837            .await
2838            .unwrap();
2839        assert_eq!(
2840            db.get_channel_messages(channel_id, 10, None)
2841                .await
2842                .unwrap()
2843                .iter()
2844                .map(|m| &m.body)
2845                .collect::<Vec<_>>(),
2846            &["surrounded by whitespace"]
2847        );
2848    }
2849
2850    #[gpui::test]
2851    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2852        cx_a.foreground().forbid_parking();
2853
2854        // Connect to a server as 2 clients.
2855        let mut server = TestServer::start(cx_a.foreground()).await;
2856        let client_a = server.create_client(&mut cx_a, "user_a").await;
2857        let client_b = server.create_client(&mut cx_b, "user_b").await;
2858        let mut status_b = client_b.status();
2859
2860        // Create an org that includes these 2 users.
2861        let db = &server.app_state.db;
2862        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2863        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2864            .await
2865            .unwrap();
2866        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2867            .await
2868            .unwrap();
2869
2870        // Create a channel that includes all the users.
2871        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2872        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2873            .await
2874            .unwrap();
2875        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2876            .await
2877            .unwrap();
2878        db.create_channel_message(
2879            channel_id,
2880            client_b.current_user_id(&cx_b),
2881            "hello A, it's B.",
2882            OffsetDateTime::now_utc(),
2883            2,
2884        )
2885        .await
2886        .unwrap();
2887
2888        let channels_a = cx_a
2889            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2890        channels_a
2891            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2892            .await;
2893
2894        channels_a.read_with(&cx_a, |list, _| {
2895            assert_eq!(
2896                list.available_channels().unwrap(),
2897                &[ChannelDetails {
2898                    id: channel_id.to_proto(),
2899                    name: "test-channel".to_string()
2900                }]
2901            )
2902        });
2903        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2904            this.get_channel(channel_id.to_proto(), cx).unwrap()
2905        });
2906        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2907        channel_a
2908            .condition(&cx_a, |channel, _| {
2909                channel_messages(channel)
2910                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2911            })
2912            .await;
2913
2914        let channels_b = cx_b
2915            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2916        channels_b
2917            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2918            .await;
2919        channels_b.read_with(&cx_b, |list, _| {
2920            assert_eq!(
2921                list.available_channels().unwrap(),
2922                &[ChannelDetails {
2923                    id: channel_id.to_proto(),
2924                    name: "test-channel".to_string()
2925                }]
2926            )
2927        });
2928
2929        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2930            this.get_channel(channel_id.to_proto(), cx).unwrap()
2931        });
2932        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2933        channel_b
2934            .condition(&cx_b, |channel, _| {
2935                channel_messages(channel)
2936                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2937            })
2938            .await;
2939
2940        // Disconnect client B, ensuring we can still access its cached channel data.
2941        server.forbid_connections();
2942        server.disconnect_client(client_b.current_user_id(&cx_b));
2943        while !matches!(
2944            status_b.next().await,
2945            Some(client::Status::ReconnectionError { .. })
2946        ) {}
2947
2948        channels_b.read_with(&cx_b, |channels, _| {
2949            assert_eq!(
2950                channels.available_channels().unwrap(),
2951                [ChannelDetails {
2952                    id: channel_id.to_proto(),
2953                    name: "test-channel".to_string()
2954                }]
2955            )
2956        });
2957        channel_b.read_with(&cx_b, |channel, _| {
2958            assert_eq!(
2959                channel_messages(channel),
2960                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2961            )
2962        });
2963
2964        // Send a message from client B while it is disconnected.
2965        channel_b
2966            .update(&mut cx_b, |channel, cx| {
2967                let task = channel
2968                    .send_message("can you see this?".to_string(), cx)
2969                    .unwrap();
2970                assert_eq!(
2971                    channel_messages(channel),
2972                    &[
2973                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2974                        ("user_b".to_string(), "can you see this?".to_string(), true)
2975                    ]
2976                );
2977                task
2978            })
2979            .await
2980            .unwrap_err();
2981
2982        // Send a message from client A while B is disconnected.
2983        channel_a
2984            .update(&mut cx_a, |channel, cx| {
2985                channel
2986                    .send_message("oh, hi B.".to_string(), cx)
2987                    .unwrap()
2988                    .detach();
2989                let task = channel.send_message("sup".to_string(), cx).unwrap();
2990                assert_eq!(
2991                    channel_messages(channel),
2992                    &[
2993                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2994                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2995                        ("user_a".to_string(), "sup".to_string(), true)
2996                    ]
2997                );
2998                task
2999            })
3000            .await
3001            .unwrap();
3002
3003        // Give client B a chance to reconnect.
3004        server.allow_connections();
3005        cx_b.foreground().advance_clock(Duration::from_secs(10));
3006
3007        // Verify that B sees the new messages upon reconnection, as well as the message client B
3008        // sent while offline.
3009        channel_b
3010            .condition(&cx_b, |channel, _| {
3011                channel_messages(channel)
3012                    == [
3013                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3014                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3015                        ("user_a".to_string(), "sup".to_string(), false),
3016                        ("user_b".to_string(), "can you see this?".to_string(), false),
3017                    ]
3018            })
3019            .await;
3020
3021        // Ensure client A and B can communicate normally after reconnection.
3022        channel_a
3023            .update(&mut cx_a, |channel, cx| {
3024                channel.send_message("you online?".to_string(), cx).unwrap()
3025            })
3026            .await
3027            .unwrap();
3028        channel_b
3029            .condition(&cx_b, |channel, _| {
3030                channel_messages(channel)
3031                    == [
3032                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3033                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3034                        ("user_a".to_string(), "sup".to_string(), false),
3035                        ("user_b".to_string(), "can you see this?".to_string(), false),
3036                        ("user_a".to_string(), "you online?".to_string(), false),
3037                    ]
3038            })
3039            .await;
3040
3041        channel_b
3042            .update(&mut cx_b, |channel, cx| {
3043                channel.send_message("yep".to_string(), cx).unwrap()
3044            })
3045            .await
3046            .unwrap();
3047        channel_a
3048            .condition(&cx_a, |channel, _| {
3049                channel_messages(channel)
3050                    == [
3051                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3052                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3053                        ("user_a".to_string(), "sup".to_string(), false),
3054                        ("user_b".to_string(), "can you see this?".to_string(), false),
3055                        ("user_a".to_string(), "you online?".to_string(), false),
3056                        ("user_b".to_string(), "yep".to_string(), false),
3057                    ]
3058            })
3059            .await;
3060    }
3061
3062    #[gpui::test]
3063    async fn test_contacts(
3064        mut cx_a: TestAppContext,
3065        mut cx_b: TestAppContext,
3066        mut cx_c: TestAppContext,
3067    ) {
3068        cx_a.foreground().forbid_parking();
3069        let lang_registry = Arc::new(LanguageRegistry::new());
3070        let fs = Arc::new(FakeFs::new(cx_a.background()));
3071
3072        // Connect to a server as 3 clients.
3073        let mut server = TestServer::start(cx_a.foreground()).await;
3074        let client_a = server.create_client(&mut cx_a, "user_a").await;
3075        let client_b = server.create_client(&mut cx_b, "user_b").await;
3076        let client_c = server.create_client(&mut cx_c, "user_c").await;
3077
3078        // Share a worktree as client A.
3079        fs.insert_tree(
3080            "/a",
3081            json!({
3082                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3083            }),
3084        )
3085        .await;
3086
3087        let project_a = cx_a.update(|cx| {
3088            Project::local(
3089                client_a.clone(),
3090                client_a.user_store.clone(),
3091                lang_registry.clone(),
3092                fs.clone(),
3093                cx,
3094            )
3095        });
3096        let (worktree_a, _) = project_a
3097            .update(&mut cx_a, |p, cx| {
3098                p.find_or_create_local_worktree("/a", false, cx)
3099            })
3100            .await
3101            .unwrap();
3102        worktree_a
3103            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3104            .await;
3105
3106        client_a
3107            .user_store
3108            .condition(&cx_a, |user_store, _| {
3109                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3110            })
3111            .await;
3112        client_b
3113            .user_store
3114            .condition(&cx_b, |user_store, _| {
3115                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3116            })
3117            .await;
3118        client_c
3119            .user_store
3120            .condition(&cx_c, |user_store, _| {
3121                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3122            })
3123            .await;
3124
3125        let project_id = project_a
3126            .update(&mut cx_a, |project, _| project.next_remote_id())
3127            .await;
3128        project_a
3129            .update(&mut cx_a, |project, cx| project.share(cx))
3130            .await
3131            .unwrap();
3132
3133        let _project_b = Project::remote(
3134            project_id,
3135            client_b.clone(),
3136            client_b.user_store.clone(),
3137            lang_registry.clone(),
3138            fs.clone(),
3139            &mut cx_b.to_async(),
3140        )
3141        .await
3142        .unwrap();
3143
3144        client_a
3145            .user_store
3146            .condition(&cx_a, |user_store, _| {
3147                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3148            })
3149            .await;
3150        client_b
3151            .user_store
3152            .condition(&cx_b, |user_store, _| {
3153                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3154            })
3155            .await;
3156        client_c
3157            .user_store
3158            .condition(&cx_c, |user_store, _| {
3159                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3160            })
3161            .await;
3162
3163        project_a
3164            .condition(&cx_a, |project, _| {
3165                project.collaborators().contains_key(&client_b.peer_id)
3166            })
3167            .await;
3168
3169        cx_a.update(move |_| drop(project_a));
3170        client_a
3171            .user_store
3172            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3173            .await;
3174        client_b
3175            .user_store
3176            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3177            .await;
3178        client_c
3179            .user_store
3180            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3181            .await;
3182
3183        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3184            user_store
3185                .contacts()
3186                .iter()
3187                .map(|contact| {
3188                    let worktrees = contact
3189                        .projects
3190                        .iter()
3191                        .map(|p| {
3192                            (
3193                                p.worktree_root_names[0].as_str(),
3194                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3195                            )
3196                        })
3197                        .collect();
3198                    (contact.user.github_login.as_str(), worktrees)
3199                })
3200                .collect()
3201        }
3202    }
3203
3204    struct TestServer {
3205        peer: Arc<Peer>,
3206        app_state: Arc<AppState>,
3207        server: Arc<Server>,
3208        foreground: Rc<executor::Foreground>,
3209        notifications: mpsc::Receiver<()>,
3210        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3211        forbid_connections: Arc<AtomicBool>,
3212        _test_db: TestDb,
3213    }
3214
3215    impl TestServer {
3216        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3217            let test_db = TestDb::new();
3218            let app_state = Self::build_app_state(&test_db).await;
3219            let peer = Peer::new();
3220            let notifications = mpsc::channel(128);
3221            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3222            Self {
3223                peer,
3224                app_state,
3225                server,
3226                foreground,
3227                notifications: notifications.1,
3228                connection_killers: Default::default(),
3229                forbid_connections: Default::default(),
3230                _test_db: test_db,
3231            }
3232        }
3233
3234        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3235            let http = FakeHttpClient::with_404_response();
3236            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3237            let client_name = name.to_string();
3238            let mut client = Client::new(http.clone());
3239            let server = self.server.clone();
3240            let connection_killers = self.connection_killers.clone();
3241            let forbid_connections = self.forbid_connections.clone();
3242            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3243
3244            Arc::get_mut(&mut client)
3245                .unwrap()
3246                .override_authenticate(move |cx| {
3247                    cx.spawn(|_| async move {
3248                        let access_token = "the-token".to_string();
3249                        Ok(Credentials {
3250                            user_id: user_id.0 as u64,
3251                            access_token,
3252                        })
3253                    })
3254                })
3255                .override_establish_connection(move |credentials, cx| {
3256                    assert_eq!(credentials.user_id, user_id.0 as u64);
3257                    assert_eq!(credentials.access_token, "the-token");
3258
3259                    let server = server.clone();
3260                    let connection_killers = connection_killers.clone();
3261                    let forbid_connections = forbid_connections.clone();
3262                    let client_name = client_name.clone();
3263                    let connection_id_tx = connection_id_tx.clone();
3264                    cx.spawn(move |cx| async move {
3265                        if forbid_connections.load(SeqCst) {
3266                            Err(EstablishConnectionError::other(anyhow!(
3267                                "server is forbidding connections"
3268                            )))
3269                        } else {
3270                            let (client_conn, server_conn, kill_conn) =
3271                                Connection::in_memory(cx.background());
3272                            connection_killers.lock().insert(user_id, kill_conn);
3273                            cx.background()
3274                                .spawn(server.handle_connection(
3275                                    server_conn,
3276                                    client_name,
3277                                    user_id,
3278                                    Some(connection_id_tx),
3279                                ))
3280                                .detach();
3281                            Ok(client_conn)
3282                        }
3283                    })
3284                });
3285
3286            client
3287                .authenticate_and_connect(&cx.to_async())
3288                .await
3289                .unwrap();
3290
3291            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3292            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3293            let mut authed_user =
3294                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3295            while authed_user.next().await.unwrap().is_none() {}
3296
3297            TestClient {
3298                client,
3299                peer_id,
3300                user_store,
3301            }
3302        }
3303
3304        fn disconnect_client(&self, user_id: UserId) {
3305            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3306                let _ = kill_conn.try_send(Some(()));
3307            }
3308        }
3309
3310        fn forbid_connections(&self) {
3311            self.forbid_connections.store(true, SeqCst);
3312        }
3313
3314        fn allow_connections(&self) {
3315            self.forbid_connections.store(false, SeqCst);
3316        }
3317
3318        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3319            let mut config = Config::default();
3320            config.session_secret = "a".repeat(32);
3321            config.database_url = test_db.url.clone();
3322            let github_client = github::AppClient::test();
3323            Arc::new(AppState {
3324                db: test_db.db().clone(),
3325                handlebars: Default::default(),
3326                auth_client: auth::build_client("", ""),
3327                repo_client: github::RepoClient::test(&github_client),
3328                github_client,
3329                config,
3330            })
3331        }
3332
3333        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3334            self.server.store.read()
3335        }
3336
3337        async fn condition<F>(&mut self, mut predicate: F)
3338        where
3339            F: FnMut(&Store) -> bool,
3340        {
3341            async_std::future::timeout(Duration::from_millis(500), async {
3342                while !(predicate)(&*self.server.store.read()) {
3343                    self.foreground.start_waiting();
3344                    self.notifications.next().await;
3345                    self.foreground.finish_waiting();
3346                }
3347            })
3348            .await
3349            .expect("condition timed out");
3350        }
3351    }
3352
3353    impl Drop for TestServer {
3354        fn drop(&mut self) {
3355            self.peer.reset();
3356        }
3357    }
3358
3359    struct TestClient {
3360        client: Arc<Client>,
3361        pub peer_id: PeerId,
3362        pub user_store: ModelHandle<UserStore>,
3363    }
3364
3365    impl Deref for TestClient {
3366        type Target = Arc<Client>;
3367
3368        fn deref(&self) -> &Self::Target {
3369            &self.client
3370        }
3371    }
3372
3373    impl TestClient {
3374        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3375            UserId::from_proto(
3376                self.user_store
3377                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3378            )
3379        }
3380    }
3381
3382    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3383        channel
3384            .messages()
3385            .cursor::<()>()
3386            .map(|m| {
3387                (
3388                    m.sender.github_login.clone(),
3389                    m.body.clone(),
3390                    m.is_pending(),
3391                )
3392            })
3393            .collect()
3394    }
3395
3396    struct EmptyView;
3397
3398    impl gpui::Entity for EmptyView {
3399        type Event = ();
3400    }
3401
3402    impl gpui::View for EmptyView {
3403        fn ui_name() -> &'static str {
3404            "empty view"
3405        }
3406
3407        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3408            gpui::Element::boxed(gpui::elements::Empty)
3409        }
3410    }
3411}