rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, mem, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updated)
  76            .add_handler(Server::open_buffer)
  77            .add_handler(Server::close_buffer)
  78            .add_handler(Server::update_buffer)
  79            .add_handler(Server::buffer_saved)
  80            .add_handler(Server::save_buffer)
  81            .add_handler(Server::get_channels)
  82            .add_handler(Server::get_users)
  83            .add_handler(Server::join_channel)
  84            .add_handler(Server::leave_channel)
  85            .add_handler(Server::send_channel_message)
  86            .add_handler(Server::get_channel_messages);
  87
  88        Arc::new(server)
  89    }
  90
  91    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  92    where
  93        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  94        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  95        M: EnvelopedMessage,
  96    {
  97        let prev_handler = self.handlers.insert(
  98            TypeId::of::<M>(),
  99            Box::new(move |server, envelope| {
 100                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 101                (handler)(server, *envelope).boxed()
 102            }),
 103        );
 104        if prev_handler.is_some() {
 105            panic!("registered a handler for the same message twice");
 106        }
 107        self
 108    }
 109
 110    pub fn handle_connection(
 111        self: &Arc<Self>,
 112        connection: Connection,
 113        addr: String,
 114        user_id: UserId,
 115        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 116    ) -> impl Future<Output = ()> {
 117        let mut this = self.clone();
 118        async move {
 119            let (connection_id, handle_io, mut incoming_rx) =
 120                this.peer.add_connection(connection).await;
 121
 122            if let Some(send_connection_id) = send_connection_id.as_mut() {
 123                let _ = send_connection_id.send(connection_id).await;
 124            }
 125
 126            this.state_mut().add_connection(connection_id, user_id);
 127            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 128                log::error!("error updating contacts for {:?}: {}", user_id, err);
 129            }
 130
 131            let handle_io = handle_io.fuse();
 132            futures::pin_mut!(handle_io);
 133            loop {
 134                let next_message = incoming_rx.recv().fuse();
 135                futures::pin_mut!(next_message);
 136                futures::select_biased! {
 137                    message = next_message => {
 138                        if let Some(message) = message {
 139                            let start_time = Instant::now();
 140                            log::info!("RPC message received: {}", message.payload_type_name());
 141                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 142                                if let Err(err) = (handler)(this.clone(), message).await {
 143                                    log::error!("error handling message: {:?}", err);
 144                                } else {
 145                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 146                                }
 147
 148                                if let Some(mut notifications) = this.notifications.clone() {
 149                                    let _ = notifications.send(()).await;
 150                                }
 151                            } else {
 152                                log::warn!("unhandled message: {}", message.payload_type_name());
 153                            }
 154                        } else {
 155                            log::info!("rpc connection closed {:?}", addr);
 156                            break;
 157                        }
 158                    }
 159                    handle_io = handle_io => {
 160                        if let Err(err) = handle_io {
 161                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 162                        }
 163                        break;
 164                    }
 165                }
 166            }
 167
 168            if let Err(err) = this.sign_out(connection_id).await {
 169                log::error!("error signing out connection {:?} - {:?}", addr, err);
 170            }
 171        }
 172    }
 173
 174    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 175        self.peer.disconnect(connection_id).await;
 176        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 177
 178        for (project_id, project) in removed_connection.hosted_projects {
 179            if let Some(share) = project.share {
 180                broadcast(
 181                    connection_id,
 182                    share.guests.keys().copied().collect(),
 183                    |conn_id| {
 184                        self.peer
 185                            .send(conn_id, proto::UnshareProject { project_id })
 186                    },
 187                )
 188                .await?;
 189            }
 190        }
 191
 192        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 193            broadcast(connection_id, peer_ids, |conn_id| {
 194                self.peer.send(
 195                    conn_id,
 196                    proto::RemoveProjectCollaborator {
 197                        project_id,
 198                        peer_id: connection_id.0,
 199                    },
 200                )
 201            })
 202            .await?;
 203        }
 204
 205        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 206            .await?;
 207
 208        Ok(())
 209    }
 210
 211    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 212        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 213        Ok(())
 214    }
 215
 216    async fn register_project(
 217        mut self: Arc<Server>,
 218        request: TypedEnvelope<proto::RegisterProject>,
 219    ) -> tide::Result<()> {
 220        let project_id = {
 221            let mut state = self.state_mut();
 222            let user_id = state.user_id_for_connection(request.sender_id)?;
 223            state.register_project(request.sender_id, user_id)
 224        };
 225        self.peer
 226            .respond(
 227                request.receipt(),
 228                proto::RegisterProjectResponse { project_id },
 229            )
 230            .await?;
 231        Ok(())
 232    }
 233
 234    async fn unregister_project(
 235        mut self: Arc<Server>,
 236        request: TypedEnvelope<proto::UnregisterProject>,
 237    ) -> tide::Result<()> {
 238        let project = self
 239            .state_mut()
 240            .unregister_project(request.payload.project_id, request.sender_id)
 241            .ok_or_else(|| anyhow!("no such project"))?;
 242        self.update_contacts_for_users(project.authorized_user_ids().iter())
 243            .await?;
 244        Ok(())
 245    }
 246
 247    async fn share_project(
 248        mut self: Arc<Server>,
 249        request: TypedEnvelope<proto::ShareProject>,
 250    ) -> tide::Result<()> {
 251        self.state_mut()
 252            .share_project(request.payload.project_id, request.sender_id);
 253        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 254        Ok(())
 255    }
 256
 257    async fn unshare_project(
 258        mut self: Arc<Server>,
 259        request: TypedEnvelope<proto::UnshareProject>,
 260    ) -> tide::Result<()> {
 261        let project_id = request.payload.project_id;
 262        let project = self
 263            .state_mut()
 264            .unshare_project(project_id, request.sender_id)?;
 265
 266        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 267            self.peer
 268                .send(conn_id, proto::UnshareProject { project_id })
 269        })
 270        .await?;
 271        self.update_contacts_for_users(&project.authorized_user_ids)
 272            .await?;
 273
 274        Ok(())
 275    }
 276
 277    async fn join_project(
 278        mut self: Arc<Server>,
 279        request: TypedEnvelope<proto::JoinProject>,
 280    ) -> tide::Result<()> {
 281        let project_id = request.payload.project_id;
 282
 283        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 284        let response_data = self
 285            .state_mut()
 286            .join_project(request.sender_id, user_id, project_id)
 287            .and_then(|joined| {
 288                let share = joined.project.share()?;
 289                let peer_count = share.guests.len();
 290                let mut collaborators = Vec::with_capacity(peer_count);
 291                collaborators.push(proto::Collaborator {
 292                    peer_id: joined.project.host_connection_id.0,
 293                    replica_id: 0,
 294                    user_id: joined.project.host_user_id.to_proto(),
 295                });
 296                let worktrees = joined
 297                    .project
 298                    .worktrees
 299                    .iter()
 300                    .filter_map(|(id, worktree)| {
 301                        worktree.share.as_ref().map(|share| proto::Worktree {
 302                            id: *id,
 303                            root_name: worktree.root_name.clone(),
 304                            entries: share.entries.values().cloned().collect(),
 305                            diagnostic_summaries: share
 306                                .diagnostic_summaries
 307                                .values()
 308                                .cloned()
 309                                .collect(),
 310                        })
 311                    })
 312                    .collect();
 313                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 314                    if *peer_conn_id != request.sender_id {
 315                        collaborators.push(proto::Collaborator {
 316                            peer_id: peer_conn_id.0,
 317                            replica_id: *peer_replica_id as u32,
 318                            user_id: peer_user_id.to_proto(),
 319                        });
 320                    }
 321                }
 322                let response = proto::JoinProjectResponse {
 323                    worktrees,
 324                    replica_id: joined.replica_id as u32,
 325                    collaborators,
 326                };
 327                let connection_ids = joined.project.connection_ids();
 328                let contact_user_ids = joined.project.authorized_user_ids();
 329                Ok((response, connection_ids, contact_user_ids))
 330            });
 331
 332        match response_data {
 333            Ok((response, connection_ids, contact_user_ids)) => {
 334                broadcast(request.sender_id, connection_ids, |conn_id| {
 335                    self.peer.send(
 336                        conn_id,
 337                        proto::AddProjectCollaborator {
 338                            project_id: project_id,
 339                            collaborator: Some(proto::Collaborator {
 340                                peer_id: request.sender_id.0,
 341                                replica_id: response.replica_id,
 342                                user_id: user_id.to_proto(),
 343                            }),
 344                        },
 345                    )
 346                })
 347                .await?;
 348                self.peer.respond(request.receipt(), response).await?;
 349                self.update_contacts_for_users(&contact_user_ids).await?;
 350            }
 351            Err(error) => {
 352                self.peer
 353                    .respond_with_error(
 354                        request.receipt(),
 355                        proto::Error {
 356                            message: error.to_string(),
 357                        },
 358                    )
 359                    .await?;
 360            }
 361        }
 362
 363        Ok(())
 364    }
 365
 366    async fn leave_project(
 367        mut self: Arc<Server>,
 368        request: TypedEnvelope<proto::LeaveProject>,
 369    ) -> tide::Result<()> {
 370        let sender_id = request.sender_id;
 371        let project_id = request.payload.project_id;
 372        let worktree = self.state_mut().leave_project(sender_id, project_id);
 373        if let Some(worktree) = worktree {
 374            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 375                self.peer.send(
 376                    conn_id,
 377                    proto::RemoveProjectCollaborator {
 378                        project_id,
 379                        peer_id: sender_id.0,
 380                    },
 381                )
 382            })
 383            .await?;
 384            self.update_contacts_for_users(&worktree.authorized_user_ids)
 385                .await?;
 386        }
 387        Ok(())
 388    }
 389
 390    async fn register_worktree(
 391        mut self: Arc<Server>,
 392        request: TypedEnvelope<proto::RegisterWorktree>,
 393    ) -> tide::Result<()> {
 394        let receipt = request.receipt();
 395        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 396
 397        let mut contact_user_ids = HashSet::default();
 398        contact_user_ids.insert(host_user_id);
 399        for github_login in request.payload.authorized_logins {
 400            match self.app_state.db.create_user(&github_login, false).await {
 401                Ok(contact_user_id) => {
 402                    contact_user_ids.insert(contact_user_id);
 403                }
 404                Err(err) => {
 405                    let message = err.to_string();
 406                    self.peer
 407                        .respond_with_error(receipt, proto::Error { message })
 408                        .await?;
 409                    return Ok(());
 410                }
 411            }
 412        }
 413
 414        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 415        let ok = self.state_mut().register_worktree(
 416            request.payload.project_id,
 417            request.payload.worktree_id,
 418            Worktree {
 419                authorized_user_ids: contact_user_ids.clone(),
 420                root_name: request.payload.root_name,
 421                share: None,
 422            },
 423        );
 424
 425        if ok {
 426            self.peer.respond(receipt, proto::Ack {}).await?;
 427            self.update_contacts_for_users(&contact_user_ids).await?;
 428        } else {
 429            self.peer
 430                .respond_with_error(
 431                    receipt,
 432                    proto::Error {
 433                        message: NO_SUCH_PROJECT.to_string(),
 434                    },
 435                )
 436                .await?;
 437        }
 438
 439        Ok(())
 440    }
 441
 442    async fn unregister_worktree(
 443        mut self: Arc<Server>,
 444        request: TypedEnvelope<proto::UnregisterWorktree>,
 445    ) -> tide::Result<()> {
 446        let project_id = request.payload.project_id;
 447        let worktree_id = request.payload.worktree_id;
 448        let (worktree, guest_connection_ids) =
 449            self.state_mut()
 450                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 451
 452        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 453            self.peer.send(
 454                conn_id,
 455                proto::UnregisterWorktree {
 456                    project_id,
 457                    worktree_id,
 458                },
 459            )
 460        })
 461        .await?;
 462        self.update_contacts_for_users(&worktree.authorized_user_ids)
 463            .await?;
 464        Ok(())
 465    }
 466
 467    async fn share_worktree(
 468        mut self: Arc<Server>,
 469        mut request: TypedEnvelope<proto::ShareWorktree>,
 470    ) -> tide::Result<()> {
 471        let worktree = request
 472            .payload
 473            .worktree
 474            .as_mut()
 475            .ok_or_else(|| anyhow!("missing worktree"))?;
 476        let entries = mem::take(&mut worktree.entries)
 477            .into_iter()
 478            .map(|entry| (entry.id, entry))
 479            .collect();
 480
 481        let diagnostic_summaries = mem::take(&mut worktree.diagnostic_summaries)
 482            .into_iter()
 483            .map(|summary| (PathBuf::from(summary.path.clone()), summary))
 484            .collect();
 485
 486        let contact_user_ids = self.state_mut().share_worktree(
 487            request.payload.project_id,
 488            worktree.id,
 489            request.sender_id,
 490            entries,
 491            diagnostic_summaries,
 492        );
 493        if let Some(contact_user_ids) = contact_user_ids {
 494            self.peer.respond(request.receipt(), proto::Ack {}).await?;
 495            self.update_contacts_for_users(&contact_user_ids).await?;
 496        } else {
 497            self.peer
 498                .respond_with_error(
 499                    request.receipt(),
 500                    proto::Error {
 501                        message: "no such worktree".to_string(),
 502                    },
 503                )
 504                .await?;
 505        }
 506        Ok(())
 507    }
 508
 509    async fn update_worktree(
 510        mut self: Arc<Server>,
 511        request: TypedEnvelope<proto::UpdateWorktree>,
 512    ) -> tide::Result<()> {
 513        let connection_ids = self
 514            .state_mut()
 515            .update_worktree(
 516                request.sender_id,
 517                request.payload.project_id,
 518                request.payload.worktree_id,
 519                &request.payload.removed_entries,
 520                &request.payload.updated_entries,
 521            )
 522            .ok_or_else(|| anyhow!("no such worktree"))?;
 523
 524        broadcast(request.sender_id, connection_ids, |connection_id| {
 525            self.peer
 526                .forward_send(request.sender_id, connection_id, request.payload.clone())
 527        })
 528        .await?;
 529
 530        Ok(())
 531    }
 532
 533    async fn update_diagnostic_summary(
 534        mut self: Arc<Server>,
 535        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 536    ) -> tide::Result<()> {
 537        let receiver_ids = request
 538            .payload
 539            .summary
 540            .clone()
 541            .and_then(|summary| {
 542                self.state_mut().update_diagnostic_summary(
 543                    request.payload.project_id,
 544                    request.payload.worktree_id,
 545                    request.sender_id,
 546                    summary,
 547                )
 548            })
 549            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 550
 551        broadcast(request.sender_id, receiver_ids, |connection_id| {
 552            self.peer
 553                .forward_send(request.sender_id, connection_id, request.payload.clone())
 554        })
 555        .await?;
 556        Ok(())
 557    }
 558
 559    async fn disk_based_diagnostics_updated(
 560        self: Arc<Server>,
 561        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 562    ) -> tide::Result<()> {
 563        let receiver_ids = self
 564            .state()
 565            .project_connection_ids(request.payload.project_id, request.sender_id)
 566            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 567        broadcast(request.sender_id, receiver_ids, |connection_id| {
 568            self.peer
 569                .forward_send(request.sender_id, connection_id, request.payload.clone())
 570        })
 571        .await?;
 572        Ok(())
 573    }
 574
 575    async fn open_buffer(
 576        self: Arc<Server>,
 577        request: TypedEnvelope<proto::OpenBuffer>,
 578    ) -> tide::Result<()> {
 579        let receipt = request.receipt();
 580        let host_connection_id = self
 581            .state()
 582            .read_project(request.payload.project_id, request.sender_id)
 583            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 584            .host_connection_id;
 585        let response = self
 586            .peer
 587            .forward_request(request.sender_id, host_connection_id, request.payload)
 588            .await?;
 589        self.peer.respond(receipt, response).await?;
 590        Ok(())
 591    }
 592
 593    async fn close_buffer(
 594        self: Arc<Server>,
 595        request: TypedEnvelope<proto::CloseBuffer>,
 596    ) -> tide::Result<()> {
 597        let host_connection_id = self
 598            .state()
 599            .read_project(request.payload.project_id, request.sender_id)
 600            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 601            .host_connection_id;
 602        self.peer
 603            .forward_send(request.sender_id, host_connection_id, request.payload)
 604            .await?;
 605        Ok(())
 606    }
 607
 608    async fn save_buffer(
 609        self: Arc<Server>,
 610        request: TypedEnvelope<proto::SaveBuffer>,
 611    ) -> tide::Result<()> {
 612        let host;
 613        let guests;
 614        {
 615            let state = self.state();
 616            let project = state
 617                .read_project(request.payload.project_id, request.sender_id)
 618                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 619            host = project.host_connection_id;
 620            guests = project.guest_connection_ids()
 621        }
 622
 623        let sender = request.sender_id;
 624        let receipt = request.receipt();
 625        let response = self
 626            .peer
 627            .forward_request(sender, host, request.payload.clone())
 628            .await?;
 629
 630        broadcast(host, guests, |conn_id| {
 631            let response = response.clone();
 632            let peer = &self.peer;
 633            async move {
 634                if conn_id == sender {
 635                    peer.respond(receipt, response).await
 636                } else {
 637                    peer.forward_send(host, conn_id, response).await
 638                }
 639            }
 640        })
 641        .await?;
 642
 643        Ok(())
 644    }
 645
 646    async fn update_buffer(
 647        self: Arc<Server>,
 648        request: TypedEnvelope<proto::UpdateBuffer>,
 649    ) -> tide::Result<()> {
 650        let receiver_ids = self
 651            .state()
 652            .project_connection_ids(request.payload.project_id, request.sender_id)
 653            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 654        broadcast(request.sender_id, receiver_ids, |connection_id| {
 655            self.peer
 656                .forward_send(request.sender_id, connection_id, request.payload.clone())
 657        })
 658        .await?;
 659        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 660        Ok(())
 661    }
 662
 663    async fn buffer_saved(
 664        self: Arc<Server>,
 665        request: TypedEnvelope<proto::BufferSaved>,
 666    ) -> tide::Result<()> {
 667        let receiver_ids = self
 668            .state()
 669            .project_connection_ids(request.payload.project_id, request.sender_id)
 670            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 671        broadcast(request.sender_id, receiver_ids, |connection_id| {
 672            self.peer
 673                .forward_send(request.sender_id, connection_id, request.payload.clone())
 674        })
 675        .await?;
 676        Ok(())
 677    }
 678
 679    async fn get_channels(
 680        self: Arc<Server>,
 681        request: TypedEnvelope<proto::GetChannels>,
 682    ) -> tide::Result<()> {
 683        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 684        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 685        self.peer
 686            .respond(
 687                request.receipt(),
 688                proto::GetChannelsResponse {
 689                    channels: channels
 690                        .into_iter()
 691                        .map(|chan| proto::Channel {
 692                            id: chan.id.to_proto(),
 693                            name: chan.name,
 694                        })
 695                        .collect(),
 696                },
 697            )
 698            .await?;
 699        Ok(())
 700    }
 701
 702    async fn get_users(
 703        self: Arc<Server>,
 704        request: TypedEnvelope<proto::GetUsers>,
 705    ) -> tide::Result<()> {
 706        let receipt = request.receipt();
 707        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 708        let users = self
 709            .app_state
 710            .db
 711            .get_users_by_ids(user_ids)
 712            .await?
 713            .into_iter()
 714            .map(|user| proto::User {
 715                id: user.id.to_proto(),
 716                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 717                github_login: user.github_login,
 718            })
 719            .collect();
 720        self.peer
 721            .respond(receipt, proto::GetUsersResponse { users })
 722            .await?;
 723        Ok(())
 724    }
 725
 726    async fn update_contacts_for_users<'a>(
 727        self: &Arc<Server>,
 728        user_ids: impl IntoIterator<Item = &'a UserId>,
 729    ) -> tide::Result<()> {
 730        let mut send_futures = Vec::new();
 731
 732        {
 733            let state = self.state();
 734            for user_id in user_ids {
 735                let contacts = state.contacts_for_user(*user_id);
 736                for connection_id in state.connection_ids_for_user(*user_id) {
 737                    send_futures.push(self.peer.send(
 738                        connection_id,
 739                        proto::UpdateContacts {
 740                            contacts: contacts.clone(),
 741                        },
 742                    ));
 743                }
 744            }
 745        }
 746        futures::future::try_join_all(send_futures).await?;
 747
 748        Ok(())
 749    }
 750
 751    async fn join_channel(
 752        mut self: Arc<Self>,
 753        request: TypedEnvelope<proto::JoinChannel>,
 754    ) -> tide::Result<()> {
 755        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 756        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 757        if !self
 758            .app_state
 759            .db
 760            .can_user_access_channel(user_id, channel_id)
 761            .await?
 762        {
 763            Err(anyhow!("access denied"))?;
 764        }
 765
 766        self.state_mut().join_channel(request.sender_id, channel_id);
 767        let messages = self
 768            .app_state
 769            .db
 770            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 771            .await?
 772            .into_iter()
 773            .map(|msg| proto::ChannelMessage {
 774                id: msg.id.to_proto(),
 775                body: msg.body,
 776                timestamp: msg.sent_at.unix_timestamp() as u64,
 777                sender_id: msg.sender_id.to_proto(),
 778                nonce: Some(msg.nonce.as_u128().into()),
 779            })
 780            .collect::<Vec<_>>();
 781        self.peer
 782            .respond(
 783                request.receipt(),
 784                proto::JoinChannelResponse {
 785                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 786                    messages,
 787                },
 788            )
 789            .await?;
 790        Ok(())
 791    }
 792
 793    async fn leave_channel(
 794        mut self: Arc<Self>,
 795        request: TypedEnvelope<proto::LeaveChannel>,
 796    ) -> tide::Result<()> {
 797        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 798        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 799        if !self
 800            .app_state
 801            .db
 802            .can_user_access_channel(user_id, channel_id)
 803            .await?
 804        {
 805            Err(anyhow!("access denied"))?;
 806        }
 807
 808        self.state_mut()
 809            .leave_channel(request.sender_id, channel_id);
 810
 811        Ok(())
 812    }
 813
 814    async fn send_channel_message(
 815        self: Arc<Self>,
 816        request: TypedEnvelope<proto::SendChannelMessage>,
 817    ) -> tide::Result<()> {
 818        let receipt = request.receipt();
 819        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 820        let user_id;
 821        let connection_ids;
 822        {
 823            let state = self.state();
 824            user_id = state.user_id_for_connection(request.sender_id)?;
 825            if let Some(ids) = state.channel_connection_ids(channel_id) {
 826                connection_ids = ids;
 827            } else {
 828                return Ok(());
 829            }
 830        }
 831
 832        // Validate the message body.
 833        let body = request.payload.body.trim().to_string();
 834        if body.len() > MAX_MESSAGE_LEN {
 835            self.peer
 836                .respond_with_error(
 837                    receipt,
 838                    proto::Error {
 839                        message: "message is too long".to_string(),
 840                    },
 841                )
 842                .await?;
 843            return Ok(());
 844        }
 845        if body.is_empty() {
 846            self.peer
 847                .respond_with_error(
 848                    receipt,
 849                    proto::Error {
 850                        message: "message can't be blank".to_string(),
 851                    },
 852                )
 853                .await?;
 854            return Ok(());
 855        }
 856
 857        let timestamp = OffsetDateTime::now_utc();
 858        let nonce = if let Some(nonce) = request.payload.nonce {
 859            nonce
 860        } else {
 861            self.peer
 862                .respond_with_error(
 863                    receipt,
 864                    proto::Error {
 865                        message: "nonce can't be blank".to_string(),
 866                    },
 867                )
 868                .await?;
 869            return Ok(());
 870        };
 871
 872        let message_id = self
 873            .app_state
 874            .db
 875            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 876            .await?
 877            .to_proto();
 878        let message = proto::ChannelMessage {
 879            sender_id: user_id.to_proto(),
 880            id: message_id,
 881            body,
 882            timestamp: timestamp.unix_timestamp() as u64,
 883            nonce: Some(nonce),
 884        };
 885        broadcast(request.sender_id, connection_ids, |conn_id| {
 886            self.peer.send(
 887                conn_id,
 888                proto::ChannelMessageSent {
 889                    channel_id: channel_id.to_proto(),
 890                    message: Some(message.clone()),
 891                },
 892            )
 893        })
 894        .await?;
 895        self.peer
 896            .respond(
 897                receipt,
 898                proto::SendChannelMessageResponse {
 899                    message: Some(message),
 900                },
 901            )
 902            .await?;
 903        Ok(())
 904    }
 905
 906    async fn get_channel_messages(
 907        self: Arc<Self>,
 908        request: TypedEnvelope<proto::GetChannelMessages>,
 909    ) -> tide::Result<()> {
 910        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 911        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 912        if !self
 913            .app_state
 914            .db
 915            .can_user_access_channel(user_id, channel_id)
 916            .await?
 917        {
 918            Err(anyhow!("access denied"))?;
 919        }
 920
 921        let messages = self
 922            .app_state
 923            .db
 924            .get_channel_messages(
 925                channel_id,
 926                MESSAGE_COUNT_PER_PAGE,
 927                Some(MessageId::from_proto(request.payload.before_message_id)),
 928            )
 929            .await?
 930            .into_iter()
 931            .map(|msg| proto::ChannelMessage {
 932                id: msg.id.to_proto(),
 933                body: msg.body,
 934                timestamp: msg.sent_at.unix_timestamp() as u64,
 935                sender_id: msg.sender_id.to_proto(),
 936                nonce: Some(msg.nonce.as_u128().into()),
 937            })
 938            .collect::<Vec<_>>();
 939        self.peer
 940            .respond(
 941                request.receipt(),
 942                proto::GetChannelMessagesResponse {
 943                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 944                    messages,
 945                },
 946            )
 947            .await?;
 948        Ok(())
 949    }
 950
 951    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 952        self.store.read()
 953    }
 954
 955    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 956        self.store.write()
 957    }
 958}
 959
 960pub async fn broadcast<F, T>(
 961    sender_id: ConnectionId,
 962    receiver_ids: Vec<ConnectionId>,
 963    mut f: F,
 964) -> anyhow::Result<()>
 965where
 966    F: FnMut(ConnectionId) -> T,
 967    T: Future<Output = anyhow::Result<()>>,
 968{
 969    let futures = receiver_ids
 970        .into_iter()
 971        .filter(|id| *id != sender_id)
 972        .map(|id| f(id));
 973    futures::future::try_join_all(futures).await?;
 974    Ok(())
 975}
 976
 977pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 978    let server = Server::new(app.state().clone(), rpc.clone(), None);
 979    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 980        let server = server.clone();
 981        async move {
 982            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 983
 984            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 985            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 986            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 987            let client_protocol_version: Option<u32> = request
 988                .header("X-Zed-Protocol-Version")
 989                .and_then(|v| v.as_str().parse().ok());
 990
 991            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 992                return Ok(Response::new(StatusCode::UpgradeRequired));
 993            }
 994
 995            let header = match request.header("Sec-Websocket-Key") {
 996                Some(h) => h.as_str(),
 997                None => return Err(anyhow!("expected sec-websocket-key"))?,
 998            };
 999
