rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, mem, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updated)
  76            .add_handler(Server::open_buffer)
  77            .add_handler(Server::close_buffer)
  78            .add_handler(Server::update_buffer)
  79            .add_handler(Server::buffer_saved)
  80            .add_handler(Server::save_buffer)
  81            .add_handler(Server::get_channels)
  82            .add_handler(Server::get_users)
  83            .add_handler(Server::join_channel)
  84            .add_handler(Server::leave_channel)
  85            .add_handler(Server::send_channel_message)
  86            .add_handler(Server::get_channel_messages);
  87
  88        Arc::new(server)
  89    }
  90
  91    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  92    where
  93        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  94        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  95        M: EnvelopedMessage,
  96    {
  97        let prev_handler = self.handlers.insert(
  98            TypeId::of::<M>(),
  99            Box::new(move |server, envelope| {
 100                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 101                (handler)(server, *envelope).boxed()
 102            }),
 103        );
 104        if prev_handler.is_some() {
 105            panic!("registered a handler for the same message twice");
 106        }
 107        self
 108    }
 109
 110    pub fn handle_connection(
 111        self: &Arc<Self>,
 112        connection: Connection,
 113        addr: String,
 114        user_id: UserId,
 115        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 116    ) -> impl Future<Output = ()> {
 117        let mut this = self.clone();
 118        async move {
 119            let (connection_id, handle_io, mut incoming_rx) =
 120                this.peer.add_connection(connection).await;
 121
 122            if let Some(send_connection_id) = send_connection_id.as_mut() {
 123                let _ = send_connection_id.send(connection_id).await;
 124            }
 125
 126            this.state_mut().add_connection(connection_id, user_id);
 127            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 128                log::error!("error updating contacts for {:?}: {}", user_id, err);
 129            }
 130
 131            let handle_io = handle_io.fuse();
 132            futures::pin_mut!(handle_io);
 133            loop {
 134                let next_message = incoming_rx.recv().fuse();
 135                futures::pin_mut!(next_message);
 136                futures::select_biased! {
 137                    message = next_message => {
 138                        if let Some(message) = message {
 139                            let start_time = Instant::now();
 140                            log::info!("RPC message received: {}", message.payload_type_name());
 141                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 142                                if let Err(err) = (handler)(this.clone(), message).await {
 143                                    log::error!("error handling message: {:?}", err);
 144                                } else {
 145                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 146                                }
 147
 148                                if let Some(mut notifications) = this.notifications.clone() {
 149                                    let _ = notifications.send(()).await;
 150                                }
 151                            } else {
 152                                log::warn!("unhandled message: {}", message.payload_type_name());
 153                            }
 154                        } else {
 155                            log::info!("rpc connection closed {:?}", addr);
 156                            break;
 157                        }
 158                    }
 159                    handle_io = handle_io => {
 160                        if let Err(err) = handle_io {
 161                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 162                        }
 163                        break;
 164                    }
 165                }
 166            }
 167
 168            if let Err(err) = this.sign_out(connection_id).await {
 169                log::error!("error signing out connection {:?} - {:?}", addr, err);
 170            }
 171        }
 172    }
 173
 174    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 175        self.peer.disconnect(connection_id).await;
 176        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 177
 178        for (project_id, project) in removed_connection.hosted_projects {
 179            if let Some(share) = project.share {
 180                broadcast(
 181                    connection_id,
 182                    share.guests.keys().copied().collect(),
 183                    |conn_id| {
 184                        self.peer
 185                            .send(conn_id, proto::UnshareProject { project_id })
 186                    },
 187                )
 188                .await?;
 189            }
 190        }
 191
 192        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 193            broadcast(connection_id, peer_ids, |conn_id| {
 194                self.peer.send(
 195                    conn_id,
 196                    proto::RemoveProjectCollaborator {
 197                        project_id,
 198                        peer_id: connection_id.0,
 199                    },
 200                )
 201            })
 202            .await?;
 203        }
 204
 205        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 206            .await?;
 207
 208        Ok(())
 209    }
 210
 211    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 212        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 213        Ok(())
 214    }
 215
 216    async fn register_project(
 217        mut self: Arc<Server>,
 218        request: TypedEnvelope<proto::RegisterProject>,
 219    ) -> tide::Result<()> {
 220        let project_id = {
 221            let mut state = self.state_mut();
 222            let user_id = state.user_id_for_connection(request.sender_id)?;
 223            state.register_project(request.sender_id, user_id)
 224        };
 225        self.peer
 226            .respond(
 227                request.receipt(),
 228                proto::RegisterProjectResponse { project_id },
 229            )
 230            .await?;
 231        Ok(())
 232    }
 233
 234    async fn unregister_project(
 235        mut self: Arc<Server>,
 236        request: TypedEnvelope<proto::UnregisterProject>,
 237    ) -> tide::Result<()> {
 238        let project = self
 239            .state_mut()
 240            .unregister_project(request.payload.project_id, request.sender_id)
 241            .ok_or_else(|| anyhow!("no such project"))?;
 242        self.update_contacts_for_users(project.authorized_user_ids().iter())
 243            .await?;
 244        Ok(())
 245    }
 246
 247    async fn share_project(
 248        mut self: Arc<Server>,
 249        request: TypedEnvelope<proto::ShareProject>,
 250    ) -> tide::Result<()> {
 251        self.state_mut()
 252            .share_project(request.payload.project_id, request.sender_id);
 253        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 254        Ok(())
 255    }
 256
 257    async fn unshare_project(
 258        mut self: Arc<Server>,
 259        request: TypedEnvelope<proto::UnshareProject>,
 260    ) -> tide::Result<()> {
 261        let project_id = request.payload.project_id;
 262        let project = self
 263            .state_mut()
 264            .unshare_project(project_id, request.sender_id)?;
 265
 266        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 267            self.peer
 268                .send(conn_id, proto::UnshareProject { project_id })
 269        })
 270        .await?;
 271        self.update_contacts_for_users(&project.authorized_user_ids)
 272            .await?;
 273
 274        Ok(())
 275    }
 276
 277    async fn join_project(
 278        mut self: Arc<Server>,
 279        request: TypedEnvelope<proto::JoinProject>,
 280    ) -> tide::Result<()> {
 281        let project_id = request.payload.project_id;
 282
 283        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 284        let response_data = self
 285            .state_mut()
 286            .join_project(request.sender_id, user_id, project_id)
 287            .and_then(|joined| {
 288                let share = joined.project.share()?;
 289                let peer_count = share.guests.len();
 290                let mut collaborators = Vec::with_capacity(peer_count);
 291                collaborators.push(proto::Collaborator {
 292                    peer_id: joined.project.host_connection_id.0,
 293                    replica_id: 0,
 294                    user_id: joined.project.host_user_id.to_proto(),
 295                });
 296                let worktrees = joined
 297                    .project
 298                    .worktrees
 299                    .iter()
 300                    .filter_map(|(id, worktree)| {
 301                        worktree.share.as_ref().map(|share| proto::Worktree {
 302                            id: *id,
 303                            root_name: worktree.root_name.clone(),
 304                            entries: share.entries.values().cloned().collect(),
 305                            diagnostic_summaries: share
 306                                .diagnostic_summaries
 307                                .values()
 308                                .cloned()
 309                                .collect(),
 310                        })
 311                    })
 312                    .collect();
 313                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 314                    if *peer_conn_id != request.sender_id {
 315                        collaborators.push(proto::Collaborator {
 316                            peer_id: peer_conn_id.0,
 317                            replica_id: *peer_replica_id as u32,
 318                            user_id: peer_user_id.to_proto(),
 319                        });
 320                    }
 321                }
 322                let response = proto::JoinProjectResponse {
 323                    worktrees,
 324                    replica_id: joined.replica_id as u32,
 325                    collaborators,
 326                };
 327                let connection_ids = joined.project.connection_ids();
 328                let contact_user_ids = joined.project.authorized_user_ids();
 329                Ok((response, connection_ids, contact_user_ids))
 330            });
 331
 332        match response_data {
 333            Ok((response, connection_ids, contact_user_ids)) => {
 334                broadcast(request.sender_id, connection_ids, |conn_id| {
 335                    self.peer.send(
 336                        conn_id,
 337                        proto::AddProjectCollaborator {
 338                            project_id: project_id,
 339                            collaborator: Some(proto::Collaborator {
 340                                peer_id: request.sender_id.0,
 341                                replica_id: response.replica_id,
 342                                user_id: user_id.to_proto(),
 343                            }),
 344                        },
 345                    )
 346                })
 347                .await?;
 348                self.peer.respond(request.receipt(), response).await?;
 349                self.update_contacts_for_users(&contact_user_ids).await?;
 350            }
 351            Err(error) => {
 352                self.peer
 353                    .respond_with_error(
 354                        request.receipt(),
 355                        proto::Error {
 356                            message: error.to_string(),
 357                        },
 358                    )
 359                    .await?;
 360            }
 361        }
 362
 363        Ok(())
 364    }
 365
 366    async fn leave_project(
 367        mut self: Arc<Server>,
 368        request: TypedEnvelope<proto::LeaveProject>,
 369    ) -> tide::Result<()> {
 370        let sender_id = request.sender_id;
 371        let project_id = request.payload.project_id;
 372        let worktree = self.state_mut().leave_project(sender_id, project_id);
 373        if let Some(worktree) = worktree {
 374            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 375                self.peer.send(
 376                    conn_id,
 377                    proto::RemoveProjectCollaborator {
 378                        project_id,
 379                        peer_id: sender_id.0,
 380                    },
 381                )
 382            })
 383            .await?;
 384            self.update_contacts_for_users(&worktree.authorized_user_ids)
 385                .await?;
 386        }
 387        Ok(())
 388    }
 389
 390    async fn register_worktree(
 391        mut self: Arc<Server>,
 392        request: TypedEnvelope<proto::RegisterWorktree>,
 393    ) -> tide::Result<()> {
 394        let receipt = request.receipt();
 395        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 396
 397        let mut contact_user_ids = HashSet::default();
 398        contact_user_ids.insert(host_user_id);
 399        for github_login in request.payload.authorized_logins {
 400            match self.app_state.db.create_user(&github_login, false).await {
 401                Ok(contact_user_id) => {
 402                    contact_user_ids.insert(contact_user_id);
 403                }
 404                Err(err) => {
 405                    let message = err.to_string();
 406                    self.peer
 407                        .respond_with_error(receipt, proto::Error { message })
 408                        .await?;
 409                    return Ok(());
 410                }
 411            }
 412        }
 413
 414        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 415        let ok = self.state_mut().register_worktree(
 416            request.payload.project_id,
 417            request.payload.worktree_id,
 418            Worktree {
 419                authorized_user_ids: contact_user_ids.clone(),
 420                root_name: request.payload.root_name,
 421                share: None,
 422            },
 423        );
 424
 425        if ok {
 426            self.peer.respond(receipt, proto::Ack {}).await?;
 427            self.update_contacts_for_users(&contact_user_ids).await?;
 428        } else {
 429            self.peer
 430                .respond_with_error(
 431                    receipt,
 432                    proto::Error {
 433                        message: NO_SUCH_PROJECT.to_string(),
 434                    },
 435                )
 436                .await?;
 437        }
 438
 439        Ok(())
 440    }
 441
 442    async fn unregister_worktree(
 443        mut self: Arc<Server>,
 444        request: TypedEnvelope<proto::UnregisterWorktree>,
 445    ) -> tide::Result<()> {
 446        let project_id = request.payload.project_id;
 447        let worktree_id = request.payload.worktree_id;
 448        let (worktree, guest_connection_ids) =
 449            self.state_mut()
 450                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 451
 452        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 453            self.peer.send(
 454                conn_id,
 455                proto::UnregisterWorktree {
 456                    project_id,
 457                    worktree_id,
 458                },
 459            )
 460        })
 461        .await?;
 462        self.update_contacts_for_users(&worktree.authorized_user_ids)
 463            .await?;
 464        Ok(())
 465    }
 466
 467    async fn share_worktree(
 468        mut self: Arc<Server>,
 469        mut request: TypedEnvelope<proto::ShareWorktree>,
 470    ) -> tide::Result<()> {
 471        let worktree = request
 472            .payload
 473            .worktree
 474            .as_mut()
 475            .ok_or_else(|| anyhow!("missing worktree"))?;
 476        let entries = mem::take(&mut worktree.entries)
 477            .into_iter()
 478            .map(|entry| (entry.id, entry))
 479            .collect();
 480
 481        let diagnostic_summaries = mem::take(&mut worktree.diagnostic_summaries)
 482            .into_iter()
 483            .map(|summary| (PathBuf::from(summary.path.clone()), summary))
 484            .collect();
 485
 486        let contact_user_ids = self.state_mut().share_worktree(
 487            request.payload.project_id,
 488            worktree.id,
 489            request.sender_id,
 490            entries,
 491            diagnostic_summaries,
 492        );
 493        if let Some(contact_user_ids) = contact_user_ids {
 494            self.peer.respond(request.receipt(), proto::Ack {}).await?;
 495            self.update_contacts_for_users(&contact_user_ids).await?;
 496        } else {
 497            self.peer
 498                .respond_with_error(
 499                    request.receipt(),
 500                    proto::Error {
 501                        message: "no such worktree".to_string(),
 502                    },
 503                )
 504                .await?;
 505        }
 506        Ok(())
 507    }
 508
 509    async fn update_worktree(
 510        mut self: Arc<Server>,
 511        request: TypedEnvelope<proto::UpdateWorktree>,
 512    ) -> tide::Result<()> {
 513        let connection_ids = self
 514            .state_mut()
 515            .update_worktree(
 516                request.sender_id,
 517                request.payload.project_id,
 518                request.payload.worktree_id,
 519                &request.payload.removed_entries,
 520                &request.payload.updated_entries,
 521            )
 522            .ok_or_else(|| anyhow!("no such worktree"))?;
 523
 524        broadcast(request.sender_id, connection_ids, |connection_id| {
 525            self.peer
 526                .forward_send(request.sender_id, connection_id, request.payload.clone())
 527        })
 528        .await?;
 529
 530        Ok(())
 531    }
 532
 533    async fn update_diagnostic_summary(
 534        mut self: Arc<Server>,
 535        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 536    ) -> tide::Result<()> {
 537        let receiver_ids = request
 538            .payload
 539            .summary
 540            .clone()
 541            .and_then(|summary| {
 542                self.state_mut().update_diagnostic_summary(
 543                    request.payload.project_id,
 544                    request.payload.worktree_id,
 545                    request.sender_id,
 546                    summary,
 547                )
 548            })
 549            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 550
 551        broadcast(request.sender_id, receiver_ids, |connection_id| {
 552            self.peer
 553                .forward_send(request.sender_id, connection_id, request.payload.clone())
 554        })
 555        .await?;
 556        Ok(())
 557    }
 558
 559    async fn disk_based_diagnostics_updated(
 560        self: Arc<Server>,
 561        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 562    ) -> tide::Result<()> {
 563        let receiver_ids = self
 564            .state()
 565            .project_connection_ids(request.payload.project_id, request.sender_id)
 566            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 567        broadcast(request.sender_id, receiver_ids, |connection_id| {
 568            self.peer
 569                .forward_send(request.sender_id, connection_id, request.payload.clone())
 570        })
 571        .await?;
 572        Ok(())
 573    }
 574
 575    async fn open_buffer(
 576        self: Arc<Server>,
 577        request: TypedEnvelope<proto::OpenBuffer>,
 578    ) -> tide::Result<()> {
 579        let receipt = request.receipt();
 580        let host_connection_id = self
 581            .state()
 582            .read_project(request.payload.project_id, request.sender_id)
 583            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 584            .host_connection_id;
 585        let response = self
 586            .peer
 587            .forward_request(request.sender_id, host_connection_id, request.payload)
 588            .await?;
 589        self.peer.respond(receipt, response).await?;
 590        Ok(())
 591    }
 592
 593    async fn close_buffer(
 594        self: Arc<Server>,
 595        request: TypedEnvelope<proto::CloseBuffer>,
 596    ) -> tide::Result<()> {
 597        let host_connection_id = self
 598            .state()
 599            .read_project(request.payload.project_id, request.sender_id)
 600            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 601            .host_connection_id;
 602        self.peer
 603            .forward_send(request.sender_id, host_connection_id, request.payload)
 604            .await?;
 605        Ok(())
 606    }
 607
 608    async fn save_buffer(
 609        self: Arc<Server>,
 610        request: TypedEnvelope<proto::SaveBuffer>,
 611    ) -> tide::Result<()> {
 612        let host;
 613        let guests;
 614        {
 615            let state = self.state();
 616            let project = state
 617                .read_project(request.payload.project_id, request.sender_id)
 618                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 619            host = project.host_connection_id;
 620            guests = project.guest_connection_ids()
 621        }
 622
 623        let sender = request.sender_id;
 624        let receipt = request.receipt();
 625        let response = self
 626            .peer
 627            .forward_request(sender, host, request.payload.clone())
 628            .await?;
 629
 630        broadcast(host, guests, |conn_id| {
 631            let response = response.clone();
 632            let peer = &self.peer;
 633            async move {
 634                if conn_id == sender {
 635                    peer.respond(receipt, response).await
 636                } else {
 637                    peer.forward_send(host, conn_id, response).await
 638                }
 639            }
 640        })
 641        .await?;
 642
 643        Ok(())
 644    }
 645
 646    async fn update_buffer(
 647        self: Arc<Server>,
 648        request: TypedEnvelope<proto::UpdateBuffer>,
 649    ) -> tide::Result<()> {
 650        let receiver_ids = self
 651            .state()
 652            .project_connection_ids(request.payload.project_id, request.sender_id)
 653            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 654        broadcast(request.sender_id, receiver_ids, |connection_id| {
 655            self.peer
 656                .forward_send(request.sender_id, connection_id, request.payload.clone())
 657        })
 658        .await?;
 659        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 660        Ok(())
 661    }
 662
 663    async fn buffer_saved(
 664        self: Arc<Server>,
 665        request: TypedEnvelope<proto::BufferSaved>,
 666    ) -> tide::Result<()> {
 667        let receiver_ids = self
 668            .state()
 669            .project_connection_ids(request.payload.project_id, request.sender_id)
 670            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 671        broadcast(request.sender_id, receiver_ids, |connection_id| {
 672            self.peer
 673                .forward_send(request.sender_id, connection_id, request.payload.clone())
 674        })
 675        .await?;
 676        Ok(())
 677    }
 678
 679    async fn get_channels(
 680        self: Arc<Server>,
 681        request: TypedEnvelope<proto::GetChannels>,
 682    ) -> tide::Result<()> {
 683        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 684        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 685        self.peer
 686            .respond(
 687                request.receipt(),
 688                proto::GetChannelsResponse {
 689                    channels: channels
 690                        .into_iter()
 691                        .map(|chan| proto::Channel {
 692                            id: chan.id.to_proto(),
 693                            name: chan.name,
 694                        })
 695                        .collect(),
 696                },
 697            )
 698            .await?;
 699        Ok(())
 700    }
 701
 702    async fn get_users(
 703        self: Arc<Server>,
 704        request: TypedEnvelope<proto::GetUsers>,
 705    ) -> tide::Result<()> {
 706        let receipt = request.receipt();
 707        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 708        let users = self
 709            .app_state
 710            .db
 711            .get_users_by_ids(user_ids)
 712            .await?
 713            .into_iter()
 714            .map(|user| proto::User {
 715                id: user.id.to_proto(),
 716                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 717                github_login: user.github_login,
 718            })
 719            .collect();
 720        self.peer
 721            .respond(receipt, proto::GetUsersResponse { users })
 722            .await?;
 723        Ok(())
 724    }
 725
 726    async fn update_contacts_for_users<'a>(
 727        self: &Arc<Server>,
 728        user_ids: impl IntoIterator<Item = &'a UserId>,
 729    ) -> tide::Result<()> {
 730        let mut send_futures = Vec::new();
 731
 732        {
 733            let state = self.state();
 734            for user_id in user_ids {
 735                let contacts = state.contacts_for_user(*user_id);
 736                for connection_id in state.connection_ids_for_user(*user_id) {
 737                    send_futures.push(self.peer.send(
 738                        connection_id,
 739                        proto::UpdateContacts {
 740                            contacts: contacts.clone(),
 741                        },
 742                    ));
 743                }
 744            }
 745        }
 746        futures::future::try_join_all(send_futures).await?;
 747
 748        Ok(())
 749    }
 750
 751    async fn join_channel(
 752        mut self: Arc<Self>,
 753        request: TypedEnvelope<proto::JoinChannel>,
 754    ) -> tide::Result<()> {
 755        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 756        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 757        if !self
 758            .app_state
 759            .db
 760            .can_user_access_channel(user_id, channel_id)
 761            .await?
 762        {
 763            Err(anyhow!("access denied"))?;
 764        }
 765
 766        self.state_mut().join_channel(request.sender_id, channel_id);
 767        let messages = self
 768            .app_state
 769            .db
 770            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 771            .await?
 772            .into_iter()
 773            .map(|msg| proto::ChannelMessage {
 774                id: msg.id.to_proto(),
 775                body: msg.body,
 776                timestamp: msg.sent_at.unix_timestamp() as u64,
 777                sender_id: msg.sender_id.to_proto(),
 778                nonce: Some(msg.nonce.as_u128().into()),
 779            })
 780            .collect::<Vec<_>>();
 781        self.peer
 782            .respond(
 783                request.receipt(),
 784                proto::JoinChannelResponse {
 785                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 786                    messages,
 787                },
 788            )
 789            .await?;
 790        Ok(())
 791    }
 792
 793    async fn leave_channel(
 794        mut self: Arc<Self>,
 795        request: TypedEnvelope<proto::LeaveChannel>,
 796    ) -> tide::Result<()> {
 797        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 798        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 799        if !self
 800            .app_state
 801            .db
 802            .can_user_access_channel(user_id, channel_id)
 803            .await?
 804        {
 805            Err(anyhow!("access denied"))?;
 806        }
 807
 808        self.state_mut()
 809            .leave_channel(request.sender_id, channel_id);
 810
 811        Ok(())
 812    }
 813
 814    async fn send_channel_message(
 815        self: Arc<Self>,
 816        request: TypedEnvelope<proto::SendChannelMessage>,
 817    ) -> tide::Result<()> {
 818        let receipt = request.receipt();
 819        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 820        let user_id;
 821        let connection_ids;
 822        {
 823            let state = self.state();
 824            user_id = state.user_id_for_connection(request.sender_id)?;
 825            if let Some(ids) = state.channel_connection_ids(channel_id) {
 826                connection_ids = ids;
 827            } else {
 828                return Ok(());
 829            }
 830        }
 831
 832        // Validate the message body.
 833        let body = request.payload.body.trim().to_string();
 834        if body.len() > MAX_MESSAGE_LEN {
 835            self.peer
 836                .respond_with_error(
 837                    receipt,
 838                    proto::Error {
 839                        message: "message is too long".to_string(),
 840                    },
 841                )
 842                .await?;
 843            return Ok(());
 844        }
 845        if body.is_empty() {
 846            self.peer
 847                .respond_with_error(
 848                    receipt,
 849                    proto::Error {
 850                        message: "message can't be blank".to_string(),
 851                    },
 852                )
 853                .await?;
 854            return Ok(());
 855        }
 856
 857        let timestamp = OffsetDateTime::now_utc();
 858        let nonce = if let Some(nonce) = request.payload.nonce {
 859            nonce
 860        } else {
 861            self.peer
 862                .respond_with_error(
 863                    receipt,
 864                    proto::Error {
 865                        message: "nonce can't be blank".to_string(),
 866                    },
 867                )
 868                .await?;
 869            return Ok(());
 870        };
 871
 872        let message_id = self
 873            .app_state
 874            .db
 875            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 876            .await?
 877            .to_proto();
 878        let message = proto::ChannelMessage {
 879            sender_id: user_id.to_proto(),
 880            id: message_id,
 881            body,
 882            timestamp: timestamp.unix_timestamp() as u64,
 883            nonce: Some(nonce),
 884        };
 885        broadcast(request.sender_id, connection_ids, |conn_id| {
 886            self.peer.send(
 887                conn_id,
 888                proto::ChannelMessageSent {
 889                    channel_id: channel_id.to_proto(),
 890                    message: Some(message.clone()),
 891                },
 892            )
 893        })
 894        .await?;
 895        self.peer
 896            .respond(
 897                receipt,
 898                proto::SendChannelMessageResponse {
 899                    message: Some(message),
 900                },
 901            )
 902            .await?;
 903        Ok(())
 904    }
 905
 906    async fn get_channel_messages(
 907        self: Arc<Self>,
 908        request: TypedEnvelope<proto::GetChannelMessages>,
 909    ) -> tide::Result<()> {
 910        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 911        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 912        if !self
 913            .app_state
 914            .db
 915            .can_user_access_channel(user_id, channel_id)
 916            .await?
 917        {
 918            Err(anyhow!("access denied"))?;
 919        }
 920
 921        let messages = self
 922            .app_state
 923            .db
 924            .get_channel_messages(
 925                channel_id,
 926                MESSAGE_COUNT_PER_PAGE,
 927                Some(MessageId::from_proto(request.payload.before_message_id)),
 928            )
 929            .await?
 930            .into_iter()
 931            .map(|msg| proto::ChannelMessage {
 932                id: msg.id.to_proto(),
 933                body: msg.body,
 934                timestamp: msg.sent_at.unix_timestamp() as u64,
 935                sender_id: msg.sender_id.to_proto(),
 936                nonce: Some(msg.nonce.as_u128().into()),
 937            })
 938            .collect::<Vec<_>>();
 939        self.peer
 940            .respond(
 941                request.receipt(),
 942                proto::GetChannelMessagesResponse {
 943                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 944                    messages,
 945                },
 946            )
 947            .await?;
 948        Ok(())
 949    }
 950
 951    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 952        self.store.read()
 953    }
 954
 955    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 956        self.store.write()
 957    }
 958}
 959
 960pub async fn broadcast<F, T>(
 961    sender_id: ConnectionId,
 962    receiver_ids: Vec<ConnectionId>,
 963    mut f: F,
 964) -> anyhow::Result<()>
 965where
 966    F: FnMut(ConnectionId) -> T,
 967    T: Future<Output = anyhow::Result<()>>,
 968{
 969    let futures = receiver_ids
 970        .into_iter()
 971        .filter(|id| *id != sender_id)
 972        .map(|id| f(id));
 973    futures::future::try_join_all(futures).await?;
 974    Ok(())
 975}
 976
 977pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 978    let server = Server::new(app.state().clone(), rpc.clone(), None);
 979    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 980        let server = server.clone();
 981        async move {
 982            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 983
 984            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 985            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 986            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 987            let client_protocol_version: Option<u32> = request
 988                .header("X-Zed-Protocol-Version")
 989                .and_then(|v| v.as_str().parse().ok());
 990
 991            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 992                return Ok(Response::new(StatusCode::UpgradeRequired));
 993            }
 994
 995            let header = match request.header("Sec-Websocket-Key") {
 996                Some(h) => h.as_str(),
 997                None => return Err(anyhow!("expected sec-websocket-key"))?,
 998            };
 999
1000            let user_id = process_auth_header(&request).await?;
1001
1002            let mut response = Response::new(StatusCode::SwitchingProtocols);
1003            response.insert_header(UPGRADE, "websocket");
1004            response.insert_header(CONNECTION, "Upgrade");
1005            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1006            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1007            response.insert_header("Sec-Websocket-Version", "13");
1008
1009            let http_res: &mut tide::http::Response = response.as_mut();
1010            let upgrade_receiver = http_res.recv_upgrade().await;
1011            let addr = request.remote().unwrap_or("unknown").to_string();
1012            task::spawn(async move {
1013                if let Some(stream) = upgrade_receiver.await {
1014                    server
1015                        .handle_connection(
1016                            Connection::new(
1017                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1018                            ),
1019                            addr,
1020                            user_id,
1021                            None,
1022                        )
1023                        .await;
1024                }
1025            });
1026
1027            Ok(response)
1028        }
1029    });
1030}
1031
1032fn header_contains_ignore_case<T>(
1033    request: &tide::Request<T>,
1034    header_name: HeaderName,
1035    value: &str,
1036) -> bool {
1037    request
1038        .header(header_name)
1039        .map(|h| {
1040            h.as_str()
1041                .split(',')
1042                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1043        })
1044        .unwrap_or(false)
1045}
1046
1047#[cfg(test)]
1048mod tests {
1049    use super::*;
1050    use crate::{
1051        auth,
1052        db::{tests::TestDb, UserId},
1053        github, AppState, Config,
1054    };
1055    use ::rpc::Peer;
1056    use async_std::task;
1057    use gpui::{executor, ModelHandle, TestAppContext};
1058    use parking_lot::Mutex;
1059    use postage::{mpsc, watch};
1060    use rpc::PeerId;
1061    use serde_json::json;
1062    use sqlx::types::time::OffsetDateTime;
1063    use std::{
1064        ops::Deref,
1065        path::Path,
1066        rc::Rc,
1067        sync::{
1068            atomic::{AtomicBool, Ordering::SeqCst},
1069            Arc,
1070        },
1071        time::Duration,
1072    };
1073    use zed::{
1074        client::{
1075            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1076            EstablishConnectionError, UserStore,
1077        },
1078        editor::{Editor, EditorSettings, Input, MultiBuffer},
1079        fs::{FakeFs, Fs as _},
1080        language::{
1081            tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1082            LanguageRegistry, LanguageServerConfig, Point,
1083        },
1084        lsp,
1085        project::{DiagnosticSummary, Project, ProjectPath},
1086    };
1087
1088    #[gpui::test]
        // Two-client sharing flow: client A shares a local project, client B joins it as a
        // remote project, an edit typed by B into `b.txt` shows up in A's buffer, and
        // dropping B's project removes B from A's collaborators.
1089    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1090        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1091        let lang_registry = Arc::new(LanguageRegistry::new());
1092        let fs = Arc::new(FakeFs::new());
1093        cx_a.foreground().forbid_parking();
1094
1095        // Connect to a server as 2 clients.
1096        let mut server = TestServer::start(cx_a.foreground()).await;
1097        let client_a = server.create_client(&mut cx_a, "user_a").await;
1098        let client_b = server.create_client(&mut cx_b, "user_b").await;
1099
1100        // Share a project as client A
1101        fs.insert_tree(
1102            "/a",
1103            json!({
1104                ".zed.toml": r#"collaborators = ["user_b"]"#,
1105                "a.txt": "a-contents",
1106                "b.txt": "b-contents",
1107            }),
1108        )
1109        .await;
1110        let project_a = cx_a.update(|cx| {
1111            Project::local(
1112                client_a.clone(),
1113                client_a.user_store.clone(),
1114                lang_registry.clone(),
1115                fs.clone(),
1116                cx,
1117            )
1118        });
1119        let worktree_a = project_a
1120            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1121            .await
1122            .unwrap();
1123        worktree_a
1124            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1125            .await;
            // This id is what client B passes to `Project::remote` below.
1126        let project_id = project_a
1127            .update(&mut cx_a, |project, _| project.next_remote_id())
1128            .await;
1129        project_a
1130            .update(&mut cx_a, |project, cx| project.share(cx))
1131            .await
1132            .unwrap();
1133
1134        // Join that project as client B
1135        let project_b = Project::remote(
1136            project_id,
1137            client_b.clone(),
1138            client_b.user_store.clone(),
1139            lang_registry.clone(),
1140            fs.clone(),
1141            &mut cx_b.to_async(),
1142        )
1143        .await
1144        .unwrap();
1145        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1146
            // B should see A as a collaborator; A should eventually see B with B's replica id.
1147        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1148            assert_eq!(
1149                project
1150                    .collaborators()
1151                    .get(&client_a.peer_id)
1152                    .unwrap()
1153                    .user
1154                    .github_login,
1155                "user_a"
1156            );
1157            project.replica_id()
1158        });
1159        project_a
1160            .condition(&cx_a, |tree, _| {
1161                tree.collaborators()
1162                    .get(&client_b.peer_id)
1163                    .map_or(false, |collaborator| {
1164                        collaborator.replica_id == replica_id_b
1165                            && collaborator.user.github_login == "user_b"
1166                    })
1167            })
1168            .await;
1169
1170        // Open the same file as client B and client A.
1171        let buffer_b = worktree_b
1172            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1173            .await
1174            .unwrap();
1175        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1176        buffer_b.read_with(&cx_b, |buf, cx| {
1177            assert_eq!(buf.read(cx).text(), "b-contents")
1178        });
            // Asserted before client A opens the file itself; at this point only client B
            // has opened `b.txt`.