1000            let user_id = process_auth_header(&request).await?;
1001
1002            let mut response = Response::new(StatusCode::SwitchingProtocols);
1003            response.insert_header(UPGRADE, "websocket");
1004            response.insert_header(CONNECTION, "Upgrade");
1005            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1006            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1007            response.insert_header("Sec-Websocket-Version", "13");
1008
1009            let http_res: &mut tide::http::Response = response.as_mut();
1010            let upgrade_receiver = http_res.recv_upgrade().await;
1011            let addr = request.remote().unwrap_or("unknown").to_string();
1012            task::spawn(async move {
1013                if let Some(stream) = upgrade_receiver.await {
1014                    server
1015                        .handle_connection(
1016                            Connection::new(
1017                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1018                            ),
1019                            addr,
1020                            user_id,
1021                            None,
1022                        )
1023                        .await;
1024                }
1025            });
1026
1027            Ok(response)
1028        }
1029    });
1030}
1031
1032fn header_contains_ignore_case<T>(
1033    request: &tide::Request<T>,
1034    header_name: HeaderName,
1035    value: &str,
1036) -> bool {
1037    request
1038        .header(header_name)
1039        .map(|h| {
1040            h.as_str()
1041                .split(',')
1042                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1043        })
1044        .unwrap_or(false)
1045}
1046
1047#[cfg(test)]
1048mod tests {
1049    use super::*;
1050    use crate::{
1051        auth,
1052        db::{tests::TestDb, UserId},
1053        github, AppState, Config,
1054    };
1055    use ::rpc::Peer;
1056    use async_std::task;
1057    use gpui::{ModelHandle, TestAppContext};
1058    use parking_lot::Mutex;
1059    use postage::{mpsc, watch};
1060    use rpc::PeerId;
1061    use serde_json::json;
1062    use sqlx::types::time::OffsetDateTime;
1063    use std::{
1064        ops::Deref,
1065        path::Path,
1066        sync::{
1067            atomic::{AtomicBool, Ordering::SeqCst},
1068            Arc,
1069        },
1070        time::Duration,
1071    };
1072    use zed::{
1073        client::{
1074            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1075            EstablishConnectionError, UserStore,
1076        },
1077        editor::{Editor, EditorSettings, Input, MultiBuffer},
1078        fs::{FakeFs, Fs as _},
1079        language::{
1080            tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1081            LanguageRegistry, LanguageServerConfig, Point,
1082        },
1083        lsp,
1084        project::{DiagnosticSummary, Project, ProjectPath},
1085    };
1086
1087    #[gpui::test]
        // End-to-end sharing scenario with two clients: A shares a local project
        // rooted at "/a" and B joins it remotely. The test checks that each side
        // sees the other as a collaborator, that B can open a buffer and that an
        // edit typed into B's editor reaches A's copy, and that dropping A's
        // buffer and B's project closes the buffer on the host worktree and
        // empties A's collaborator list.
1088    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1089        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1090        let lang_registry = Arc::new(LanguageRegistry::new());
1091        let fs = Arc::new(FakeFs::new());
1092        cx_a.foreground().forbid_parking();
1093
1094        // Connect to a server as 2 clients.
1095        let mut server = TestServer::start().await;
1096        let client_a = server.create_client(&mut cx_a, "user_a").await;
1097        let client_b = server.create_client(&mut cx_b, "user_b").await;
1098
1099        // Share a project as client A
1100        fs.insert_tree(
1101            "/a",
1102            json!({
1103                ".zed.toml": r#"collaborators = ["user_b"]"#,
1104                "a.txt": "a-contents",
1105                "b.txt": "b-contents",
1106            }),
1107        )
1108        .await;
1109        let project_a = cx_a.update(|cx| {
1110            Project::local(
1111                client_a.clone(),
1112                client_a.user_store.clone(),
1113                lang_registry.clone(),
1114                fs.clone(),
1115                cx,
1116            )
1117        });
1118        let worktree_a = project_a
1119            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1120            .await
1121            .unwrap();
1122        worktree_a
1123            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1124            .await;
1125        let project_id = project_a
1126            .update(&mut cx_a, |project, _| project.next_remote_id())
1127            .await;
1128        project_a
1129            .update(&mut cx_a, |project, cx| project.share(cx))
1130            .await
1131            .unwrap();
1132
1133        // Join that project as client B
1134        let project_b = Project::remote(
1135            project_id,
1136            client_b.clone(),
1137            client_b.user_store.clone(),
1138            lang_registry.clone(),
1139            fs.clone(),
1140            &mut cx_b.to_async(),
1141        )
1142        .await
1143        .unwrap();
1144        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1145
            // B must already list A as a collaborator; B's replica id is kept so
            // that A's view of B can be checked against it below.
1146        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1147            assert_eq!(
1148                project
1149                    .collaborators()
1150                    .get(&client_a.peer_id)
1151                    .unwrap()
1152                    .user
1153                    .github_login,
1154                "user_a"
1155            );
1156            project.replica_id()
1157        });
1158        project_a
1159            .condition(&cx_a, |tree, _| {
1160                tree.collaborators()
1161                    .get(&client_b.peer_id)
1162                    .map_or(false, |collaborator| {
1163                        collaborator.replica_id == replica_id_b
1164                            && collaborator.user.github_login == "user_b"
1165                    })
1166            })
1167            .await;
1168
1169        // Open the same file as client B and client A.
1170        let buffer_b = worktree_b
1171            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1172            .await
1173            .unwrap();
1174        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1175        buffer_b.read_with(&cx_b, |buf, cx| {
1176            assert_eq!(buf.read(cx).text(), "b-contents")
1177        });
            // The host worktree is expected to report the buffer as open as soon
            // as B has opened it, before A opens it itself.