1179        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1180        let buffer_a = worktree_a
1181            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1182            .await
1183            .unwrap();
1184
1185        let editor_b = cx_b.add_view(window_b, |cx| {
1186            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1187        });
1188        // TODO
1189        // // Create a selection set as client B and see that selection set as client A.
1190        // buffer_a
1191        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1192        //     .await;
1193
1194        // Edit the buffer as client B and see that edit as client A.
1195        editor_b.update(&mut cx_b, |editor, cx| {
1196            editor.handle_input(&Input("ok, ".into()), cx)
1197        });
1198        buffer_a
1199            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1200            .await;
1201
1202        // TODO
1203        // // Remove the selection set as client B, see those selections disappear as client A.
1204        cx_b.update(move |_| drop(editor_b));
1205        // buffer_a
1206        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1207        //     .await;
1208
1209        // Close the buffer as client A, see that the buffer is closed.
1210        cx_a.update(move |_| drop(buffer_a));
1211        worktree_a
1212            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1213            .await;
1214
1215        // Dropping the client B's project removes client B from client A's collaborators.
1216        cx_b.update(move |_| drop(project_b));
1217        project_a
1218            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1219            .await;
1220    }
1221
1222    #[gpui::test]
        // Client A shares a project and client B joins it and opens a buffer; once A
        // unshares the project, B's copy of the project is expected to become read-only.
1223    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1224        let lang_registry = Arc::new(LanguageRegistry::new());
1225        let fs = Arc::new(FakeFs::new());
1226        cx_a.foreground().forbid_parking();
1227
1228        // Connect to a server as 2 clients.
1229        let mut server = TestServer::start(cx_a.foreground()).await;
1230        let client_a = server.create_client(&mut cx_a, "user_a").await;
1231        let client_b = server.create_client(&mut cx_b, "user_b").await;
1232
1233        // Share a project as client A
1234        fs.insert_tree(
1235            "/a",
1236            json!({
1237                ".zed.toml": r#"collaborators = ["user_b"]"#,
1238                "a.txt": "a-contents",
1239                "b.txt": "b-contents",
1240            }),
1241        )
1242        .await;
1243        let project_a = cx_a.update(|cx| {
1244            Project::local(
1245                client_a.clone(),
1246                client_a.user_store.clone(),
1247                lang_registry.clone(),
1248                fs.clone(),
1249                cx,
1250            )
1251        });
1252        let worktree_a = project_a
1253            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1254            .await
1255            .unwrap();
1256        worktree_a
1257            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1258            .await;
1259        let project_id = project_a
1260            .update(&mut cx_a, |project, _| project.next_remote_id())
1261            .await;
1262        project_a
1263            .update(&mut cx_a, |project, cx| project.share(cx))
1264            .await
1265            .unwrap();
1266
1267        // Join that project as client B
1268        let project_b = Project::remote(
1269            project_id,
1270            client_b.clone(),
1271            client_b.user_store.clone(),
1272            lang_registry.clone(),
1273            fs.clone(),
1274            &mut cx_b.to_async(),
1275        )
1276        .await
1277        .unwrap();
1278
            // Open a buffer as the guest before the host unshares; only success is checked.
1279        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1280        worktree_b
1281            .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
1282            .await
1283            .unwrap();
1284
            // Unshare as the host, then wait for the guest's project to report read-only.
1285        project_a
1286            .update(&mut cx_a, |project, cx| project.unshare(cx))
1287            .await
1288            .unwrap();
1289        project_b
1290            .condition(&mut cx_b, |project, _| project.is_read_only())
1291            .await;
1292    }
1293
1294    #[gpui::test]
        // Host A and guests B and C collaborate on one worktree. Checks that edits from all
        // three converge to the same text, that a save started by guest B writes the file
        // on the shared fake file system and clears the dirty flag on every replica, and
        // that renames and new files on the host's file system appear in the guests'
        // worktrees.
1295    async fn test_propagate_saves_and_fs_changes(
1296        mut cx_a: TestAppContext,
1297        mut cx_b: TestAppContext,
1298        mut cx_c: TestAppContext,
1299    ) {
1300        let lang_registry = Arc::new(LanguageRegistry::new());
1301        let fs = Arc::new(FakeFs::new());
1302        cx_a.foreground().forbid_parking();
1303
1304        // Connect to a server as 3 clients.
1305        let mut server = TestServer::start(cx_a.foreground()).await;
1306        let client_a = server.create_client(&mut cx_a, "user_a").await;
1307        let client_b = server.create_client(&mut cx_b, "user_b").await;
1308        let client_c = server.create_client(&mut cx_c, "user_c").await;
1309
1310        // Share a worktree as client A.
1311        fs.insert_tree(
1312            "/a",
1313            json!({
1314                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1315                "file1": "",
1316                "file2": ""
1317            }),
1318        )
1319        .await;
1320        let project_a = cx_a.update(|cx| {
1321            Project::local(
1322                client_a.clone(),
1323                client_a.user_store.clone(),
1324                lang_registry.clone(),
1325                fs.clone(),
1326                cx,
1327            )
1328        });
1329        let worktree_a = project_a
1330            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1331            .await
1332            .unwrap();
1333        worktree_a
1334            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1335            .await;
1336        let project_id = project_a
1337            .update(&mut cx_a, |project, _| project.next_remote_id())
1338            .await;
1339        project_a
1340            .update(&mut cx_a, |project, cx| project.share(cx))
1341            .await
1342            .unwrap();
1343
1344        // Join that worktree as clients B and C.
1345        let project_b = Project::remote(
1346            project_id,
1347            client_b.clone(),
1348            client_b.user_store.clone(),
1349            lang_registry.clone(),
1350            fs.clone(),
1351            &mut cx_b.to_async(),
1352        )
1353        .await
1354        .unwrap();
1355        let project_c = Project::remote(
1356            project_id,
1357            client_c.clone(),
1358            client_c.user_store.clone(),
1359            lang_registry.clone(),
1360            fs.clone(),
1361            &mut cx_c.to_async(),
1362        )
1363        .await
1364        .unwrap();
1365
1366        // Open and edit a buffer as both guests B and C.
1367        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1368        let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
1369        let buffer_b = worktree_b
1370            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1371            .await
1372            .unwrap();
1373        let buffer_c = worktree_c
1374            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1375            .await
1376            .unwrap();
            // Both guests insert at offset 0 without waiting for each other.
1377        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1378        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1379
1380        // Open and edit that buffer as the host.
1381        let buffer_a = worktree_a
1382            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1383            .await
1384            .unwrap();
1385
            // Wait until the host has received both guest edits before appending its own.
1386        buffer_a
1387            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1388            .await;
1389        buffer_a.update(&mut cx_a, |buf, cx| {
1390            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1391        });
1392
1393        // Wait for edits to propagate
1394        buffer_a
1395            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1396            .await;
1397        buffer_b
1398            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1399            .await;
1400        buffer_c
1401            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1402            .await;
1403
1404        // Edit the buffer as the host and concurrently save as guest B.
1405        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1406        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1407        save_b.await.unwrap();
            // The file written by the save is expected to contain the host's "hi-a, " edit.
1408        assert_eq!(
1409            fs.load("/a/file1".as_ref()).await.unwrap(),
1410            "hi-a, i-am-c, i-am-b, i-am-a"
1411        );
1412        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1413        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1414        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1415
1416        // Make changes on host's file system, see those changes on the guests.
1417        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1418            .await
1419            .unwrap();
1420        fs.insert_file(Path::new("/a/file4"), "4".into())
1421            .await
1422            .unwrap();
1423
            // Four files are expected: `.zed.toml`, `file1`, `file3` (renamed from `file2`)
            // and the newly inserted `file4`.
1424        worktree_b
1425            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1426            .await;
1427        worktree_c
1428            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1429            .await;
1430        worktree_b.read_with(&cx_b, |tree, _| {
1431            assert_eq!(
1432                tree.paths()
1433                    .map(|p| p.to_string_lossy())
1434                    .collect::<Vec<_>>(),
1435                &[".zed.toml", "file1", "file3", "file4"]
1436            )
1437        });
1438        worktree_c.read_with(&cx_c, |tree, _| {
1439            assert_eq!(
1440                tree.paths()
1441                    .map(|p| p.to_string_lossy())
1442                    .collect::<Vec<_>>(),
1443                &[".zed.toml", "file1", "file3", "file4"]
1444            )
1445        });
1446    }
1447
1448    #[gpui::test]
        // Guest B edits and saves a shared buffer, waits for the file's mtime to change, and
        // checks that the buffer is then neither dirty nor in conflict. A further edit is
        // expected to make the buffer dirty again, still without reporting a conflict.