1178        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1179        let buffer_a = worktree_a
1180            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1181            .await
1182            .unwrap();
1183
1184        let editor_b = cx_b.add_view(window_b, |cx| {
1185            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1186        });
1187        // TODO
1188        // // Create a selection set as client B and see that selection set as client A.
1189        // buffer_a
1190        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1191        //     .await;
1192
1193        // Edit the buffer as client B and see that edit as client A.
1194        editor_b.update(&mut cx_b, |editor, cx| {
1195            editor.handle_input(&Input("ok, ".into()), cx)
1196        });
1197        buffer_a
1198            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1199            .await;
1200
1201        // TODO
1202        // // Remove the selection set as client B, see those selections disappear as client A.
1203        cx_b.update(move |_| drop(editor_b));
1204        // buffer_a
1205        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1206        //     .await;
1207
1208        // Close the buffer as client A, see that the buffer is closed.
1209        cx_a.update(move |_| drop(buffer_a));
1210        worktree_a
1211            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1212            .await;
1213
1214        // Dropping the client B's project removes client B from client A's collaborators.
1215        cx_b.update(move |_| drop(project_b));
1216        project_a
1217            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1218            .await;
1219    }
1220
1221    #[gpui::test]
        // Unsharing scenario: A shares a local project, B joins it and opens a
        // buffer, then A unshares the project. The test passes once B's project
        // reports itself as read-only.