1449    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1450        cx_a.foreground().forbid_parking();
1451        let lang_registry = Arc::new(LanguageRegistry::new());
1452        let fs = Arc::new(FakeFs::new());
1453
1454        // Connect to a server as 2 clients.
1455        let mut server = TestServer::start(cx_a.foreground()).await;
1456        let client_a = server.create_client(&mut cx_a, "user_a").await;
1457        let client_b = server.create_client(&mut cx_b, "user_b").await;
1458
1459        // Share a project as client A
            // NOTE(review): the collaborators list names "user_c", but this test only
            // creates clients for user_a and user_b — confirm whether that entry is needed.
1460        fs.insert_tree(
1461            "/dir",
1462            json!({
1463                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1464                "a.txt": "a-contents",
1465            }),
1466        )
1467        .await;
1468
1469        let project_a = cx_a.update(|cx| {
1470            Project::local(
1471                client_a.clone(),
1472                client_a.user_store.clone(),
1473                lang_registry.clone(),
1474                fs.clone(),
1475                cx,
1476            )
1477        });
1478        let worktree_a = project_a
1479            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1480            .await
1481            .unwrap();
1482        worktree_a
1483            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1484            .await;
1485        let project_id = project_a
1486            .update(&mut cx_a, |project, _| project.next_remote_id())
1487            .await;
1488        project_a
1489            .update(&mut cx_a, |project, cx| project.share(cx))
1490            .await
1491            .unwrap();
1492
1493        // Join that project as client B
1494        let project_b = Project::remote(
1495            project_id,
1496            client_b.clone(),
1497            client_b.user_store.clone(),
1498            lang_registry.clone(),
1499            fs.clone(),
1500            &mut cx_b.to_async(),
1501        )
1502        .await
1503        .unwrap();
1504        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1505
1506        // Open a buffer as client B
1507        let buffer_b = worktree_b
1508            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1509            .await
1510            .unwrap();
            // Remember the file's mtime so the save below can be detected by its change.
1511        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1512
1513        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1514        buffer_b.read_with(&cx_b, |buf, _| {
1515            assert!(buf.is_dirty());
1516            assert!(!buf.has_conflict());
1517        });
1518
1519        buffer_b
1520            .update(&mut cx_b, |buf, cx| buf.save(cx))
1521            .unwrap()
1522            .await
1523            .unwrap();
            // Wait until the guest's view of the file reflects a new mtime.
1524        worktree_b
1525            .condition(&cx_b, |_, cx| {
1526                buffer_b.read(cx).file().unwrap().mtime() != mtime
1527            })
1528            .await;
1529        buffer_b.read_with(&cx_b, |buf, _| {
1530            assert!(!buf.is_dirty());
1531            assert!(!buf.has_conflict());
1532        });
1533
            // Editing again after the save should dirty the buffer without a conflict.
1534        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1535        buffer_b.read_with(&cx_b, |buf, _| {
1536            assert!(buf.is_dirty());
1537            assert!(!buf.has_conflict());
1538        });
1539    }
1540
1541    #[gpui::test]
        // The host edits a buffer while the guest's request to open that same buffer is
        // still in flight; the guest's buffer is expected to end up with the host's text.
1542    async fn test_editing_while_guest_opens_buffer(
1543        mut cx_a: TestAppContext,
1544        mut cx_b: TestAppContext,
1545    ) {
1546        cx_a.foreground().forbid_parking();
1547        let lang_registry = Arc::new(LanguageRegistry::new());
1548        let fs = Arc::new(FakeFs::new());
1549
1550        // Connect to a server as 2 clients.
1551        let mut server = TestServer::start(cx_a.foreground()).await;
1552        let client_a = server.create_client(&mut cx_a, "user_a").await;
1553        let client_b = server.create_client(&mut cx_b, "user_b").await;
1554
1555        // Share a project as client A
1556        fs.insert_tree(
1557            "/dir",
1558            json!({
1559                ".zed.toml": r#"collaborators = ["user_b"]"#,
1560                "a.txt": "a-contents",
1561            }),
1562        )
1563        .await;
1564        let project_a = cx_a.update(|cx| {
1565            Project::local(
1566                client_a.clone(),
1567                client_a.user_store.clone(),
1568                lang_registry.clone(),
1569                fs.clone(),
1570                cx,
1571            )
1572        });
1573        let worktree_a = project_a
1574            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1575            .await
1576            .unwrap();
1577        worktree_a
1578            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1579            .await;
1580        let project_id = project_a
1581            .update(&mut cx_a, |project, _| project.next_remote_id())
1582            .await;
1583        project_a
1584            .update(&mut cx_a, |project, cx| project.share(cx))
1585            .await
1586            .unwrap();
1587
1588        // Join that project as client B
1589        let project_b = Project::remote(
1590            project_id,
1591            client_b.clone(),
1592            client_b.user_store.clone(),
1593            lang_registry.clone(),
1594            fs.clone(),
1595            &mut cx_b.to_async(),
1596        )
1597        .await
1598        .unwrap();
1599        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1600
1601        // Open a buffer as client A
1602        let buffer_a = worktree_a
1603            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1604            .await
1605            .unwrap();
1606
1607        // Start opening the same buffer as client B
1608        let buffer_b = cx_b
1609            .background()
1610            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
            // NOTE(review): presumably yields so the spawned open can make progress before
            // the host's edit below — confirm.
1611        task::yield_now().await;
1612
1613        // Edit the buffer as client A while client B is still opening it.
1614        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1615
            // Capture the host's text, then wait for the guest's buffer to match it.
1616        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1617        let buffer_b = buffer_b.await.unwrap();
1618        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1619    }
1620
1621    #[gpui::test]
        // The guest starts opening a buffer and then drops both its project and the pending
        // open task; the host is expected to see its collaborator count return to zero.
1622    async fn test_leaving_worktree_while_opening_buffer(
1623        mut cx_a: TestAppContext,
1624        mut cx_b: TestAppContext,
1625    ) {
1626        cx_a.foreground().forbid_parking();
1627        let lang_registry = Arc::new(LanguageRegistry::new());
1628        let fs = Arc::new(FakeFs::new());
1629
1630        // Connect to a server as 2 clients.
1631        let mut server = TestServer::start(cx_a.foreground()).await;
1632        let client_a = server.create_client(&mut cx_a, "user_a").await;
1633        let client_b = server.create_client(&mut cx_b, "user_b").await;
1634
1635        // Share a project as client A
1636        fs.insert_tree(
1637            "/dir",
1638            json!({
1639                ".zed.toml": r#"collaborators = ["user_b"]"#,
1640                "a.txt": "a-contents",
1641            }),
1642        )
1643        .await;
1644        let project_a = cx_a.update(|cx| {
1645            Project::local(
1646                client_a.clone(),
1647                client_a.user_store.clone(),
1648                lang_registry.clone(),
1649                fs.clone(),
1650                cx,
1651            )
1652        });
1653        let worktree_a = project_a
1654            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1655            .await
1656            .unwrap();
1657        worktree_a
1658            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1659            .await;
1660        let project_id = project_a
1661            .update(&mut cx_a, |project, _| project.next_remote_id())
1662            .await;
1663        project_a
1664            .update(&mut cx_a, |project, cx| project.share(cx))
1665            .await
1666            .unwrap();
1667
1668        // Join that project as client B
1669        let project_b = Project::remote(
1670            project_id,
1671            client_b.clone(),
1672            client_b.user_store.clone(),
1673            lang_registry.clone(),
1674            fs.clone(),
1675            &mut cx_b.to_async(),
1676        )
1677        .await
1678        .unwrap();
1679        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1680
1681        // See that a guest has joined as client A.
1682        project_a
1683            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1684            .await;
1685
1686        // Begin opening a buffer as client B, but leave the project before the open completes.
1687        let buffer_b = cx_b
1688            .background()
1689            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
            // Drop the project first, then the still-pending open task; it is never awaited.
1690        cx_b.update(|_| drop(project_b));
1691        drop(buffer_b);
1692
1693        // See that the guest has left.
1694        project_a
1695            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1696            .await;
1697    }
1698
1699    #[gpui::test]
1700    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1701        cx_a.foreground().forbid_parking();
1702        let lang_registry = Arc::new(LanguageRegistry::new());
1703        let fs = Arc::new(FakeFs::new());
1704
1705        // Connect to a server as 2 clients.
1706        let mut server = TestServer::start(cx_a.foreground()).await;
1707        let client_a = server.create_client(&mut cx_a, "user_a").await;
1708        let client_b = server.create_client(&mut cx_b, "user_b").await;
1709
1710        // Share a project as client A
1711        fs.insert_tree(
1712            "/a",
1713            json!({
1714                ".zed.toml": r#"collaborators = ["user_b"]"#,
1715                "a.txt": "a-contents",
1716                "b.txt": "b-contents",
1717            }),
1718        )
1719        .await;
1720        let project_a = cx_a.update(|cx| {
1721            Project::local(
1722                client_a.clone(),
1723                client_a.user_store.clone(),
1724                lang_registry.clone(),
1725                fs.clone(),
1726                cx,
1727            )
1728        });
1729        let worktree_a = project_a
1730            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1731            .await
1732            .unwrap();
1733        worktree_a
1734            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1735            .await;
1736        let project_id = project_a
1737            .update(&mut cx_a, |project, _| project.next_remote_id())
1738            .await;
1739        project_a
1740            .update(&mut cx_a, |project, cx| project.share(cx))
1741            .await
1742            .unwrap();
1743
1744        // Join that project as client B
1745        let _project_b = Project::remote(
1746            project_id,
1747            client_b.clone(),
1748            client_b.user_store.clone(),
1749            lang_registry.clone(),
1750            fs.clone(),
1751            &mut cx_b.to_async(),
1752        )
1753        .await
1754        .unwrap();
1755
1756        // See that a guest has joined as client A.
1757        project_a
1758            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1759            .await;
1760
1761        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1762        client_b.disconnect(&cx_b.to_async()).await.unwrap();
1763        project_a
1764            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1765            .await;
1766    }
1767
1768    #[gpui::test]
1769    async fn test_collaborating_with_diagnostics(
1770        mut cx_a: TestAppContext,
1771        mut cx_b: TestAppContext,
1772    ) {
1773        cx_a.foreground().forbid_parking();
1774        let mut lang_registry = Arc::new(LanguageRegistry::new());
1775        let fs = Arc::new(FakeFs::new());
1776
1777        // Set up a fake language server.
1778        let (language_server_config, mut fake_language_server) =
1779            LanguageServerConfig::fake(cx_a.background()).await;
1780        Arc::get_mut(&mut lang_registry)
1781            .unwrap()
1782            .add(Arc::new(Language::new(
1783                LanguageConfig {
1784                    name: "Rust".to_string(),
1785                    path_suffixes: vec!["rs".to_string()],
1786                    language_server: Some(language_server_config),
1787                    ..Default::default()
1788                },
1789                Some(tree_sitter_rust::language()),
1790            )));
1791
1792        // Connect to a server as 2 clients.
1793        let mut server = TestServer::start(cx_a.foreground()).await;
1794        let client_a = server.create_client(&mut cx_a, "user_a").await;
1795        let client_b = server.create_client(&mut cx_b, "user_b").await;
1796
1797        // Share a project as client A
1798        fs.insert_tree(
1799            "/a",
1800            json!({
1801                ".zed.toml": r#"collaborators = ["user_b"]"#,
1802                "a.rs": "let one = two",
1803                "other.rs": "",
1804            }),
1805        )
1806        .await;
1807        let project_a = cx_a.update(|cx| {
1808            Project::local(
1809                client_a.clone(),
1810                client_a.user_store.clone(),
1811                lang_registry.clone(),
1812                fs.clone(),
1813                cx,
1814            )
1815        });
1816        let worktree_a = project_a
1817            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1818            .await
1819            .unwrap();
1820        worktree_a
1821            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1822            .await;
1823        let project_id = project_a
1824            .update(&mut cx_a, |project, _| project.next_remote_id())
1825            .await;
1826        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1827        project_a
1828            .update(&mut cx_a, |project, cx| project.share(cx))
1829            .await
1830            .unwrap();
1831
1832        // Cause the language server to start.
1833        let _ = cx_a
1834            .background()
1835            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1836                worktree.open_buffer("other.rs", cx)
1837            }))
1838            .await
1839            .unwrap();
1840
1841        // Simulate a language server reporting errors for a file.
1842        fake_language_server
1843            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1844                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1845                version: None,
1846                diagnostics: vec![lsp::Diagnostic {
1847                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1848                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1849                    message: "message 1".to_string(),
1850                    ..Default::default()
1851                }],
1852            })
1853            .await;
1854
1855        // Wait for server to see the diagnostics update.
1856        server
1857            .condition(|store| {
1858                let worktree = store
1859                    .project(project_id)
1860                    .unwrap()
1861                    .worktrees
1862                    .get(&worktree_id.to_proto())
1863                    .unwrap();
1864
1865                !worktree
1866                    .share
1867                    .as_ref()
1868                    .unwrap()
1869                    .diagnostic_summaries
1870                    .is_empty()
1871            })
1872            .await;
1873
1874        // Join the worktree as client B.
1875        let project_b = Project::remote(
1876            project_id,
1877            client_b.clone(),
1878            client_b.user_store.clone(),
1879            lang_registry.clone(),
1880            fs.clone(),
1881            &mut cx_b.to_async(),
1882        )
1883        .await
1884        .unwrap();
1885
1886        project_b.read_with(&cx_b, |project, cx| {
1887            assert_eq!(
1888                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1889                &[(
1890                    ProjectPath {
1891                        worktree_id,
1892                        path: Arc::from(Path::new("a.rs")),
1893                    },
1894                    DiagnosticSummary {
1895                        error_count: 1,
1896                        warning_count: 0,
1897                        ..Default::default()
1898                    },
1899                )]
1900            )
1901        });
1902
1903        // Simulate a language server reporting more errors for a file.
1904        fake_language_server
1905            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1906                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1907                version: None,
1908                diagnostics: vec![
1909                    lsp::Diagnostic {
1910                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1911                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1912                        message: "message 1".to_string(),
1913                        ..Default::default()
1914                    },
1915                    lsp::Diagnostic {
1916                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1917                        range: lsp::Range::new(
1918                            lsp::Position::new(0, 10),
1919                            lsp::Position::new(0, 13),
1920                        ),
1921                        message: "message 2".to_string(),
1922                        ..Default::default()
1923                    },
1924                ],
1925            })
1926            .await;
1927
1928        // Client b gets the updated summaries
1929        project_b
1930            .condition(&cx_b, |project, cx| {
1931                project.diagnostic_summaries(cx).collect::<Vec<_>>()
1932                    == &[(
1933                        ProjectPath {
1934                            worktree_id,
1935                            path: Arc::from(Path::new("a.rs")),
1936                        },
1937                        DiagnosticSummary {
1938                            error_count: 1,
1939                            warning_count: 1,
1940                            ..Default::default()
1941                        },
1942                    )]
1943            })
1944            .await;
1945
1946        // Open the file with the errors on client B. They should be present.
1947        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1948        let buffer_b = cx_b
1949            .background()
1950            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1951            .await
1952            .unwrap();
1953
1954        buffer_b.read_with(&cx_b, |buffer, _| {
1955            assert_eq!(
1956                buffer
1957                    .snapshot()
1958                    .diagnostics_in_range::<_, Point>(0..buffer.len())
1959                    .map(|entry| entry)
1960                    .collect::<Vec<_>>(),
1961                &[
1962                    DiagnosticEntry {
1963                        range: Point::new(0, 4)..Point::new(0, 7),
1964                        diagnostic: Diagnostic {
1965                            group_id: 0,
1966                            message: "message 1".to_string(),
1967                            severity: lsp::DiagnosticSeverity::ERROR,
1968                            is_primary: true,
1969                            ..Default::default()
1970                        }
1971                    },
1972                    DiagnosticEntry {
1973                        range: Point::new(0, 10)..Point::new(0, 13),
1974                        diagnostic: Diagnostic {
1975                            group_id: 1,
1976                            severity: lsp::DiagnosticSeverity::WARNING,
1977                            message: "message 2".to_string(),
1978                            is_primary: true,
1979                            ..Default::default()
1980                        }
1981                    }
1982                ]
1983            );
1984        });
1985    }
1986
1987    #[gpui::test]
1988    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1989        cx_a.foreground().forbid_parking();
1990
1991        // Connect to a server as 2 clients.
1992        let mut server = TestServer::start(cx_a.foreground()).await;
1993        let client_a = server.create_client(&mut cx_a, "user_a").await;
1994        let client_b = server.create_client(&mut cx_b, "user_b").await;
1995
1996        // Create an org that includes these 2 users.
1997        let db = &server.app_state.db;
1998        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1999        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2000            .await
2001            .unwrap();
2002        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2003            .await
2004            .unwrap();
2005
2006        // Create a channel that includes all the users.
2007        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2008        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2009            .await
2010            .unwrap();
2011        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2012            .await
2013            .unwrap();
2014        db.create_channel_message(
2015            channel_id,
2016            client_b.current_user_id(&cx_b),
2017            "hello A, it's B.",
2018            OffsetDateTime::now_utc(),
2019            1,
2020        )
2021        .await
2022        .unwrap();
2023
2024        let channels_a = cx_a
2025            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2026        channels_a
2027            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2028            .await;
2029        channels_a.read_with(&cx_a, |list, _| {
2030            assert_eq!(
2031                list.available_channels().unwrap(),
2032                &[ChannelDetails {
2033                    id: channel_id.to_proto(),
2034                    name: "test-channel".to_string()
2035                }]
2036            )
2037        });
2038        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2039            this.get_channel(channel_id.to_proto(), cx).unwrap()
2040        });
2041        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2042        channel_a
2043            .condition(&cx_a, |channel, _| {
2044                channel_messages(channel)
2045                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2046            })
2047            .await;
2048
2049        let channels_b = cx_b
2050            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2051        channels_b
2052            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2053            .await;
2054        channels_b.read_with(&cx_b, |list, _| {
2055            assert_eq!(
2056                list.available_channels().unwrap(),
2057                &[ChannelDetails {
2058                    id: channel_id.to_proto(),
2059                    name: "test-channel".to_string()
2060                }]
2061            )
2062        });
2063
2064        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2065            this.get_channel(channel_id.to_proto(), cx).unwrap()
2066        });
2067        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2068        channel_b
2069            .condition(&cx_b, |channel, _| {
2070                channel_messages(channel)
2071                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2072            })
2073            .await;
2074
2075        channel_a
2076            .update(&mut cx_a, |channel, cx| {
2077                channel
2078                    .send_message("oh, hi B.".to_string(), cx)
2079                    .unwrap()
2080                    .detach();
2081                let task = channel.send_message("sup".to_string(), cx).unwrap();
2082                assert_eq!(
2083                    channel_messages(channel),
2084                    &[
2085                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2086                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2087                        ("user_a".to_string(), "sup".to_string(), true)
2088                    ]
2089                );
2090                task
2091            })
2092            .await
2093            .unwrap();
2094
2095        channel_b
2096            .condition(&cx_b, |channel, _| {
2097                channel_messages(channel)
2098                    == [
2099                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2100                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2101                        ("user_a".to_string(), "sup".to_string(), false),
2102                    ]
2103            })
2104            .await;
2105
2106        assert_eq!(
2107            server
2108                .state()
2109                .await
2110                .channel(channel_id)
2111                .unwrap()
2112                .connection_ids
2113                .len(),
2114            2
2115        );
2116        cx_b.update(|_| drop(channel_b));
2117        server
2118            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2119            .await;
2120
2121        cx_a.update(|_| drop(channel_a));
2122        server
2123            .condition(|state| state.channel(channel_id).is_none())
2124            .await;
2125    }
2126
2127    #[gpui::test]
2128    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2129        cx_a.foreground().forbid_parking();
2130
2131        let mut server = TestServer::start(cx_a.foreground()).await;
2132        let client_a = server.create_client(&mut cx_a, "user_a").await;
2133
2134        let db = &server.app_state.db;
2135        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2136        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2137        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2138            .await
2139            .unwrap();
2140        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2141            .await
2142            .unwrap();
2143
2144        let channels_a = cx_a
2145            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2146        channels_a
2147            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2148            .await;
2149        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2150            this.get_channel(channel_id.to_proto(), cx).unwrap()
2151        });
2152
2153        // Messages aren't allowed to be too long.
2154        channel_a
2155            .update(&mut cx_a, |channel, cx| {
2156                let long_body = "this is long.\n".repeat(1024);
2157                channel.send_message(long_body, cx).unwrap()
2158            })
2159            .await
2160            .unwrap_err();
2161
2162        // Messages aren't allowed to be blank.
2163        channel_a.update(&mut cx_a, |channel, cx| {
2164            channel.send_message(String::new(), cx).unwrap_err()
2165        });
2166
2167        // Leading and trailing whitespace are trimmed.
2168        channel_a
2169            .update(&mut cx_a, |channel, cx| {
2170                channel
2171                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
2172                    .unwrap()
2173            })
2174            .await
2175            .unwrap();
2176        assert_eq!(
2177            db.get_channel_messages(channel_id, 10, None)
2178                .await
2179                .unwrap()
2180                .iter()
2181                .map(|m| &m.body)
2182                .collect::<Vec<_>>(),
2183            &["surrounded by whitespace"]
2184        );
2185    }
2186
2187    #[gpui::test]
2188    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2189        cx_a.foreground().forbid_parking();
2190
2191        // Connect to a server as 2 clients.
2192        let mut server = TestServer::start(cx_a.foreground()).await;
2193        let client_a = server.create_client(&mut cx_a, "user_a").await;
2194        let client_b = server.create_client(&mut cx_b, "user_b").await;
2195        let mut status_b = client_b.status();
2196
2197        // Create an org that includes these 2 users.
2198        let db = &server.app_state.db;
2199        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2200        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2201            .await
2202            .unwrap();
2203        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2204            .await
2205            .unwrap();
2206
2207        // Create a channel that includes all the users.
2208        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2209        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2210            .await
2211            .unwrap();
2212        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2213            .await
2214            .unwrap();
2215        db.create_channel_message(
2216            channel_id,
2217            client_b.current_user_id(&cx_b),
2218            "hello A, it's B.",
2219            OffsetDateTime::now_utc(),
2220            2,
2221        )
2222        .await
2223        .unwrap();
2224
2225        let channels_a = cx_a
2226            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2227        channels_a
2228            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2229            .await;
2230
2231        channels_a.read_with(&cx_a, |list, _| {
2232            assert_eq!(
2233                list.available_channels().unwrap(),
2234                &[ChannelDetails {
2235                    id: channel_id.to_proto(),
2236                    name: "test-channel".to_string()
2237                }]
2238            )
2239        });
2240        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2241            this.get_channel(channel_id.to_proto(), cx).unwrap()
2242        });
2243        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2244        channel_a
2245            .condition(&cx_a, |channel, _| {
2246                channel_messages(channel)
2247                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2248            })
2249            .await;
2250
2251        let channels_b = cx_b
2252            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2253        channels_b
2254            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2255            .await;
2256        channels_b.read_with(&cx_b, |list, _| {
2257            assert_eq!(
2258                list.available_channels().unwrap(),
2259                &[ChannelDetails {
2260                    id: channel_id.to_proto(),
2261                    name: "test-channel".to_string()
2262                }]
2263            )
2264        });
2265
2266        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2267            this.get_channel(channel_id.to_proto(), cx).unwrap()
2268        });
2269        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2270        channel_b
2271            .condition(&cx_b, |channel, _| {
2272                channel_messages(channel)
2273                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2274            })
2275            .await;
2276
2277        // Disconnect client B, ensuring we can still access its cached channel data.
2278        server.forbid_connections();
2279        server.disconnect_client(client_b.current_user_id(&cx_b));
2280        while !matches!(
2281            status_b.recv().await,
2282            Some(client::Status::ReconnectionError { .. })
2283        ) {}
2284
2285        channels_b.read_with(&cx_b, |channels, _| {
2286            assert_eq!(
2287                channels.available_channels().unwrap(),
2288                [ChannelDetails {
2289                    id: channel_id.to_proto(),
2290                    name: "test-channel".to_string()
2291                }]
2292            )
2293        });
2294        channel_b.read_with(&cx_b, |channel, _| {
2295            assert_eq!(
2296                channel_messages(channel),
2297                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2298            )
2299        });
2300
2301        // Send a message from client B while it is disconnected.
2302        channel_b
2303            .update(&mut cx_b, |channel, cx| {
2304                let task = channel
2305                    .send_message("can you see this?".to_string(), cx)
2306                    .unwrap();
2307                assert_eq!(
2308                    channel_messages(channel),
2309                    &[
2310                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2311                        ("user_b".to_string(), "can you see this?".to_string(), true)
2312                    ]
2313                );
2314                task
2315            })
2316            .await
2317            .unwrap_err();
2318
2319        // Send a message from client A while B is disconnected.
2320        channel_a
2321            .update(&mut cx_a, |channel, cx| {
2322                channel
2323                    .send_message("oh, hi B.".to_string(), cx)
2324                    .unwrap()
2325                    .detach();
2326                let task = channel.send_message("sup".to_string(), cx).unwrap();
2327                assert_eq!(
2328                    channel_messages(channel),
2329                    &[
2330                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2331                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2332                        ("user_a".to_string(), "sup".to_string(), true)
2333                    ]
2334                );
2335                task
2336            })
2337            .await
2338            .unwrap();
2339
2340        // Give client B a chance to reconnect.
2341        server.allow_connections();
2342        cx_b.foreground().advance_clock(Duration::from_secs(10));
2343
2344        // Verify that B sees the new messages upon reconnection, as well as the message client B
2345        // sent while offline.
2346        channel_b
2347            .condition(&cx_b, |channel, _| {
2348                channel_messages(channel)
2349                    == [
2350                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2351                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2352                        ("user_a".to_string(), "sup".to_string(), false),
2353                        ("user_b".to_string(), "can you see this?".to_string(), false),
2354                    ]
2355            })
2356            .await;
2357
2358        // Ensure client A and B can communicate normally after reconnection.
2359        channel_a
2360            .update(&mut cx_a, |channel, cx| {
2361                channel.send_message("you online?".to_string(), cx).unwrap()
2362            })
2363            .await
2364            .unwrap();
2365        channel_b
2366            .condition(&cx_b, |channel, _| {
2367                channel_messages(channel)
2368                    == [
2369                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2370                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2371                        ("user_a".to_string(), "sup".to_string(), false),
2372                        ("user_b".to_string(), "can you see this?".to_string(), false),
2373                        ("user_a".to_string(), "you online?".to_string(), false),
2374                    ]
2375            })
2376            .await;
2377
2378        channel_b
2379            .update(&mut cx_b, |channel, cx| {
2380                channel.send_message("yep".to_string(), cx).unwrap()
2381            })
2382            .await
2383            .unwrap();
2384        channel_a
2385            .condition(&cx_a, |channel, _| {
2386                channel_messages(channel)
2387                    == [
2388                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2389                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2390                        ("user_a".to_string(), "sup".to_string(), false),
2391                        ("user_b".to_string(), "can you see this?".to_string(), false),
2392                        ("user_a".to_string(), "you online?".to_string(), false),
2393                        ("user_b".to_string(), "yep".to_string(), false),
2394                    ]
2395            })
2396            .await;
2397    }
2398
2399    #[gpui::test]
2400    async fn test_contacts(
2401        mut cx_a: TestAppContext,
2402        mut cx_b: TestAppContext,
2403        mut cx_c: TestAppContext,
2404    ) {
2405        cx_a.foreground().forbid_parking();
2406        let lang_registry = Arc::new(LanguageRegistry::new());
2407        let fs = Arc::new(FakeFs::new());
2408
2409        // Connect to a server as 3 clients.
2410        let mut server = TestServer::start(cx_a.foreground()).await;
2411        let client_a = server.create_client(&mut cx_a, "user_a").await;
2412        let client_b = server.create_client(&mut cx_b, "user_b").await;
2413        let client_c = server.create_client(&mut cx_c, "user_c").await;
2414
2415        // Share a worktree as client A.
2416        fs.insert_tree(
2417            "/a",
2418            json!({
2419                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2420            }),
2421        )
2422        .await;
2423
2424        let project_a = cx_a.update(|cx| {
2425            Project::local(
2426                client_a.clone(),
2427                client_a.user_store.clone(),
2428                lang_registry.clone(),
2429                fs.clone(),
2430                cx,
2431            )
2432        });
2433        let worktree_a = project_a
2434            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
2435            .await
2436            .unwrap();
2437        worktree_a
2438            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2439            .await;
2440
2441        client_a
2442            .user_store
2443            .condition(&cx_a, |user_store, _| {
2444                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2445            })
2446            .await;
2447        client_b
2448            .user_store
2449            .condition(&cx_b, |user_store, _| {
2450                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2451            })
2452            .await;
2453        client_c
2454            .user_store
2455            .condition(&cx_c, |user_store, _| {
2456                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2457            })
2458            .await;
2459
2460        let project_id = project_a
2461            .update(&mut cx_a, |project, _| project.next_remote_id())
2462            .await;
2463        project_a
2464            .update(&mut cx_a, |project, cx| project.share(cx))
2465            .await
2466            .unwrap();
2467
2468        let _project_b = Project::remote(
2469            project_id,
2470            client_b.clone(),
2471            client_b.user_store.clone(),
2472            lang_registry.clone(),
2473            fs.clone(),
2474            &mut cx_b.to_async(),
2475        )
2476        .await
2477        .unwrap();
2478
2479        client_a
2480            .user_store
2481            .condition(&cx_a, |user_store, _| {
2482                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2483            })
2484            .await;
2485        client_b
2486            .user_store
2487            .condition(&cx_b, |user_store, _| {
2488                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2489            })
2490            .await;
2491        client_c
2492            .user_store
2493            .condition(&cx_c, |user_store, _| {
2494                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2495            })
2496            .await;
2497
2498        project_a
2499            .condition(&cx_a, |project, _| {
2500                project.collaborators().contains_key(&client_b.peer_id)
2501            })
2502            .await;
2503
2504        cx_a.update(move |_| drop(project_a));
2505        client_a
2506            .user_store
2507            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2508            .await;
2509        client_b
2510            .user_store
2511            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2512            .await;
2513        client_c
2514            .user_store
2515            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2516            .await;
2517
2518        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2519            user_store
2520                .contacts()
2521                .iter()
2522                .map(|contact| {
2523                    let worktrees = contact
2524                        .projects
2525                        .iter()
2526                        .map(|p| {
2527                            (
2528                                p.worktree_root_names[0].as_str(),
2529                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
2530                            )
2531                        })
2532                        .collect();
2533                    (contact.user.github_login.as_str(), worktrees)
2534                })
2535                .collect()
2536        }
2537    }
2538
2539    struct TestServer {
2540        peer: Arc<Peer>,
2541        app_state: Arc<AppState>,
2542        server: Arc<Server>,
2543        foreground: Rc<executor::Foreground>,
2544        notifications: mpsc::Receiver<()>,
2545        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
2546        forbid_connections: Arc<AtomicBool>,
2547        _test_db: TestDb,
2548    }
2549
2550    impl TestServer {
2551        async fn start(foreground: Rc<executor::Foreground>) -> Self {
2552            let test_db = TestDb::new();
2553            let app_state = Self::build_app_state(&test_db).await;
2554            let peer = Peer::new();
2555            let notifications = mpsc::channel(128);
2556            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2557            Self {
2558                peer,
2559                app_state,
2560                server,
2561                foreground,
2562                notifications: notifications.1,
2563                connection_killers: Default::default(),
2564                forbid_connections: Default::default(),
2565                _test_db: test_db,
2566            }
2567        }
2568
2569        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2570            let http = FakeHttpClient::with_404_response();
2571            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2572            let client_name = name.to_string();
2573            let mut client = Client::new(http.clone());
2574            let server = self.server.clone();
2575            let connection_killers = self.connection_killers.clone();
2576            let forbid_connections = self.forbid_connections.clone();
2577            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2578
2579            Arc::get_mut(&mut client)
2580                .unwrap()
2581                .override_authenticate(move |cx| {
2582                    cx.spawn(|_| async move {
2583                        let access_token = "the-token".to_string();
2584                        Ok(Credentials {
2585                            user_id: user_id.0 as u64,
2586                            access_token,
2587                        })
2588                    })
2589                })
2590                .override_establish_connection(move |credentials, cx| {
2591                    assert_eq!(credentials.user_id, user_id.0 as u64);
2592                    assert_eq!(credentials.access_token, "the-token");
2593
2594                    let server = server.clone();
2595                    let connection_killers = connection_killers.clone();
2596                    let forbid_connections = forbid_connections.clone();
2597                    let client_name = client_name.clone();
2598                    let connection_id_tx = connection_id_tx.clone();
2599                    cx.spawn(move |cx| async move {
2600                        if forbid_connections.load(SeqCst) {
2601                            Err(EstablishConnectionError::other(anyhow!(
2602                                "server is forbidding connections"
2603                            )))
2604                        } else {
2605                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2606                            connection_killers.lock().insert(user_id, kill_conn);
2607                            cx.background()
2608                                .spawn(server.handle_connection(
2609                                    server_conn,
2610                                    client_name,
2611                                    user_id,
2612                                    Some(connection_id_tx),
2613                                ))
2614                                .detach();
2615                            Ok(client_conn)
2616                        }
2617                    })
2618                });
2619
2620            client
2621                .authenticate_and_connect(&cx.to_async())
2622                .await
2623                .unwrap();
2624
2625            let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2626            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2627            let mut authed_user =
2628                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2629            while authed_user.recv().await.unwrap().is_none() {}
2630
2631            TestClient {
2632                client,
2633                peer_id,
2634                user_store,
2635            }
2636        }
2637
2638        fn disconnect_client(&self, user_id: UserId) {
2639            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2640                let _ = kill_conn.try_send(Some(()));
2641            }
2642        }
2643
2644        fn forbid_connections(&self) {
2645            self.forbid_connections.store(true, SeqCst);
2646        }
2647
2648        fn allow_connections(&self) {
2649            self.forbid_connections.store(false, SeqCst);
2650        }
2651
2652        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2653            let mut config = Config::default();
2654            config.session_secret = "a".repeat(32);
2655            config.database_url = test_db.url.clone();
2656            let github_client = github::AppClient::test();
2657            Arc::new(AppState {
2658                db: test_db.db().clone(),
2659                handlebars: Default::default(),
2660                auth_client: auth::build_client("", ""),
2661                repo_client: github::RepoClient::test(&github_client),
2662                github_client,
2663                config,
2664            })
2665        }
2666
2667        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2668            self.server.store.read()
2669        }
2670
2671        async fn condition<F>(&mut self, mut predicate: F)
2672        where
2673            F: FnMut(&Store) -> bool,
2674        {
2675            async_std::future::timeout(Duration::from_millis(500), async {
2676                while !(predicate)(&*self.server.store.read()) {
2677                    self.foreground.start_waiting();
2678                    self.notifications.recv().await;
2679                    self.foreground.finish_waiting();
2680                }
2681            })
2682            .await
2683            .expect("condition timed out");
2684        }
2685    }
2686
2687    impl Drop for TestServer {
2688        fn drop(&mut self) {
2689            task::block_on(self.peer.reset());
2690        }
2691    }
2692
2693    struct TestClient {
2694        client: Arc<Client>,
2695        pub peer_id: PeerId,
2696        pub user_store: ModelHandle<UserStore>,
2697    }
2698
2699    impl Deref for TestClient {
2700        type Target = Arc<Client>;
2701
2702        fn deref(&self) -> &Self::Target {
2703            &self.client
2704        }
2705    }
2706
2707    impl TestClient {
2708        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2709            UserId::from_proto(
2710                self.user_store
2711                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2712            )
2713        }
2714    }
2715
2716    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2717        channel
2718            .messages()
2719            .cursor::<()>()
2720            .map(|m| {
2721                (
2722                    m.sender.github_login.clone(),
2723                    m.body.clone(),
2724                    m.is_pending(),
2725                )
2726            })
2727            .collect()
2728    }
2729
/// A field-less view type for tests; it renders as an empty element.
struct EmptyView;
2731
2732    impl gpui::Entity for EmptyView {
2733        type Event = ();
2734    }
2735
2736    impl gpui::View for EmptyView {
2737        fn ui_name() -> &'static str {
2738            "empty view"
2739        }
2740
2741        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2742            gpui::Element::boxed(gpui::elements::Empty)
2743        }
2744    }
2745}