1222    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1223        let lang_registry = Arc::new(LanguageRegistry::new());
1224        let fs = Arc::new(FakeFs::new());
1225        cx_a.foreground().forbid_parking();
1226
1227        // Connect to a server as 2 clients.
1228        let mut server = TestServer::start().await;
1229        let client_a = server.create_client(&mut cx_a, "user_a").await;
1230        let client_b = server.create_client(&mut cx_b, "user_b").await;
1231
1232        // Share a project as client A
1233        fs.insert_tree(
1234            "/a",
1235            json!({
1236                ".zed.toml": r#"collaborators = ["user_b"]"#,
1237                "a.txt": "a-contents",
1238                "b.txt": "b-contents",
1239            }),
1240        )
1241        .await;
1242        let project_a = cx_a.update(|cx| {
1243            Project::local(
1244                client_a.clone(),
1245                client_a.user_store.clone(),
1246                lang_registry.clone(),
1247                fs.clone(),
1248                cx,
1249            )
1250        });
1251        let worktree_a = project_a
1252            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1253            .await
1254            .unwrap();
1255        worktree_a
1256            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1257            .await;
1258        let project_id = project_a
1259            .update(&mut cx_a, |project, _| project.next_remote_id())
1260            .await;
1261        project_a
1262            .update(&mut cx_a, |project, cx| project.share(cx))
1263            .await
1264            .unwrap();
1265
1266        // Join that project as client B
1267        let project_b = Project::remote(
1268            project_id,
1269            client_b.clone(),
1270            client_b.user_store.clone(),
1271            lang_registry.clone(),
1272            fs.clone(),
1273            &mut cx_b.to_async(),
1274        )
1275        .await
1276        .unwrap();
1277
            // Open a buffer as the guest so the project is in use when it is unshared.
1278        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1279        worktree_b
1280            .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
1281            .await
1282            .unwrap();
1283
            // Unshare as the host and wait for the guest's project to turn read-only.
1284        project_a
1285            .update(&mut cx_a, |project, cx| project.unshare(cx))
1286            .await
1287            .unwrap();
1288        project_b
1289            .condition(&mut cx_b, |project, _| project.is_read_only())
1290            .await;
1291    }
1292
1293    #[gpui::test]
        // Three-client scenario: A hosts a project that B and C join. The test
        // checks that edits from both guests and the host converge to the same
        // text in all three buffers, that a save issued by guest B while the
        // host is editing writes the host's latest text to the fake file system
        // and clears the dirty flag on every replica, and that a rename and a
        // new file on the host's file system show up in both guests' worktrees.
1294    async fn test_propagate_saves_and_fs_changes(
1295        mut cx_a: TestAppContext,
1296        mut cx_b: TestAppContext,
1297        mut cx_c: TestAppContext,
1298    ) {
1299        let lang_registry = Arc::new(LanguageRegistry::new());
1300        let fs = Arc::new(FakeFs::new());
1301        cx_a.foreground().forbid_parking();
1302
1303        // Connect to a server as 3 clients.
1304        let mut server = TestServer::start().await;
1305        let client_a = server.create_client(&mut cx_a, "user_a").await;
1306        let client_b = server.create_client(&mut cx_b, "user_b").await;
1307        let client_c = server.create_client(&mut cx_c, "user_c").await;
1308
1309        // Share a worktree as client A.
1310        fs.insert_tree(
1311            "/a",
1312            json!({
1313                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1314                "file1": "",
1315                "file2": ""
1316            }),
1317        )
1318        .await;
1319        let project_a = cx_a.update(|cx| {
1320            Project::local(
1321                client_a.clone(),
1322                client_a.user_store.clone(),
1323                lang_registry.clone(),
1324                fs.clone(),
1325                cx,
1326            )
1327        });
1328        let worktree_a = project_a
1329            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1330            .await
1331            .unwrap();
1332        worktree_a
1333            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1334            .await;
1335        let project_id = project_a
1336            .update(&mut cx_a, |project, _| project.next_remote_id())
1337            .await;
1338        project_a
1339            .update(&mut cx_a, |project, cx| project.share(cx))
1340            .await
1341            .unwrap();
1342
1343        // Join that worktree as clients B and C.
1344        let project_b = Project::remote(
1345            project_id,
1346            client_b.clone(),
1347            client_b.user_store.clone(),
1348            lang_registry.clone(),
1349            fs.clone(),
1350            &mut cx_b.to_async(),
1351        )
1352        .await
1353        .unwrap();
1354        let project_c = Project::remote(
1355            project_id,
1356            client_c.clone(),
1357            client_c.user_store.clone(),
1358            lang_registry.clone(),
1359            fs.clone(),
1360            &mut cx_c.to_async(),
1361        )
1362        .await
1363        .unwrap();
1364
1365        // Open and edit a buffer as both guests B and C.
1366        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1367        let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
1368        let buffer_b = worktree_b
1369            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1370            .await
1371            .unwrap();
1372        let buffer_c = worktree_c
1373            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1374            .await
1375            .unwrap();
1376        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1377        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1378
1379        // Open and edit that buffer as the host.
1380        let buffer_a = worktree_a
1381            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1382            .await
1383            .unwrap();
1384
            // Both guests inserted at offset 0; the host is expected to end up
            // with C's text ahead of B's before it appends its own.
1385        buffer_a
1386            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1387            .await;
1388        buffer_a.update(&mut cx_a, |buf, cx| {
1389            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1390        });
1391
1392        // Wait for edits to propagate
1393        buffer_a
1394            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1395            .await;
1396        buffer_b
1397            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1398            .await;
1399        buffer_c
1400            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1401            .await;
1402
1403        // Edit the buffer as the host and concurrently save as guest B.
1404        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1405        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1406        save_b.await.unwrap();
            // The file on disk is expected to contain the host's edit that was
            // made after the guest's save had been started.
1407        assert_eq!(
1408            fs.load("/a/file1".as_ref()).await.unwrap(),
1409            "hi-a, i-am-c, i-am-b, i-am-a"
1410        );
1411        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1412        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1413        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1414
1415        // Make changes on host's file system, see those changes on the guests.
1416        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1417            .await
1418            .unwrap();
1419        fs.insert_file(Path::new("/a/file4"), "4".into())
1420            .await
1421            .unwrap();
1422
1423        worktree_b
1424            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1425            .await;
1426        worktree_c
1427            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1428            .await;
1429        worktree_b.read_with(&cx_b, |tree, _| {
1430            assert_eq!(
1431                tree.paths()
1432                    .map(|p| p.to_string_lossy())
1433                    .collect::<Vec<_>>(),
1434                &[".zed.toml", "file1", "file3", "file4"]
1435            )
1436        });
1437        worktree_c.read_with(&cx_c, |tree, _| {
1438            assert_eq!(
1439                tree.paths()
1440                    .map(|p| p.to_string_lossy())
1441                    .collect::<Vec<_>>(),
1442                &[".zed.toml", "file1", "file3", "file4"]
1443            )
1444        });
1445    }
1446
1447    #[gpui::test]
        // Guest-side save scenario: B joins A's project, edits a buffer and
        // saves it. Once the buffer's file reports a new mtime the buffer must
        // be neither dirty nor in conflict, and a further edit must make it
        // dirty again without flagging a conflict.
1448    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1449        cx_a.foreground().forbid_parking();
1450        let lang_registry = Arc::new(LanguageRegistry::new());
1451        let fs = Arc::new(FakeFs::new());
1452
1453        // Connect to a server as 2 clients.
1454        let mut server = TestServer::start().await;
1455        let client_a = server.create_client(&mut cx_a, "user_a").await;
1456        let client_b = server.create_client(&mut cx_b, "user_b").await;
1457
1458        // Share a project as client A
1459        fs.insert_tree(
1460            "/dir",
1461            json!({
1462                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1463                "a.txt": "a-contents",
1464            }),
1465        )
1466        .await;
1467
1468        let project_a = cx_a.update(|cx| {
1469            Project::local(
1470                client_a.clone(),
1471                client_a.user_store.clone(),
1472                lang_registry.clone(),
1473                fs.clone(),
1474                cx,
1475            )
1476        });
1477        let worktree_a = project_a
1478            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1479            .await
1480            .unwrap();
1481        worktree_a
1482            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1483            .await;
1484        let project_id = project_a
1485            .update(&mut cx_a, |project, _| project.next_remote_id())
1486            .await;
1487        project_a
1488            .update(&mut cx_a, |project, cx| project.share(cx))
1489            .await
1490            .unwrap();
1491
1492        // Join that project as client B
1493        let project_b = Project::remote(
1494            project_id,
1495            client_b.clone(),
1496            client_b.user_store.clone(),
1497            lang_registry.clone(),
1498            fs.clone(),
1499            &mut cx_b.to_async(),
1500        )
1501        .await
1502        .unwrap();
1503        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1504
1505        // Open a buffer as client B
1506        let buffer_b = worktree_b
1507            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1508            .await
1509            .unwrap();
            // Remember the file's mtime so the test can tell when the save has
            // been reflected on the guest.
1510        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1511
1512        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1513        buffer_b.read_with(&cx_b, |buf, _| {
1514            assert!(buf.is_dirty());
1515            assert!(!buf.has_conflict());
1516        });
1517
1518        buffer_b
1519            .update(&mut cx_b, |buf, cx| buf.save(cx))
1520            .unwrap()
1521            .await
1522            .unwrap();
            // Wait until the buffer's file carries a different mtime than before the save.
1523        worktree_b
1524            .condition(&cx_b, |_, cx| {
1525                buffer_b.read(cx).file().unwrap().mtime() != mtime
1526            })
1527            .await;
1528        buffer_b.read_with(&cx_b, |buf, _| {
1529            assert!(!buf.is_dirty());
1530            assert!(!buf.has_conflict());
1531        });
1532
            // Editing after the save must dirty the buffer without reporting a conflict.
1533        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1534        buffer_b.read_with(&cx_b, |buf, _| {
1535            assert!(buf.is_dirty());
1536            assert!(!buf.has_conflict());
1537        });
1538    }
1539
1540    #[gpui::test]
        // Race scenario: the host edits a buffer while the guest's request to
        // open the same buffer is still in flight. The test passes once the
        // guest's buffer text equals the host's text after that edit.
1541    async fn test_editing_while_guest_opens_buffer(
1542        mut cx_a: TestAppContext,
1543        mut cx_b: TestAppContext,
1544    ) {
1545        cx_a.foreground().forbid_parking();
1546        let lang_registry = Arc::new(LanguageRegistry::new());
1547        let fs = Arc::new(FakeFs::new());
1548
1549        // Connect to a server as 2 clients.
1550        let mut server = TestServer::start().await;
1551        let client_a = server.create_client(&mut cx_a, "user_a").await;
1552        let client_b = server.create_client(&mut cx_b, "user_b").await;
1553
1554        // Share a project as client A
1555        fs.insert_tree(
1556            "/dir",
1557            json!({
1558                ".zed.toml": r#"collaborators = ["user_b"]"#,
1559                "a.txt": "a-contents",
1560            }),
1561        )
1562        .await;
1563        let project_a = cx_a.update(|cx| {
1564            Project::local(
1565                client_a.clone(),
1566                client_a.user_store.clone(),
1567                lang_registry.clone(),
1568                fs.clone(),
1569                cx,
1570            )
1571        });
1572        let worktree_a = project_a
1573            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1574            .await
1575            .unwrap();
1576        worktree_a
1577            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1578            .await;
1579        let project_id = project_a
1580            .update(&mut cx_a, |project, _| project.next_remote_id())
1581            .await;
1582        project_a
1583            .update(&mut cx_a, |project, cx| project.share(cx))
1584            .await
1585            .unwrap();
1586
1587        // Join that project as client B
1588        let project_b = Project::remote(
1589            project_id,
1590            client_b.clone(),
1591            client_b.user_store.clone(),
1592            lang_registry.clone(),
1593            fs.clone(),
1594            &mut cx_b.to_async(),
1595        )
1596        .await
1597        .unwrap();
1598        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1599
1600        // Open a buffer as client A
1601        let buffer_a = worktree_a
1602            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1603            .await
1604            .unwrap();
1605
1606        // Start opening the same buffer as client B
1607        let buffer_b = cx_b
1608            .background()
1609            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
            // Yield once so the spawned open request gets a chance to start
            // before the host edits.
1610        task::yield_now().await;
1611
1612        // Edit the buffer as client A while client B is still opening it.
1613        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1614
            // Capture the host's text after the edit; the guest's buffer must
            // eventually match it.
1615        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1616        let buffer_b = buffer_b.await.unwrap();
1617        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1618    }
1619
1620    #[gpui::test]
        // Cancellation scenario: the guest starts opening a buffer, then drops
        // its project and the pending open task before the open completes. The
        // test passes once the host's collaborator list is empty again.
1621    async fn test_leaving_worktree_while_opening_buffer(
1622        mut cx_a: TestAppContext,
1623        mut cx_b: TestAppContext,
1624    ) {
1625        cx_a.foreground().forbid_parking();
1626        let lang_registry = Arc::new(LanguageRegistry::new());
1627        let fs = Arc::new(FakeFs::new());
1628
1629        // Connect to a server as 2 clients.
1630        let mut server = TestServer::start().await;
1631        let client_a = server.create_client(&mut cx_a, "user_a").await;
1632        let client_b = server.create_client(&mut cx_b, "user_b").await;
1633
1634        // Share a project as client A
1635        fs.insert_tree(
1636            "/dir",
1637            json!({
1638                ".zed.toml": r#"collaborators = ["user_b"]"#,
1639                "a.txt": "a-contents",
1640            }),
1641        )
1642        .await;
1643        let project_a = cx_a.update(|cx| {
1644            Project::local(
1645                client_a.clone(),
1646                client_a.user_store.clone(),
1647                lang_registry.clone(),
1648                fs.clone(),
1649                cx,
1650            )
1651        });
1652        let worktree_a = project_a
1653            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1654            .await
1655            .unwrap();
1656        worktree_a
1657            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1658            .await;
1659        let project_id = project_a
1660            .update(&mut cx_a, |project, _| project.next_remote_id())
1661            .await;
1662        project_a
1663            .update(&mut cx_a, |project, cx| project.share(cx))
1664            .await
1665            .unwrap();
1666
1667        // Join that project as client B
1668        let project_b = Project::remote(
1669            project_id,
1670            client_b.clone(),
1671            client_b.user_store.clone(),
1672            lang_registry.clone(),
1673            fs.clone(),
1674            &mut cx_b.to_async(),
1675        )
1676        .await
1677        .unwrap();
1678        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1679
1680        // See that a guest has joined as client A.
1681        project_a
1682            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1683            .await;
1684
1685        // Begin opening a buffer as client B, but leave the project before the open completes.
1686        let buffer_b = cx_b
1687            .background()
1688            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1689        cx_b.update(|_| drop(project_b));
            // The spawned open task is dropped without ever being awaited.
1690        drop(buffer_b);
1691
1692        // See that the guest has left.
1693        project_a
1694            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1695            .await;
1696    }
1697
1698    #[gpui::test]
1699    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1700        cx_a.foreground().forbid_parking();
1701        let lang_registry = Arc::new(LanguageRegistry::new());
1702        let fs = Arc::new(FakeFs::new());
1703
1704        // Connect to a server as 2 clients.
1705        let mut server = TestServer::start().await;
1706        let client_a = server.create_client(&mut cx_a, "user_a").await;
1707        let client_b = server.create_client(&mut cx_b, "user_b").await;
1708
1709        // Share a project as client A
1710        fs.insert_tree(
1711            "/a",
1712            json!({
1713                ".zed.toml": r#"collaborators = ["user_b"]"#,
1714                "a.txt": "a-contents",
1715                "b.txt": "b-contents",
1716            }),
1717        )
1718        .await;
1719        let project_a = cx_a.update(|cx| {
1720            Project::local(
1721                client_a.clone(),
1722                client_a.user_store.clone(),
1723                lang_registry.clone(),
1724                fs.clone(),
1725                cx,
1726            )
1727        });
1728        let worktree_a = project_a
1729            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1730            .await
1731            .unwrap();
1732        worktree_a
1733            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1734            .await;
1735        let project_id = project_a
1736            .update(&mut cx_a, |project, _| project.next_remote_id())
1737            .await;
1738        project_a
1739            .update(&mut cx_a, |project, cx| project.share(cx))
1740            .await
1741            .unwrap();
1742
1743        // Join that project as client B
1744        let _project_b = Project::remote(
1745            project_id,
1746            client_b.clone(),
1747            client_b.user_store.clone(),
1748            lang_registry.clone(),
1749            fs.clone(),
1750            &mut cx_b.to_async(),
1751        )
1752        .await
1753        .unwrap();
1754
1755        // See that a guest has joined as client A.
1756        project_a
1757            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1758            .await;
1759
1760        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1761        client_b.disconnect(&cx_b.to_async()).await.unwrap();
1762        project_a
1763            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1764            .await;
1765    }
1766
1767    #[gpui::test]
1768    async fn test_collaborating_with_diagnostics(
1769        mut cx_a: TestAppContext,
1770        mut cx_b: TestAppContext,
1771    ) {
1772        cx_a.foreground().forbid_parking();
1773        let mut lang_registry = Arc::new(LanguageRegistry::new());
1774        let fs = Arc::new(FakeFs::new());
1775
1776        // Set up a fake language server.
1777        let (language_server_config, mut fake_language_server) =
1778            LanguageServerConfig::fake(cx_a.background()).await;
1779        Arc::get_mut(&mut lang_registry)
1780            .unwrap()
1781            .add(Arc::new(Language::new(
1782                LanguageConfig {
1783                    name: "Rust".to_string(),
1784                    path_suffixes: vec!["rs".to_string()],
1785                    language_server: Some(language_server_config),
1786                    ..Default::default()
1787                },
1788                Some(tree_sitter_rust::language()),
1789            )));
1790
1791        // Connect to a server as 2 clients.
1792        let mut server = TestServer::start().await;
1793        let client_a = server.create_client(&mut cx_a, "user_a").await;
1794        let client_b = server.create_client(&mut cx_b, "user_b").await;
1795
1796        // Share a project as client A
1797        fs.insert_tree(
1798            "/a",
1799            json!({
1800                ".zed.toml": r#"collaborators = ["user_b"]"#,
1801                "a.rs": "let one = two",
1802                "other.rs": "",
1803            }),
1804        )
1805        .await;
1806        let project_a = cx_a.update(|cx| {
1807            Project::local(
1808                client_a.clone(),
1809                client_a.user_store.clone(),
1810                lang_registry.clone(),
1811                fs.clone(),
1812                cx,
1813            )
1814        });
1815        let worktree_a = project_a
1816            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1817            .await
1818            .unwrap();
1819        worktree_a
1820            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1821            .await;
1822        let project_id = project_a
1823            .update(&mut cx_a, |project, _| project.next_remote_id())
1824            .await;
1825        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1826        project_a
1827            .update(&mut cx_a, |project, cx| project.share(cx))
1828            .await
1829            .unwrap();
1830
1831        // Cause the language server to start.
1832        let _ = cx_a
1833            .background()
1834            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1835                worktree.open_buffer("other.rs", cx)
1836            }))
1837            .await
1838            .unwrap();
1839
1840        // Simulate a language server reporting errors for a file.
1841        fake_language_server
1842            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1843                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1844                version: None,
1845                diagnostics: vec![lsp::Diagnostic {
1846                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1847                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1848                    message: "message 1".to_string(),
1849                    ..Default::default()
1850                }],
1851            })
1852            .await;
1853
1854        // Wait for server to see the diagnostics update.
1855        server
1856            .condition(|store| {
1857                let worktree = store
1858                    .project(project_id)
1859                    .unwrap()
1860                    .worktrees
1861                    .get(&worktree_id.to_proto())
1862                    .unwrap();
1863
1864                !worktree
1865                    .share
1866                    .as_ref()
1867                    .unwrap()
1868                    .diagnostic_summaries
1869                    .is_empty()
1870            })
1871            .await;
1872
1873        // Join the worktree as client B.
1874        let project_b = Project::remote(
1875            project_id,
1876            client_b.clone(),
1877            client_b.user_store.clone(),
1878            lang_registry.clone(),
1879            fs.clone(),
1880            &mut cx_b.to_async(),
1881        )
1882        .await
1883        .unwrap();
1884
1885        project_b.read_with(&cx_b, |project, cx| {
1886            assert_eq!(
1887                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1888                &[(
1889                    ProjectPath {
1890                        worktree_id,
1891                        path: Arc::from(Path::new("a.rs")),
1892                    },
1893                    DiagnosticSummary {
1894                        error_count: 1,
1895                        warning_count: 0,
1896                        ..Default::default()
1897                    },
1898                )]
1899            )
1900        });
1901
1902        // Simulate a language server reporting more errors for a file.
1903        fake_language_server
1904            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1905                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1906                version: None,
1907                diagnostics: vec![
1908                    lsp::Diagnostic {
1909                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1910                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1911                        message: "message 1".to_string(),
1912                        ..Default::default()
1913                    },
1914                    lsp::Diagnostic {
1915                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1916                        range: lsp::Range::new(
1917                            lsp::Position::new(0, 10),
1918                            lsp::Position::new(0, 13),
1919                        ),
1920                        message: "message 2".to_string(),
1921                        ..Default::default()
1922                    },
1923                ],
1924            })
1925            .await;
1926
1927        // Client b gets the updated summaries
1928        project_b
1929            .condition(&cx_b, |project, cx| {
1930                project.diagnostic_summaries(cx).collect::<Vec<_>>()
1931                    == &[(
1932                        ProjectPath {
1933                            worktree_id,
1934                            path: Arc::from(Path::new("a.rs")),
1935                        },
1936                        DiagnosticSummary {
1937                            error_count: 1,
1938                            warning_count: 1,
1939                            ..Default::default()
1940                        },
1941                    )]
1942            })
1943            .await;
1944
1945        // Open the file with the errors on client B. They should be present.
1946        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1947        let buffer_b = cx_b
1948            .background()
1949            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1950            .await
1951            .unwrap();
1952
1953        buffer_b.read_with(&cx_b, |buffer, _| {
1954            assert_eq!(
1955                buffer
1956                    .snapshot()
1957                    .diagnostics_in_range::<_, Point>(0..buffer.len())
1958                    .map(|entry| entry)
1959                    .collect::<Vec<_>>(),
1960                &[
1961                    DiagnosticEntry {
1962                        range: Point::new(0, 4)..Point::new(0, 7),
1963                        diagnostic: Diagnostic {
1964                            group_id: 0,
1965                            message: "message 1".to_string(),
1966                            severity: lsp::DiagnosticSeverity::ERROR,
1967                            is_primary: true,
1968                            ..Default::default()
1969                        }
1970                    },
1971                    DiagnosticEntry {
1972                        range: Point::new(0, 10)..Point::new(0, 13),
1973                        diagnostic: Diagnostic {
1974                            group_id: 1,
1975                            severity: lsp::DiagnosticSeverity::WARNING,
1976                            message: "message 2".to_string(),
1977                            is_primary: true,
1978                            ..Default::default()
1979                        }
1980                    }
1981                ]
1982            );
1983        });
1984    }
1985
1986    #[gpui::test]
1987    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1988        cx_a.foreground().forbid_parking();
1989
1990        // Connect to a server as 2 clients.
1991        let mut server = TestServer::start().await;
1992        let client_a = server.create_client(&mut cx_a, "user_a").await;
1993        let client_b = server.create_client(&mut cx_b, "user_b").await;
1994
1995        // Create an org that includes these 2 users.
1996        let db = &server.app_state.db;
1997        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1998        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1999            .await
2000            .unwrap();
2001        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2002            .await
2003            .unwrap();
2004
2005        // Create a channel that includes all the users.
2006        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2007        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2008            .await
2009            .unwrap();
2010        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2011            .await
2012            .unwrap();
2013        db.create_channel_message(
2014            channel_id,
2015            client_b.current_user_id(&cx_b),
2016            "hello A, it's B.",
2017            OffsetDateTime::now_utc(),
2018            1,
2019        )
2020        .await
2021        .unwrap();
2022
2023        let channels_a = cx_a
2024            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2025        channels_a
2026            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2027            .await;
2028        channels_a.read_with(&cx_a, |list, _| {
2029            assert_eq!(
2030                list.available_channels().unwrap(),
2031                &[ChannelDetails {
2032                    id: channel_id.to_proto(),
2033                    name: "test-channel".to_string()
2034                }]
2035            )
2036        });
2037        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2038            this.get_channel(channel_id.to_proto(), cx).unwrap()
2039        });
2040        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2041        channel_a
2042            .condition(&cx_a, |channel, _| {
2043                channel_messages(channel)
2044                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2045            })
2046            .await;
2047
2048        let channels_b = cx_b
2049            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2050        channels_b
2051            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2052            .await;
2053        channels_b.read_with(&cx_b, |list, _| {
2054            assert_eq!(
2055                list.available_channels().unwrap(),
2056                &[ChannelDetails {
2057                    id: channel_id.to_proto(),
2058                    name: "test-channel".to_string()
2059                }]
2060            )
2061        });
2062
2063        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2064            this.get_channel(channel_id.to_proto(), cx).unwrap()
2065        });
2066        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2067        channel_b
2068            .condition(&cx_b, |channel, _| {
2069                channel_messages(channel)
2070                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2071            })
2072            .await;
2073
2074        channel_a
2075            .update(&mut cx_a, |channel, cx| {
2076                channel
2077                    .send_message("oh, hi B.".to_string(), cx)
2078                    .unwrap()
2079                    .detach();
2080                let task = channel.send_message("sup".to_string(), cx).unwrap();
2081                assert_eq!(
2082                    channel_messages(channel),
2083                    &[
2084                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2085                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2086                        ("user_a".to_string(), "sup".to_string(), true)
2087                    ]
2088                );
2089                task
2090            })
2091            .await
2092            .unwrap();
2093
2094        channel_b
2095            .condition(&cx_b, |channel, _| {
2096                channel_messages(channel)
2097                    == [
2098                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2099                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2100                        ("user_a".to_string(), "sup".to_string(), false),
2101                    ]
2102            })
2103            .await;
2104
2105        assert_eq!(
2106            server
2107                .state()
2108                .await
2109                .channel(channel_id)
2110                .unwrap()
2111                .connection_ids
2112                .len(),
2113            2
2114        );
2115        cx_b.update(|_| drop(channel_b));
2116        server
2117            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2118            .await;
2119
2120        cx_a.update(|_| drop(channel_a));
2121        server
2122            .condition(|state| state.channel(channel_id).is_none())
2123            .await;
2124    }
2125
2126    #[gpui::test]
2127    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2128        cx_a.foreground().forbid_parking();
2129
2130        let mut server = TestServer::start().await;
2131        let client_a = server.create_client(&mut cx_a, "user_a").await;
2132
2133        let db = &server.app_state.db;
2134        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2135        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2136        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2137            .await
2138            .unwrap();
2139        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2140            .await
2141            .unwrap();
2142
2143        let channels_a = cx_a
2144            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2145        channels_a
2146            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2147            .await;
2148        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2149            this.get_channel(channel_id.to_proto(), cx).unwrap()
2150        });
2151
2152        // Messages aren't allowed to be too long.
2153        channel_a
2154            .update(&mut cx_a, |channel, cx| {
2155                let long_body = "this is long.\n".repeat(1024);
2156                channel.send_message(long_body, cx).unwrap()
2157            })
2158            .await
2159            .unwrap_err();
2160
2161        // Messages aren't allowed to be blank.
2162        channel_a.update(&mut cx_a, |channel, cx| {
2163            channel.send_message(String::new(), cx).unwrap_err()
2164        });
2165
2166        // Leading and trailing whitespace are trimmed.
2167        channel_a
2168            .update(&mut cx_a, |channel, cx| {
2169                channel
2170                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
2171                    .unwrap()
2172            })
2173            .await
2174            .unwrap();
2175        assert_eq!(
2176            db.get_channel_messages(channel_id, 10, None)
2177                .await
2178                .unwrap()
2179                .iter()
2180                .map(|m| &m.body)
2181                .collect::<Vec<_>>(),
2182            &["surrounded by whitespace"]
2183        );
2184    }
2185
2186    #[gpui::test]
2187    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2188        cx_a.foreground().forbid_parking();
2189
2190        // Connect to a server as 2 clients.
2191        let mut server = TestServer::start().await;
2192        let client_a = server.create_client(&mut cx_a, "user_a").await;
2193        let client_b = server.create_client(&mut cx_b, "user_b").await;
2194        let mut status_b = client_b.status();
2195
2196        // Create an org that includes these 2 users.
2197        let db = &server.app_state.db;
2198        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2199        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2200            .await
2201            .unwrap();
2202        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2203            .await
2204            .unwrap();
2205
2206        // Create a channel that includes all the users.
2207        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2208        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2209            .await
2210            .unwrap();
2211        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2212            .await
2213            .unwrap();
2214        db.create_channel_message(
2215            channel_id,
2216            client_b.current_user_id(&cx_b),
2217            "hello A, it's B.",
2218            OffsetDateTime::now_utc(),
2219            2,
2220        )
2221        .await
2222        .unwrap();
2223
2224        let channels_a = cx_a
2225            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2226        channels_a
2227            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2228            .await;
2229
2230        channels_a.read_with(&cx_a, |list, _| {
2231            assert_eq!(
2232                list.available_channels().unwrap(),
2233                &[ChannelDetails {
2234                    id: channel_id.to_proto(),
2235                    name: "test-channel".to_string()
2236                }]
2237            )
2238        });
2239        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2240            this.get_channel(channel_id.to_proto(), cx).unwrap()
2241        });
2242        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2243        channel_a
2244            .condition(&cx_a, |channel, _| {
2245                channel_messages(channel)
2246                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2247            })
2248            .await;
2249
2250        let channels_b = cx_b
2251            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2252        channels_b
2253            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2254            .await;
2255        channels_b.read_with(&cx_b, |list, _| {
2256            assert_eq!(
2257                list.available_channels().unwrap(),
2258                &[ChannelDetails {
2259                    id: channel_id.to_proto(),
2260                    name: "test-channel".to_string()
2261                }]
2262            )
2263        });
2264
2265        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2266            this.get_channel(channel_id.to_proto(), cx).unwrap()
2267        });
2268        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2269        channel_b
2270            .condition(&cx_b, |channel, _| {
2271                channel_messages(channel)
2272                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2273            })
2274            .await;
2275
2276        // Disconnect client B, ensuring we can still access its cached channel data.
2277        server.forbid_connections();
2278        server.disconnect_client(client_b.current_user_id(&cx_b));
2279        while !matches!(
2280            status_b.recv().await,
2281            Some(client::Status::ReconnectionError { .. })
2282        ) {}
2283
2284        channels_b.read_with(&cx_b, |channels, _| {
2285            assert_eq!(
2286                channels.available_channels().unwrap(),
2287                [ChannelDetails {
2288                    id: channel_id.to_proto(),
2289                    name: "test-channel".to_string()
2290                }]
2291            )
2292        });
2293        channel_b.read_with(&cx_b, |channel, _| {
2294            assert_eq!(
2295                channel_messages(channel),
2296                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2297            )
2298        });
2299
2300        // Send a message from client B while it is disconnected.
2301        channel_b
2302            .update(&mut cx_b, |channel, cx| {
2303                let task = channel
2304                    .send_message("can you see this?".to_string(), cx)
2305                    .unwrap();
2306                assert_eq!(
2307                    channel_messages(channel),
2308                    &[
2309                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2310                        ("user_b".to_string(), "can you see this?".to_string(), true)
2311                    ]
2312                );
2313                task
2314            })
2315            .await
2316            .unwrap_err();
2317
2318        // Send a message from client A while B is disconnected.
2319        channel_a
2320            .update(&mut cx_a, |channel, cx| {
2321                channel
2322                    .send_message("oh, hi B.".to_string(), cx)
2323                    .unwrap()
2324                    .detach();
2325                let task = channel.send_message("sup".to_string(), cx).unwrap();
2326                assert_eq!(
2327                    channel_messages(channel),
2328                    &[
2329                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2330                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2331                        ("user_a".to_string(), "sup".to_string(), true)
2332                    ]
2333                );
2334                task
2335            })
2336            .await
2337            .unwrap();
2338
2339        // Give client B a chance to reconnect.
2340        server.allow_connections();
2341        cx_b.foreground().advance_clock(Duration::from_secs(10));
2342
2343        // Verify that B sees the new messages upon reconnection, as well as the message client B
2344        // sent while offline.
2345        channel_b
2346            .condition(&cx_b, |channel, _| {
2347                channel_messages(channel)
2348                    == [
2349                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2350                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2351                        ("user_a".to_string(), "sup".to_string(), false),
2352                        ("user_b".to_string(), "can you see this?".to_string(), false),
2353                    ]
2354            })
2355            .await;
2356
2357        // Ensure client A and B can communicate normally after reconnection.
2358        channel_a
2359            .update(&mut cx_a, |channel, cx| {
2360                channel.send_message("you online?".to_string(), cx).unwrap()
2361            })
2362            .await
2363            .unwrap();
2364        channel_b
2365            .condition(&cx_b, |channel, _| {
2366                channel_messages(channel)
2367                    == [
2368                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2369                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2370                        ("user_a".to_string(), "sup".to_string(), false),
2371                        ("user_b".to_string(), "can you see this?".to_string(), false),
2372                        ("user_a".to_string(), "you online?".to_string(), false),
2373                    ]
2374            })
2375            .await;
2376
2377        channel_b
2378            .update(&mut cx_b, |channel, cx| {
2379                channel.send_message("yep".to_string(), cx).unwrap()
2380            })
2381            .await
2382            .unwrap();
2383        channel_a
2384            .condition(&cx_a, |channel, _| {
2385                channel_messages(channel)
2386                    == [
2387                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2388                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2389                        ("user_a".to_string(), "sup".to_string(), false),
2390                        ("user_b".to_string(), "can you see this?".to_string(), false),
2391                        ("user_a".to_string(), "you online?".to_string(), false),
2392                        ("user_b".to_string(), "yep".to_string(), false),
2393                    ]
2394            })
2395            .await;
2396    }
2397
2398    #[gpui::test]
2399    async fn test_contacts(
2400        mut cx_a: TestAppContext,
2401        mut cx_b: TestAppContext,
2402        mut cx_c: TestAppContext,
2403    ) {
2404        cx_a.foreground().forbid_parking();
2405        let lang_registry = Arc::new(LanguageRegistry::new());
2406        let fs = Arc::new(FakeFs::new());
2407
2408        // Connect to a server as 3 clients.
2409        let mut server = TestServer::start().await;
2410        let client_a = server.create_client(&mut cx_a, "user_a").await;
2411        let client_b = server.create_client(&mut cx_b, "user_b").await;
2412        let client_c = server.create_client(&mut cx_c, "user_c").await;
2413
2414        // Share a worktree as client A.
2415        fs.insert_tree(
2416            "/a",
2417            json!({
2418                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2419            }),
2420        )
2421        .await;
2422
2423        let project_a = cx_a.update(|cx| {
2424            Project::local(
2425                client_a.clone(),
2426                client_a.user_store.clone(),
2427                lang_registry.clone(),
2428                fs.clone(),
2429                cx,
2430            )
2431        });
2432        let worktree_a = project_a
2433            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
2434            .await
2435            .unwrap();
2436        worktree_a
2437            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2438            .await;
2439
2440        client_a
2441            .user_store
2442            .condition(&cx_a, |user_store, _| {
2443                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2444            })
2445            .await;
2446        client_b
2447            .user_store
2448            .condition(&cx_b, |user_store, _| {
2449                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2450            })
2451            .await;
2452        client_c
2453            .user_store
2454            .condition(&cx_c, |user_store, _| {
2455                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2456            })
2457            .await;
2458
2459        let project_id = project_a
2460            .update(&mut cx_a, |project, _| project.next_remote_id())
2461            .await;
2462        project_a
2463            .update(&mut cx_a, |project, cx| project.share(cx))
2464            .await
2465            .unwrap();
2466
2467        let _project_b = Project::remote(
2468            project_id,
2469            client_b.clone(),
2470            client_b.user_store.clone(),
2471            lang_registry.clone(),
2472            fs.clone(),
2473            &mut cx_b.to_async(),
2474        )
2475        .await
2476        .unwrap();
2477
2478        client_a
2479            .user_store
2480            .condition(&cx_a, |user_store, _| {
2481                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2482            })
2483            .await;
2484        client_b
2485            .user_store
2486            .condition(&cx_b, |user_store, _| {
2487                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2488            })
2489            .await;
2490        client_c
2491            .user_store
2492            .condition(&cx_c, |user_store, _| {
2493                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2494            })
2495            .await;
2496
2497        project_a
2498            .condition(&cx_a, |project, _| {
2499                project.collaborators().contains_key(&client_b.peer_id)
2500            })
2501            .await;
2502
2503        cx_a.update(move |_| drop(project_a));
2504        client_a
2505            .user_store
2506            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2507            .await;
2508        client_b
2509            .user_store
2510            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2511            .await;
2512        client_c
2513            .user_store
2514            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2515            .await;
2516
2517        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2518            user_store
2519                .contacts()
2520                .iter()
2521                .map(|contact| {
2522                    let worktrees = contact
2523                        .projects
2524                        .iter()
2525                        .map(|p| {
2526                            (
2527                                p.worktree_root_names[0].as_str(),
2528                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
2529                            )
2530                        })
2531                        .collect();
2532                    (contact.user.github_login.as_str(), worktrees)
2533                })
2534                .collect()
2535        }
2536    }
2537
2538    struct TestServer {
2539        peer: Arc<Peer>,
2540        app_state: Arc<AppState>,
2541        server: Arc<Server>,
2542        notifications: mpsc::Receiver<()>,
2543        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
2544        forbid_connections: Arc<AtomicBool>,
2545        _test_db: TestDb,
2546    }
2547
2548    impl TestServer {
2549        async fn start() -> Self {
2550            let test_db = TestDb::new();
2551            let app_state = Self::build_app_state(&test_db).await;
2552            let peer = Peer::new();
2553            let notifications = mpsc::channel(128);
2554            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2555            Self {
2556                peer,
2557                app_state,
2558                server,
2559                notifications: notifications.1,
2560                connection_killers: Default::default(),
2561                forbid_connections: Default::default(),
2562                _test_db: test_db,
2563            }
2564        }
2565
2566        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2567            let http = FakeHttpClient::with_404_response();
2568            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2569            let client_name = name.to_string();
2570            let mut client = Client::new(http.clone());
2571            let server = self.server.clone();
2572            let connection_killers = self.connection_killers.clone();
2573            let forbid_connections = self.forbid_connections.clone();
2574            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2575
2576            Arc::get_mut(&mut client)
2577                .unwrap()
2578                .override_authenticate(move |cx| {
2579                    cx.spawn(|_| async move {
2580                        let access_token = "the-token".to_string();
2581                        Ok(Credentials {
2582                            user_id: user_id.0 as u64,
2583                            access_token,
2584                        })
2585                    })
2586                })
2587                .override_establish_connection(move |credentials, cx| {
2588                    assert_eq!(credentials.user_id, user_id.0 as u64);
2589                    assert_eq!(credentials.access_token, "the-token");
2590
2591                    let server = server.clone();
2592                    let connection_killers = connection_killers.clone();
2593                    let forbid_connections = forbid_connections.clone();
2594                    let client_name = client_name.clone();
2595                    let connection_id_tx = connection_id_tx.clone();
2596                    cx.spawn(move |cx| async move {
2597                        if forbid_connections.load(SeqCst) {
2598                            Err(EstablishConnectionError::other(anyhow!(
2599                                "server is forbidding connections"
2600                            )))
2601                        } else {
2602                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2603                            connection_killers.lock().insert(user_id, kill_conn);
2604                            cx.background()
2605                                .spawn(server.handle_connection(
2606                                    server_conn,
2607                                    client_name,
2608                                    user_id,
2609                                    Some(connection_id_tx),
2610                                ))
2611                                .detach();
2612                            Ok(client_conn)
2613                        }
2614                    })
2615                });
2616
2617            client
2618                .authenticate_and_connect(&cx.to_async())
2619                .await
2620                .unwrap();
2621
2622            let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2623            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2624            let mut authed_user =
2625                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2626            while authed_user.recv().await.unwrap().is_none() {}
2627
2628            TestClient {
2629                client,
2630                peer_id,
2631                user_store,
2632            }
2633        }
2634
2635        fn disconnect_client(&self, user_id: UserId) {
2636            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2637                let _ = kill_conn.try_send(Some(()));
2638            }
2639        }
2640
2641        fn forbid_connections(&self) {
2642            self.forbid_connections.store(true, SeqCst);
2643        }
2644
2645        fn allow_connections(&self) {
2646            self.forbid_connections.store(false, SeqCst);
2647        }
2648
2649        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2650            let mut config = Config::default();
2651            config.session_secret = "a".repeat(32);
2652            config.database_url = test_db.url.clone();
2653            let github_client = github::AppClient::test();
2654            Arc::new(AppState {
2655                db: test_db.db().clone(),
2656                handlebars: Default::default(),
2657                auth_client: auth::build_client("", ""),
2658                repo_client: github::RepoClient::test(&github_client),
2659                github_client,
2660                config,
2661            })
2662        }
2663
2664        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2665            self.server.store.read()
2666        }
2667
2668        async fn condition<F>(&mut self, mut predicate: F)
2669        where
2670            F: FnMut(&Store) -> bool,
2671        {
2672            async_std::future::timeout(Duration::from_millis(500), async {
2673                while !(predicate)(&*self.server.store.read()) {
2674                    self.notifications.recv().await;
2675                }
2676            })
2677            .await
2678            .expect("condition timed out");
2679        }
2680    }
2681
2682    impl Drop for TestServer {
2683        fn drop(&mut self) {
2684            task::block_on(self.peer.reset());
2685        }
2686    }
2687
2688    struct TestClient {
2689        client: Arc<Client>,
2690        pub peer_id: PeerId,
2691        pub user_store: ModelHandle<UserStore>,
2692    }
2693
2694    impl Deref for TestClient {
2695        type Target = Arc<Client>;
2696
2697        fn deref(&self) -> &Self::Target {
2698            &self.client
2699        }
2700    }
2701
2702    impl TestClient {
2703        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2704            UserId::from_proto(
2705                self.user_store
2706                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2707            )
2708        }
2709    }
2710
2711    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2712        channel
2713            .messages()
2714            .cursor::<()>()
2715            .map(|m| {
2716                (
2717                    m.sender.github_login.clone(),
2718                    m.body.clone(),
2719                    m.is_pending(),
2720                )
2721            })
2722            .collect()
2723    }
2724
2725    struct EmptyView;
2726
2727    impl gpui::Entity for EmptyView {
2728        type Event = ();
2729    }
2730
2731    impl gpui::View for EmptyView {
2732        fn ui_name() -> &'static str {
2733            "empty view"
2734        }
2735
2736        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2737            gpui::Element::boxed(gpui::elements::Empty)
2738        }
2739    }
2740}