rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  /// Page size passed to the database when loading channel messages; a page
  /// shorter than this marks the end of the history.
  const MESSAGE_COUNT_PER_PAGE: usize = 100;
  /// Upper bound on a chat message body, compared against `str::len` (bytes).
  const MAX_MESSAGE_LEN: usize = 1024;
  /// Error text reported when a request names a project the store cannot resolve.
  const NO_SUCH_PROJECT: &str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updated)
  76            .add_handler(Server::open_buffer)
  77            .add_handler(Server::close_buffer)
  78            .add_handler(Server::update_buffer)
  79            .add_handler(Server::buffer_saved)
  80            .add_handler(Server::save_buffer)
  81            .add_handler(Server::get_channels)
  82            .add_handler(Server::get_users)
  83            .add_handler(Server::join_channel)
  84            .add_handler(Server::leave_channel)
  85            .add_handler(Server::send_channel_message)
  86            .add_handler(Server::get_channel_messages);
  87
  88        Arc::new(server)
  89    }
  90
  91    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  92    where
  93        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  94        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  95        M: EnvelopedMessage,
  96    {
  97        let prev_handler = self.handlers.insert(
  98            TypeId::of::<M>(),
  99            Box::new(move |server, envelope| {
 100                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 101                (handler)(server, *envelope).boxed()
 102            }),
 103        );
 104        if prev_handler.is_some() {
 105            panic!("registered a handler for the same message twice");
 106        }
 107        self
 108    }
 109
 110    pub fn handle_connection(
 111        self: &Arc<Self>,
 112        connection: Connection,
 113        addr: String,
 114        user_id: UserId,
 115        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 116    ) -> impl Future<Output = ()> {
 117        let mut this = self.clone();
 118        async move {
 119            let (connection_id, handle_io, mut incoming_rx) =
 120                this.peer.add_connection(connection).await;
 121
 122            if let Some(send_connection_id) = send_connection_id.as_mut() {
 123                let _ = send_connection_id.send(connection_id).await;
 124            }
 125
 126            this.state_mut().add_connection(connection_id, user_id);
 127            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 128                log::error!("error updating contacts for {:?}: {}", user_id, err);
 129            }
 130
 131            let handle_io = handle_io.fuse();
 132            futures::pin_mut!(handle_io);
 133            loop {
 134                let next_message = incoming_rx.recv().fuse();
 135                futures::pin_mut!(next_message);
 136                futures::select_biased! {
 137                    message = next_message => {
 138                        if let Some(message) = message {
 139                            let start_time = Instant::now();
 140                            log::info!("RPC message received: {}", message.payload_type_name());
 141                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 142                                if let Err(err) = (handler)(this.clone(), message).await {
 143                                    log::error!("error handling message: {:?}", err);
 144                                } else {
 145                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 146                                }
 147
 148                                if let Some(mut notifications) = this.notifications.clone() {
 149                                    let _ = notifications.send(()).await;
 150                                }
 151                            } else {
 152                                log::warn!("unhandled message: {}", message.payload_type_name());
 153                            }
 154                        } else {
 155                            log::info!("rpc connection closed {:?}", addr);
 156                            break;
 157                        }
 158                    }
 159                    handle_io = handle_io => {
 160                        if let Err(err) = handle_io {
 161                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 162                        }
 163                        break;
 164                    }
 165                }
 166            }
 167
 168            if let Err(err) = this.sign_out(connection_id).await {
 169                log::error!("error signing out connection {:?} - {:?}", addr, err);
 170            }
 171        }
 172    }
 173
 174    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 175        self.peer.disconnect(connection_id).await;
 176        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 177
 178        for (project_id, project) in removed_connection.hosted_projects {
 179            if let Some(share) = project.share {
 180                broadcast(
 181                    connection_id,
 182                    share.guests.keys().copied().collect(),
 183                    |conn_id| {
 184                        self.peer
 185                            .send(conn_id, proto::UnshareProject { project_id })
 186                    },
 187                )
 188                .await?;
 189            }
 190        }
 191
 192        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 193            broadcast(connection_id, peer_ids, |conn_id| {
 194                self.peer.send(
 195                    conn_id,
 196                    proto::RemoveProjectCollaborator {
 197                        project_id,
 198                        peer_id: connection_id.0,
 199                    },
 200                )
 201            })
 202            .await?;
 203        }
 204
 205        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 206            .await?;
 207
 208        Ok(())
 209    }
 210
 211    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 212        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 213        Ok(())
 214    }
 215
 216    async fn register_project(
 217        mut self: Arc<Server>,
 218        request: TypedEnvelope<proto::RegisterProject>,
 219    ) -> tide::Result<()> {
 220        let project_id = {
 221            let mut state = self.state_mut();
 222            let user_id = state.user_id_for_connection(request.sender_id)?;
 223            state.register_project(request.sender_id, user_id)
 224        };
 225        self.peer
 226            .respond(
 227                request.receipt(),
 228                proto::RegisterProjectResponse { project_id },
 229            )
 230            .await?;
 231        Ok(())
 232    }
 233
 234    async fn unregister_project(
 235        mut self: Arc<Server>,
 236        request: TypedEnvelope<proto::UnregisterProject>,
 237    ) -> tide::Result<()> {
 238        let project = self
 239            .state_mut()
 240            .unregister_project(request.payload.project_id, request.sender_id)
 241            .ok_or_else(|| anyhow!("no such project"))?;
 242        self.update_contacts_for_users(project.authorized_user_ids().iter())
 243            .await?;
 244        Ok(())
 245    }
 246
 247    async fn share_project(
 248        mut self: Arc<Server>,
 249        request: TypedEnvelope<proto::ShareProject>,
 250    ) -> tide::Result<()> {
 251        self.state_mut()
 252            .share_project(request.payload.project_id, request.sender_id);
 253        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 254        Ok(())
 255    }
 256
 257    async fn unshare_project(
 258        mut self: Arc<Server>,
 259        request: TypedEnvelope<proto::UnshareProject>,
 260    ) -> tide::Result<()> {
 261        let project_id = request.payload.project_id;
 262        let project = self
 263            .state_mut()
 264            .unshare_project(project_id, request.sender_id)?;
 265
 266        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 267            self.peer
 268                .send(conn_id, proto::UnshareProject { project_id })
 269        })
 270        .await?;
 271        self.update_contacts_for_users(&project.authorized_user_ids)
 272            .await?;
 273
 274        Ok(())
 275    }
 276
 277    async fn join_project(
 278        mut self: Arc<Server>,
 279        request: TypedEnvelope<proto::JoinProject>,
 280    ) -> tide::Result<()> {
 281        let project_id = request.payload.project_id;
 282
 283        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 284        let response_data = self
 285            .state_mut()
 286            .join_project(request.sender_id, user_id, project_id)
 287            .and_then(|joined| {
 288                let share = joined.project.share()?;
 289                let peer_count = share.guests.len();
 290                let mut collaborators = Vec::with_capacity(peer_count);
 291                collaborators.push(proto::Collaborator {
 292                    peer_id: joined.project.host_connection_id.0,
 293                    replica_id: 0,
 294                    user_id: joined.project.host_user_id.to_proto(),
 295                });
 296                let worktrees = joined
 297                    .project
 298                    .worktrees
 299                    .iter()
 300                    .filter_map(|(id, worktree)| {
 301                        worktree.share.as_ref().map(|share| proto::Worktree {
 302                            id: *id,
 303                            root_name: worktree.root_name.clone(),
 304                            entries: share.entries.values().cloned().collect(),
 305                        })
 306                    })
 307                    .collect();
 308                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 309                    if *peer_conn_id != request.sender_id {
 310                        collaborators.push(proto::Collaborator {
 311                            peer_id: peer_conn_id.0,
 312                            replica_id: *peer_replica_id as u32,
 313                            user_id: peer_user_id.to_proto(),
 314                        });
 315                    }
 316                }
 317                let response = proto::JoinProjectResponse {
 318                    worktrees,
 319                    replica_id: joined.replica_id as u32,
 320                    collaborators,
 321                };
 322                let connection_ids = joined.project.connection_ids();
 323                let contact_user_ids = joined.project.authorized_user_ids();
 324                Ok((response, connection_ids, contact_user_ids))
 325            });
 326
 327        match response_data {
 328            Ok((response, connection_ids, contact_user_ids)) => {
 329                broadcast(request.sender_id, connection_ids, |conn_id| {
 330                    self.peer.send(
 331                        conn_id,
 332                        proto::AddProjectCollaborator {
 333                            project_id: project_id,
 334                            collaborator: Some(proto::Collaborator {
 335                                peer_id: request.sender_id.0,
 336                                replica_id: response.replica_id,
 337                                user_id: user_id.to_proto(),
 338                            }),
 339                        },
 340                    )
 341                })
 342                .await?;
 343                self.peer.respond(request.receipt(), response).await?;
 344                self.update_contacts_for_users(&contact_user_ids).await?;
 345            }
 346            Err(error) => {
 347                self.peer
 348                    .respond_with_error(
 349                        request.receipt(),
 350                        proto::Error {
 351                            message: error.to_string(),
 352                        },
 353                    )
 354                    .await?;
 355            }
 356        }
 357
 358        Ok(())
 359    }
 360
 361    async fn leave_project(
 362        mut self: Arc<Server>,
 363        request: TypedEnvelope<proto::LeaveProject>,
 364    ) -> tide::Result<()> {
 365        let sender_id = request.sender_id;
 366        let project_id = request.payload.project_id;
 367        let worktree = self.state_mut().leave_project(sender_id, project_id);
 368        if let Some(worktree) = worktree {
 369            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 370                self.peer.send(
 371                    conn_id,
 372                    proto::RemoveProjectCollaborator {
 373                        project_id,
 374                        peer_id: sender_id.0,
 375                    },
 376                )
 377            })
 378            .await?;
 379            self.update_contacts_for_users(&worktree.authorized_user_ids)
 380                .await?;
 381        }
 382        Ok(())
 383    }
 384
 385    async fn register_worktree(
 386        mut self: Arc<Server>,
 387        request: TypedEnvelope<proto::RegisterWorktree>,
 388    ) -> tide::Result<()> {
 389        let receipt = request.receipt();
 390        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 391
 392        let mut contact_user_ids = HashSet::default();
 393        contact_user_ids.insert(host_user_id);
 394        for github_login in request.payload.authorized_logins {
 395            match self.app_state.db.create_user(&github_login, false).await {
 396                Ok(contact_user_id) => {
 397                    contact_user_ids.insert(contact_user_id);
 398                }
 399                Err(err) => {
 400                    let message = err.to_string();
 401                    self.peer
 402                        .respond_with_error(receipt, proto::Error { message })
 403                        .await?;
 404                    return Ok(());
 405                }
 406            }
 407        }
 408
 409        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 410        let ok = self.state_mut().register_worktree(
 411            request.payload.project_id,
 412            request.payload.worktree_id,
 413            Worktree {
 414                authorized_user_ids: contact_user_ids.clone(),
 415                root_name: request.payload.root_name,
 416                share: None,
 417            },
 418        );
 419
 420        if ok {
 421            self.peer.respond(receipt, proto::Ack {}).await?;
 422            self.update_contacts_for_users(&contact_user_ids).await?;
 423        } else {
 424            self.peer
 425                .respond_with_error(
 426                    receipt,
 427                    proto::Error {
 428                        message: NO_SUCH_PROJECT.to_string(),
 429                    },
 430                )
 431                .await?;
 432        }
 433
 434        Ok(())
 435    }
 436
 437    async fn unregister_worktree(
 438        mut self: Arc<Server>,
 439        request: TypedEnvelope<proto::UnregisterWorktree>,
 440    ) -> tide::Result<()> {
 441        let project_id = request.payload.project_id;
 442        let worktree_id = request.payload.worktree_id;
 443        let (worktree, guest_connection_ids) =
 444            self.state_mut()
 445                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 446
 447        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 448            self.peer.send(
 449                conn_id,
 450                proto::UnregisterWorktree {
 451                    project_id,
 452                    worktree_id,
 453                },
 454            )
 455        })
 456        .await?;
 457        self.update_contacts_for_users(&worktree.authorized_user_ids)
 458            .await?;
 459        Ok(())
 460    }
 461
 462    async fn share_worktree(
 463        mut self: Arc<Server>,
 464        mut request: TypedEnvelope<proto::ShareWorktree>,
 465    ) -> tide::Result<()> {
 466        let worktree = request
 467            .payload
 468            .worktree
 469            .as_mut()
 470            .ok_or_else(|| anyhow!("missing worktree"))?;
 471        let entries = mem::take(&mut worktree.entries)
 472            .into_iter()
 473            .map(|entry| (entry.id, entry))
 474            .collect();
 475
 476        let contact_user_ids = self.state_mut().share_worktree(
 477            request.payload.project_id,
 478            worktree.id,
 479            request.sender_id,
 480            entries,
 481        );
 482        if let Some(contact_user_ids) = contact_user_ids {
 483            self.peer.respond(request.receipt(), proto::Ack {}).await?;
 484            self.update_contacts_for_users(&contact_user_ids).await?;
 485        } else {
 486            self.peer
 487                .respond_with_error(
 488                    request.receipt(),
 489                    proto::Error {
 490                        message: "no such worktree".to_string(),
 491                    },
 492                )
 493                .await?;
 494        }
 495        Ok(())
 496    }
 497
 498    async fn update_worktree(
 499        mut self: Arc<Server>,
 500        request: TypedEnvelope<proto::UpdateWorktree>,
 501    ) -> tide::Result<()> {
 502        let connection_ids = self
 503            .state_mut()
 504            .update_worktree(
 505                request.sender_id,
 506                request.payload.project_id,
 507                request.payload.worktree_id,
 508                &request.payload.removed_entries,
 509                &request.payload.updated_entries,
 510            )
 511            .ok_or_else(|| anyhow!("no such worktree"))?;
 512
 513        broadcast(request.sender_id, connection_ids, |connection_id| {
 514            self.peer
 515                .forward_send(request.sender_id, connection_id, request.payload.clone())
 516        })
 517        .await?;
 518
 519        Ok(())
 520    }
 521
 522    async fn update_diagnostic_summary(
 523        self: Arc<Server>,
 524        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 525    ) -> tide::Result<()> {
 526        let receiver_ids = self
 527            .state()
 528            .project_connection_ids(request.payload.project_id, request.sender_id)
 529            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 530        broadcast(request.sender_id, receiver_ids, |connection_id| {
 531            self.peer
 532                .forward_send(request.sender_id, connection_id, request.payload.clone())
 533        })
 534        .await?;
 535        Ok(())
 536    }
 537
 538    async fn disk_based_diagnostics_updated(
 539        self: Arc<Server>,
 540        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 541    ) -> tide::Result<()> {
 542        let receiver_ids = self
 543            .state()
 544            .project_connection_ids(request.payload.project_id, request.sender_id)
 545            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 546        broadcast(request.sender_id, receiver_ids, |connection_id| {
 547            self.peer
 548                .forward_send(request.sender_id, connection_id, request.payload.clone())
 549        })
 550        .await?;
 551        Ok(())
 552    }
 553
 554    async fn open_buffer(
 555        self: Arc<Server>,
 556        request: TypedEnvelope<proto::OpenBuffer>,
 557    ) -> tide::Result<()> {
 558        let receipt = request.receipt();
 559        let host_connection_id = self
 560            .state()
 561            .read_project(request.payload.project_id, request.sender_id)
 562            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 563            .host_connection_id;
 564        let response = self
 565            .peer
 566            .forward_request(request.sender_id, host_connection_id, request.payload)
 567            .await?;
 568        self.peer.respond(receipt, response).await?;
 569        Ok(())
 570    }
 571
 572    async fn close_buffer(
 573        self: Arc<Server>,
 574        request: TypedEnvelope<proto::CloseBuffer>,
 575    ) -> tide::Result<()> {
 576        let host_connection_id = self
 577            .state()
 578            .read_project(request.payload.project_id, request.sender_id)
 579            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 580            .host_connection_id;
 581        self.peer
 582            .forward_send(request.sender_id, host_connection_id, request.payload)
 583            .await?;
 584        Ok(())
 585    }
 586
 587    async fn save_buffer(
 588        self: Arc<Server>,
 589        request: TypedEnvelope<proto::SaveBuffer>,
 590    ) -> tide::Result<()> {
 591        let host;
 592        let guests;
 593        {
 594            let state = self.state();
 595            let project = state
 596                .read_project(request.payload.project_id, request.sender_id)
 597                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 598            host = project.host_connection_id;
 599            guests = project.guest_connection_ids()
 600        }
 601
 602        let sender = request.sender_id;
 603        let receipt = request.receipt();
 604        let response = self
 605            .peer
 606            .forward_request(sender, host, request.payload.clone())
 607            .await?;
 608
 609        broadcast(host, guests, |conn_id| {
 610            let response = response.clone();
 611            let peer = &self.peer;
 612            async move {
 613                if conn_id == sender {
 614                    peer.respond(receipt, response).await
 615                } else {
 616                    peer.forward_send(host, conn_id, response).await
 617                }
 618            }
 619        })
 620        .await?;
 621
 622        Ok(())
 623    }
 624
 625    async fn update_buffer(
 626        self: Arc<Server>,
 627        request: TypedEnvelope<proto::UpdateBuffer>,
 628    ) -> tide::Result<()> {
 629        let receiver_ids = self
 630            .state()
 631            .project_connection_ids(request.payload.project_id, request.sender_id)
 632            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 633        broadcast(request.sender_id, receiver_ids, |connection_id| {
 634            self.peer
 635                .forward_send(request.sender_id, connection_id, request.payload.clone())
 636        })
 637        .await?;
 638        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 639        Ok(())
 640    }
 641
 642    async fn buffer_saved(
 643        self: Arc<Server>,
 644        request: TypedEnvelope<proto::BufferSaved>,
 645    ) -> tide::Result<()> {
 646        let receiver_ids = self
 647            .state()
 648            .project_connection_ids(request.payload.project_id, request.sender_id)
 649            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 650        broadcast(request.sender_id, receiver_ids, |connection_id| {
 651            self.peer
 652                .forward_send(request.sender_id, connection_id, request.payload.clone())
 653        })
 654        .await?;
 655        Ok(())
 656    }
 657
 658    async fn get_channels(
 659        self: Arc<Server>,
 660        request: TypedEnvelope<proto::GetChannels>,
 661    ) -> tide::Result<()> {
 662        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 663        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 664        self.peer
 665            .respond(
 666                request.receipt(),
 667                proto::GetChannelsResponse {
 668                    channels: channels
 669                        .into_iter()
 670                        .map(|chan| proto::Channel {
 671                            id: chan.id.to_proto(),
 672                            name: chan.name,
 673                        })
 674                        .collect(),
 675                },
 676            )
 677            .await?;
 678        Ok(())
 679    }
 680
 681    async fn get_users(
 682        self: Arc<Server>,
 683        request: TypedEnvelope<proto::GetUsers>,
 684    ) -> tide::Result<()> {
 685        let receipt = request.receipt();
 686        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 687        let users = self
 688            .app_state
 689            .db
 690            .get_users_by_ids(user_ids)
 691            .await?
 692            .into_iter()
 693            .map(|user| proto::User {
 694                id: user.id.to_proto(),
 695                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 696                github_login: user.github_login,
 697            })
 698            .collect();
 699        self.peer
 700            .respond(receipt, proto::GetUsersResponse { users })
 701            .await?;
 702        Ok(())
 703    }
 704
 705    async fn update_contacts_for_users<'a>(
 706        self: &Arc<Server>,
 707        user_ids: impl IntoIterator<Item = &'a UserId>,
 708    ) -> tide::Result<()> {
 709        let mut send_futures = Vec::new();
 710
 711        {
 712            let state = self.state();
 713            for user_id in user_ids {
 714                let contacts = state.contacts_for_user(*user_id);
 715                for connection_id in state.connection_ids_for_user(*user_id) {
 716                    send_futures.push(self.peer.send(
 717                        connection_id,
 718                        proto::UpdateContacts {
 719                            contacts: contacts.clone(),
 720                        },
 721                    ));
 722                }
 723            }
 724        }
 725        futures::future::try_join_all(send_futures).await?;
 726
 727        Ok(())
 728    }
 729
 730    async fn join_channel(
 731        mut self: Arc<Self>,
 732        request: TypedEnvelope<proto::JoinChannel>,
 733    ) -> tide::Result<()> {
 734        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 735        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 736        if !self
 737            .app_state
 738            .db
 739            .can_user_access_channel(user_id, channel_id)
 740            .await?
 741        {
 742            Err(anyhow!("access denied"))?;
 743        }
 744
 745        self.state_mut().join_channel(request.sender_id, channel_id);
 746        let messages = self
 747            .app_state
 748            .db
 749            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 750            .await?
 751            .into_iter()
 752            .map(|msg| proto::ChannelMessage {
 753                id: msg.id.to_proto(),
 754                body: msg.body,
 755                timestamp: msg.sent_at.unix_timestamp() as u64,
 756                sender_id: msg.sender_id.to_proto(),
 757                nonce: Some(msg.nonce.as_u128().into()),
 758            })
 759            .collect::<Vec<_>>();
 760        self.peer
 761            .respond(
 762                request.receipt(),
 763                proto::JoinChannelResponse {
 764                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 765                    messages,
 766                },
 767            )
 768            .await?;
 769        Ok(())
 770    }
 771
 772    async fn leave_channel(
 773        mut self: Arc<Self>,
 774        request: TypedEnvelope<proto::LeaveChannel>,
 775    ) -> tide::Result<()> {
 776        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 777        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 778        if !self
 779            .app_state
 780            .db
 781            .can_user_access_channel(user_id, channel_id)
 782            .await?
 783        {
 784            Err(anyhow!("access denied"))?;
 785        }
 786
 787        self.state_mut()
 788            .leave_channel(request.sender_id, channel_id);
 789
 790        Ok(())
 791    }
 792
 793    async fn send_channel_message(
 794        self: Arc<Self>,
 795        request: TypedEnvelope<proto::SendChannelMessage>,
 796    ) -> tide::Result<()> {
 797        let receipt = request.receipt();
 798        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 799        let user_id;
 800        let connection_ids;
 801        {
 802            let state = self.state();
 803            user_id = state.user_id_for_connection(request.sender_id)?;
 804            if let Some(ids) = state.channel_connection_ids(channel_id) {
 805                connection_ids = ids;
 806            } else {
 807                return Ok(());
 808            }
 809        }
 810
 811        // Validate the message body.
 812        let body = request.payload.body.trim().to_string();
 813        if body.len() > MAX_MESSAGE_LEN {
 814            self.peer
 815                .respond_with_error(
 816                    receipt,
 817                    proto::Error {
 818                        message: "message is too long".to_string(),
 819                    },
 820                )
 821                .await?;
 822            return Ok(());
 823        }
 824        if body.is_empty() {
 825            self.peer
 826                .respond_with_error(
 827                    receipt,
 828                    proto::Error {
 829                        message: "message can't be blank".to_string(),
 830                    },
 831                )
 832                .await?;
 833            return Ok(());
 834        }
 835
 836        let timestamp = OffsetDateTime::now_utc();
 837        let nonce = if let Some(nonce) = request.payload.nonce {
 838            nonce
 839        } else {
 840            self.peer
 841                .respond_with_error(
 842                    receipt,
 843                    proto::Error {
 844                        message: "nonce can't be blank".to_string(),
 845                    },
 846                )
 847                .await?;
 848            return Ok(());
 849        };
 850
 851        let message_id = self
 852            .app_state
 853            .db
 854            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 855            .await?
 856            .to_proto();
 857        let message = proto::ChannelMessage {
 858            sender_id: user_id.to_proto(),
 859            id: message_id,
 860            body,
 861            timestamp: timestamp.unix_timestamp() as u64,
 862            nonce: Some(nonce),
 863        };
 864        broadcast(request.sender_id, connection_ids, |conn_id| {
 865            self.peer.send(
 866                conn_id,
 867                proto::ChannelMessageSent {
 868                    channel_id: channel_id.to_proto(),
 869                    message: Some(message.clone()),
 870                },
 871            )
 872        })
 873        .await?;
 874        self.peer
 875            .respond(
 876                receipt,
 877                proto::SendChannelMessageResponse {
 878                    message: Some(message),
 879                },
 880            )
 881            .await?;
 882        Ok(())
 883    }
 884
 885    async fn get_channel_messages(
 886        self: Arc<Self>,
 887        request: TypedEnvelope<proto::GetChannelMessages>,
 888    ) -> tide::Result<()> {
 889        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 890        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 891        if !self
 892            .app_state
 893            .db
 894            .can_user_access_channel(user_id, channel_id)
 895            .await?
 896        {
 897            Err(anyhow!("access denied"))?;
 898        }
 899
 900        let messages = self
 901            .app_state
 902            .db
 903            .get_channel_messages(
 904                channel_id,
 905                MESSAGE_COUNT_PER_PAGE,
 906                Some(MessageId::from_proto(request.payload.before_message_id)),
 907            )
 908            .await?
 909            .into_iter()
 910            .map(|msg| proto::ChannelMessage {
 911                id: msg.id.to_proto(),
 912                body: msg.body,
 913                timestamp: msg.sent_at.unix_timestamp() as u64,
 914                sender_id: msg.sender_id.to_proto(),
 915                nonce: Some(msg.nonce.as_u128().into()),
 916            })
 917            .collect::<Vec<_>>();
 918        self.peer
 919            .respond(
 920                request.receipt(),
 921                proto::GetChannelMessagesResponse {
 922                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 923                    messages,
 924                },
 925            )
 926            .await?;
 927        Ok(())
 928    }
 929
 930    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 931        self.store.read()
 932    }
 933
 934    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 935        self.store.write()
 936    }
 937}
 938
 939pub async fn broadcast<F, T>(
 940    sender_id: ConnectionId,
 941    receiver_ids: Vec<ConnectionId>,
 942    mut f: F,
 943) -> anyhow::Result<()>
 944where
 945    F: FnMut(ConnectionId) -> T,
 946    T: Future<Output = anyhow::Result<()>>,
 947{
 948    let futures = receiver_ids
 949        .into_iter()
 950        .filter(|id| *id != sender_id)
 951        .map(|id| f(id));
 952    futures::future::try_join_all(futures).await?;
 953    Ok(())
 954}
 955
 956pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 957    let server = Server::new(app.state().clone(), rpc.clone(), None);
 958    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 959        let server = server.clone();
 960        async move {
 961            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 962
 963            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 964            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 965            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 966            let client_protocol_version: Option<u32> = request
 967                .header("X-Zed-Protocol-Version")
 968                .and_then(|v| v.as_str().parse().ok());
 969
 970            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 971                return Ok(Response::new(StatusCode::UpgradeRequired));
 972            }
 973
 974            let header = match request.header("Sec-Websocket-Key") {
 975                Some(h) => h.as_str(),
 976                None => return Err(anyhow!("expected sec-websocket-key"))?,
 977            };
 978
 979            let user_id = process_auth_header(&request).await?;
 980
 981            let mut response = Response::new(StatusCode::SwitchingProtocols);
 982            response.insert_header(UPGRADE, "websocket");
 983            response.insert_header(CONNECTION, "Upgrade");
 984            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 985            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 986            response.insert_header("Sec-Websocket-Version", "13");
 987
 988            let http_res: &mut tide::http::Response = response.as_mut();
 989            let upgrade_receiver = http_res.recv_upgrade().await;
 990            let addr = request.remote().unwrap_or("unknown").to_string();
 991            task::spawn(async move {
 992                if let Some(stream) = upgrade_receiver.await {
 993                    server
 994                        .handle_connection(
 995                            Connection::new(
 996                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 997                            ),
 998                            addr,
 999                            user_id,
1000                            None,
1001                        )
1002                        .await;
1003                }
1004            });
1005
1006            Ok(response)
1007        }
1008    });
1009}
1010
1011fn header_contains_ignore_case<T>(
1012    request: &tide::Request<T>,
1013    header_name: HeaderName,
1014    value: &str,
1015) -> bool {
1016    request
1017        .header(header_name)
1018        .map(|h| {
1019            h.as_str()
1020                .split(',')
1021                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1022        })
1023        .unwrap_or(false)
1024}
1025
1026#[cfg(test)]
1027mod tests {
1028    use super::*;
1029    use crate::{
1030        auth,
1031        db::{tests::TestDb, UserId},
1032        github, AppState, Config,
1033    };
1034    use ::rpc::Peer;
1035    use async_std::task;
1036    use gpui::{ModelHandle, TestAppContext};
1037    use parking_lot::Mutex;
1038    use postage::{mpsc, watch};
1039    use rpc::PeerId;
1040    use serde_json::json;
1041    use sqlx::types::time::OffsetDateTime;
1042    use std::{
1043        ops::Deref,
1044        path::Path,
1045        sync::{
1046            atomic::{AtomicBool, Ordering::SeqCst},
1047            Arc,
1048        },
1049        time::Duration,
1050    };
1051    use zed::{
1052        client::{
1053            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1054            EstablishConnectionError, UserStore,
1055        },
1056        editor::{Editor, EditorSettings, Input, MultiBuffer},
1057        fs::{FakeFs, Fs as _},
1058        language::{
1059            tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1060            LanguageRegistry, LanguageServerConfig, Point,
1061        },
1062        lsp,
1063        project::{DiagnosticSummary, Project, ProjectPath},
1064    };
1065
    // End-to-end test of sharing a project between two clients.
    //
    // Client A shares a local project rooted at "/a"; client B joins it remotely.
    // The test then checks that each side sees the other as a collaborator, that
    // B can open and edit "b.txt" with the edit showing up in A's buffer, that
    // A's worktree stops reporting the buffer as open once A drops it, and that
    // dropping B's project empties A's collaborator list.
    #[gpui::test]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // Client B must see client A (the host) among its collaborators.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until client A sees client B with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = worktree_b
            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
        });
        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        worktree_a
            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1199
    // Checks what a guest observes when the host stops sharing.
    //
    // Client A shares a project, client B joins it and opens "a.txt"; after A
    // calls `unshare`, the test waits for B's project to report `is_read_only()`.
    #[gpui::test]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client B so the guest has live state in the project.
        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
        worktree_b
            .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
            .await
            .unwrap();

        // Unshare as client A and wait for client B's project to become read-only.
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
    }
1271
    // Exercises propagation of edits, saves and file-system changes between a
    // host (client A) and two guests (clients B and C).
    //
    // The guests edit "file1" first, then the host opens it and appends text;
    // all three buffers must converge on the same contents. Guest B then saves
    // while the host edits concurrently, and the saved file plus the dirty flags
    // are checked. Finally the host's file system is changed (a rename and a new
    // file) and both guests' worktrees must list the updated paths.
    #[gpui::test]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();

        // Open and edit a buffer as both guests B and C.
        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
        let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
        let buffer_b = worktree_b
            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        let buffer_c = worktree_c
            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();

        // Wait for both guest edits to arrive before appending the host's text.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The file on disk is expected to include the host's concurrent edit.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on the guests.
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        worktree_b
            .condition(&cx_b, |tree, _| tree.file_count() == 4)
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| tree.file_count() == 4)
            .await;
        worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1", "file3", "file4"]
            )
        });
        worktree_c.read_with(&cx_c, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1", "file3", "file4"]
            )
        });
    }
1425
    // Checks a guest buffer's dirty/conflict flags around a save.
    //
    // Client B edits "a.txt" in a project shared by client A, saves it, waits for
    // the buffer's file mtime to change, and then edits again. The buffer must be
    // dirty after each edit, clean after the save, and never report a conflict.
    #[gpui::test]
    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // Open a buffer as client B
        let buffer_b = worktree_b
            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
            .await
            .unwrap();
        // Remember the file's mtime so the save can be detected below.
        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());

        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        buffer_b
            .update(&mut cx_b, |buf, cx| buf.save(cx))
            .unwrap()
            .await
            .unwrap();
        // Wait until the buffer's file reports a different mtime than before the save.
        worktree_b
            .condition(&cx_b, |_, cx| {
                buffer_b.read(cx).file().unwrap().mtime() != mtime
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // A further edit after the save must make the buffer dirty again, still
        // without a conflict.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1518
    // Checks that a host edit made while a guest is still opening the same buffer
    // is not lost.
    //
    // Client B starts opening "a.txt" on a background task, client A edits the
    // buffer in the meantime, and the test waits for B's buffer text to equal A's.
    #[gpui::test]
    async fn test_editing_while_guest_opens_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // Open a buffer as client A
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B
        let buffer_b = cx_b
            .background()
            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
        task::yield_now().await;

        // Edit the buffer as client A while client B is still opening it.
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));

        // Once client B's open completes, its text must catch up with client A's.
        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1598
    // Checks that a guest leaving a project while a buffer open is still in
    // flight is observed by the host.
    //
    // Client B joins client A's project, starts opening "a.txt" on a background
    // task, then drops both its project and the pending open. The test waits for
    // client A's collaborator count to go from 1 back to 0.
    #[gpui::test]
    async fn test_leaving_worktree_while_opening_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // See that a guest has joined as client A.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
            .await;

        // Begin opening a buffer as client B, but leave the project before the open completes.
        let buffer_b = cx_b
            .background()
            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
        cx_b.update(|_| drop(project_b));
        drop(buffer_b);

        // See that the guest has left.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
            .await;
    }
1676
1677    #[gpui::test]
1678    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1679        cx_a.foreground().forbid_parking();
1680        let lang_registry = Arc::new(LanguageRegistry::new());
1681        let fs = Arc::new(FakeFs::new());
1682
1683        // Connect to a server as 2 clients.
1684        let mut server = TestServer::start().await;
1685        let client_a = server.create_client(&mut cx_a, "user_a").await;
1686        let client_b = server.create_client(&mut cx_b, "user_b").await;
1687
1688        // Share a project as client A
1689        fs.insert_tree(
1690            "/a",
1691            json!({
1692                ".zed.toml": r#"collaborators = ["user_b"]"#,
1693                "a.txt": "a-contents",
1694                "b.txt": "b-contents",
1695            }),
1696        )
1697        .await;
1698        let project_a = cx_a.update(|cx| {
1699            Project::local(
1700                client_a.clone(),
1701                client_a.user_store.clone(),
1702                lang_registry.clone(),
1703                fs.clone(),
1704                cx,
1705            )
1706        });
1707        let worktree_a = project_a
1708            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1709            .await
1710            .unwrap();
1711        worktree_a
1712            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1713            .await;
1714        let project_id = project_a
1715            .update(&mut cx_a, |project, _| project.next_remote_id())
1716            .await;
1717        project_a
1718            .update(&mut cx_a, |project, cx| project.share(cx))
1719            .await
1720            .unwrap();
1721
1722        // Join that project as client B
1723        let _project_b = Project::remote(
1724            project_id,
1725            client_b.clone(),
1726            client_b.user_store.clone(),
1727            lang_registry.clone(),
1728            fs.clone(),
1729            &mut cx_b.to_async(),
1730        )
1731        .await
1732        .unwrap();
1733
1734        // See that a guest has joined as client A.
1735        project_a
1736            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1737            .await;
1738
1739        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1740        client_b.disconnect(&cx_b.to_async()).await.unwrap();
1741        project_a
1742            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1743            .await;
1744    }
1745
1746    #[gpui::test]
1747    async fn test_collaborating_with_diagnostics(
1748        mut cx_a: TestAppContext,
1749        mut cx_b: TestAppContext,
1750    ) {
1751        cx_a.foreground().forbid_parking();
1752        let mut lang_registry = Arc::new(LanguageRegistry::new());
1753        let fs = Arc::new(FakeFs::new());
1754
1755        // Set up a fake language server.
1756        let (language_server_config, mut fake_language_server) =
1757            LanguageServerConfig::fake(cx_a.background()).await;
1758        Arc::get_mut(&mut lang_registry)
1759            .unwrap()
1760            .add(Arc::new(Language::new(
1761                LanguageConfig {
1762                    name: "Rust".to_string(),
1763                    path_suffixes: vec!["rs".to_string()],
1764                    language_server: Some(language_server_config),
1765                    ..Default::default()
1766                },
1767                Some(tree_sitter_rust::language()),
1768            )));
1769
1770        // Connect to a server as 2 clients.
1771        let mut server = TestServer::start().await;
1772        let client_a = server.create_client(&mut cx_a, "user_a").await;
1773        let client_b = server.create_client(&mut cx_b, "user_b").await;
1774
1775        // Share a project as client A
1776        fs.insert_tree(
1777            "/a",
1778            json!({
1779                ".zed.toml": r#"collaborators = ["user_b"]"#,
1780                "a.rs": "let one = two",
1781                "other.rs": "",
1782            }),
1783        )
1784        .await;
1785        let project_a = cx_a.update(|cx| {
1786            Project::local(
1787                client_a.clone(),
1788                client_a.user_store.clone(),
1789                lang_registry.clone(),
1790                fs.clone(),
1791                cx,
1792            )
1793        });
1794        let worktree_a = project_a
1795            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1796            .await
1797            .unwrap();
1798        worktree_a
1799            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1800            .await;
1801        let project_id = project_a
1802            .update(&mut cx_a, |project, _| project.next_remote_id())
1803            .await;
1804        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1805        project_a
1806            .update(&mut cx_a, |project, cx| project.share(cx))
1807            .await
1808            .unwrap();
1809
1810        // Cause the language server to start.
1811        let _ = cx_a
1812            .background()
1813            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1814                worktree.open_buffer("other.rs", cx)
1815            }))
1816            .await
1817            .unwrap();
1818
1819        // Join the worktree as client B.
1820        let project_b = Project::remote(
1821            project_id,
1822            client_b.clone(),
1823            client_b.user_store.clone(),
1824            lang_registry.clone(),
1825            fs.clone(),
1826            &mut cx_b.to_async(),
1827        )
1828        .await
1829        .unwrap();
1830
1831        // Simulate a language server reporting errors for a file.
1832        fake_language_server
1833            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1834                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1835                version: None,
1836                diagnostics: vec![
1837                    lsp::Diagnostic {
1838                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1839                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1840                        message: "message 1".to_string(),
1841                        ..Default::default()
1842                    },
1843                    lsp::Diagnostic {
1844                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1845                        range: lsp::Range::new(
1846                            lsp::Position::new(0, 10),
1847                            lsp::Position::new(0, 13),
1848                        ),
1849                        message: "message 2".to_string(),
1850                        ..Default::default()
1851                    },
1852                ],
1853            })
1854            .await;
1855
1856        project_b
1857            .condition(&cx_b, |project, cx| {
1858                project.diagnostic_summaries(cx).collect::<Vec<_>>()
1859                    == &[(
1860                        ProjectPath {
1861                            worktree_id,
1862                            path: Arc::from(Path::new("a.rs")),
1863                        },
1864                        DiagnosticSummary {
1865                            error_count: 1,
1866                            warning_count: 1,
1867                            ..Default::default()
1868                        },
1869                    )]
1870            })
1871            .await;
1872
1873        // Open the file with the errors.
1874        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1875        let buffer_b = cx_b
1876            .background()
1877            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1878            .await
1879            .unwrap();
1880
1881        buffer_b.read_with(&cx_b, |buffer, _| {
1882            assert_eq!(
1883                buffer
1884                    .snapshot()
1885                    .diagnostics_in_range::<_, Point>(0..buffer.len())
1886                    .map(|entry| entry)
1887                    .collect::<Vec<_>>(),
1888                &[
1889                    DiagnosticEntry {
1890                        range: Point::new(0, 4)..Point::new(0, 7),
1891                        diagnostic: Diagnostic {
1892                            group_id: 0,
1893                            message: "message 1".to_string(),
1894                            severity: lsp::DiagnosticSeverity::ERROR,
1895                            is_primary: true,
1896                            ..Default::default()
1897                        }
1898                    },
1899                    DiagnosticEntry {
1900                        range: Point::new(0, 10)..Point::new(0, 13),
1901                        diagnostic: Diagnostic {
1902                            group_id: 1,
1903                            severity: lsp::DiagnosticSeverity::WARNING,
1904                            message: "message 2".to_string(),
1905                            is_primary: true,
1906                            ..Default::default()
1907                        }
1908                    }
1909                ]
1910            );
1911        });
1912    }
1913
1914    #[gpui::test]
1915    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1916        cx_a.foreground().forbid_parking();
1917
1918        // Connect to a server as 2 clients.
1919        let mut server = TestServer::start().await;
1920        let client_a = server.create_client(&mut cx_a, "user_a").await;
1921        let client_b = server.create_client(&mut cx_b, "user_b").await;
1922
1923        // Create an org that includes these 2 users.
1924        let db = &server.app_state.db;
1925        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1926        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1927            .await
1928            .unwrap();
1929        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1930            .await
1931            .unwrap();
1932
1933        // Create a channel that includes all the users.
1934        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1935        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1936            .await
1937            .unwrap();
1938        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1939            .await
1940            .unwrap();
1941        db.create_channel_message(
1942            channel_id,
1943            client_b.current_user_id(&cx_b),
1944            "hello A, it's B.",
1945            OffsetDateTime::now_utc(),
1946            1,
1947        )
1948        .await
1949        .unwrap();
1950
1951        let channels_a = cx_a
1952            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1953        channels_a
1954            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1955            .await;
1956        channels_a.read_with(&cx_a, |list, _| {
1957            assert_eq!(
1958                list.available_channels().unwrap(),
1959                &[ChannelDetails {
1960                    id: channel_id.to_proto(),
1961                    name: "test-channel".to_string()
1962                }]
1963            )
1964        });
1965        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1966            this.get_channel(channel_id.to_proto(), cx).unwrap()
1967        });
1968        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1969        channel_a
1970            .condition(&cx_a, |channel, _| {
1971                channel_messages(channel)
1972                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1973            })
1974            .await;
1975
1976        let channels_b = cx_b
1977            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1978        channels_b
1979            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1980            .await;
1981        channels_b.read_with(&cx_b, |list, _| {
1982            assert_eq!(
1983                list.available_channels().unwrap(),
1984                &[ChannelDetails {
1985                    id: channel_id.to_proto(),
1986                    name: "test-channel".to_string()
1987                }]
1988            )
1989        });
1990
1991        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1992            this.get_channel(channel_id.to_proto(), cx).unwrap()
1993        });
1994        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1995        channel_b
1996            .condition(&cx_b, |channel, _| {
1997                channel_messages(channel)
1998                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1999            })
2000            .await;
2001
2002        channel_a
2003            .update(&mut cx_a, |channel, cx| {
2004                channel
2005                    .send_message("oh, hi B.".to_string(), cx)
2006                    .unwrap()
2007                    .detach();
2008                let task = channel.send_message("sup".to_string(), cx).unwrap();
2009                assert_eq!(
2010                    channel_messages(channel),
2011                    &[
2012                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2013                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2014                        ("user_a".to_string(), "sup".to_string(), true)
2015                    ]
2016                );
2017                task
2018            })
2019            .await
2020            .unwrap();
2021
2022        channel_b
2023            .condition(&cx_b, |channel, _| {
2024                channel_messages(channel)
2025                    == [
2026                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2027                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2028                        ("user_a".to_string(), "sup".to_string(), false),
2029                    ]
2030            })
2031            .await;
2032
2033        assert_eq!(
2034            server
2035                .state()
2036                .await
2037                .channel(channel_id)
2038                .unwrap()
2039                .connection_ids
2040                .len(),
2041            2
2042        );
2043        cx_b.update(|_| drop(channel_b));
2044        server
2045            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2046            .await;
2047
2048        cx_a.update(|_| drop(channel_a));
2049        server
2050            .condition(|state| state.channel(channel_id).is_none())
2051            .await;
2052    }
2053
2054    #[gpui::test]
2055    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2056        cx_a.foreground().forbid_parking();
2057
2058        let mut server = TestServer::start().await;
2059        let client_a = server.create_client(&mut cx_a, "user_a").await;
2060
2061        let db = &server.app_state.db;
2062        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2063        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2064        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2065            .await
2066            .unwrap();
2067        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2068            .await
2069            .unwrap();
2070
2071        let channels_a = cx_a
2072            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2073        channels_a
2074            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2075            .await;
2076        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2077            this.get_channel(channel_id.to_proto(), cx).unwrap()
2078        });
2079
2080        // Messages aren't allowed to be too long.
2081        channel_a
2082            .update(&mut cx_a, |channel, cx| {
2083                let long_body = "this is long.\n".repeat(1024);
2084                channel.send_message(long_body, cx).unwrap()
2085            })
2086            .await
2087            .unwrap_err();
2088
2089        // Messages aren't allowed to be blank.
2090        channel_a.update(&mut cx_a, |channel, cx| {
2091            channel.send_message(String::new(), cx).unwrap_err()
2092        });
2093
2094        // Leading and trailing whitespace are trimmed.
2095        channel_a
2096            .update(&mut cx_a, |channel, cx| {
2097                channel
2098                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
2099                    .unwrap()
2100            })
2101            .await
2102            .unwrap();
2103        assert_eq!(
2104            db.get_channel_messages(channel_id, 10, None)
2105                .await
2106                .unwrap()
2107                .iter()
2108                .map(|m| &m.body)
2109                .collect::<Vec<_>>(),
2110            &["surrounded by whitespace"]
2111        );
2112    }
2113
2114    #[gpui::test]
2115    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2116        cx_a.foreground().forbid_parking();
2117
2118        // Connect to a server as 2 clients.
2119        let mut server = TestServer::start().await;
2120        let client_a = server.create_client(&mut cx_a, "user_a").await;
2121        let client_b = server.create_client(&mut cx_b, "user_b").await;
2122        let mut status_b = client_b.status();
2123
2124        // Create an org that includes these 2 users.
2125        let db = &server.app_state.db;
2126        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2127        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2128            .await
2129            .unwrap();
2130        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2131            .await
2132            .unwrap();
2133
2134        // Create a channel that includes all the users.
2135        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2136        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2137            .await
2138            .unwrap();
2139        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2140            .await
2141            .unwrap();
2142        db.create_channel_message(
2143            channel_id,
2144            client_b.current_user_id(&cx_b),
2145            "hello A, it's B.",
2146            OffsetDateTime::now_utc(),
2147            2,
2148        )
2149        .await
2150        .unwrap();
2151
2152        let channels_a = cx_a
2153            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2154        channels_a
2155            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2156            .await;
2157
2158        channels_a.read_with(&cx_a, |list, _| {
2159            assert_eq!(
2160                list.available_channels().unwrap(),
2161                &[ChannelDetails {
2162                    id: channel_id.to_proto(),
2163                    name: "test-channel".to_string()
2164                }]
2165            )
2166        });
2167        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2168            this.get_channel(channel_id.to_proto(), cx).unwrap()
2169        });
2170        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2171        channel_a
2172            .condition(&cx_a, |channel, _| {
2173                channel_messages(channel)
2174                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2175            })
2176            .await;
2177
2178        let channels_b = cx_b
2179            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2180        channels_b
2181            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2182            .await;
2183        channels_b.read_with(&cx_b, |list, _| {
2184            assert_eq!(
2185                list.available_channels().unwrap(),
2186                &[ChannelDetails {
2187                    id: channel_id.to_proto(),
2188                    name: "test-channel".to_string()
2189                }]
2190            )
2191        });
2192
2193        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2194            this.get_channel(channel_id.to_proto(), cx).unwrap()
2195        });
2196        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2197        channel_b
2198            .condition(&cx_b, |channel, _| {
2199                channel_messages(channel)
2200                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2201            })
2202            .await;
2203
2204        // Disconnect client B, ensuring we can still access its cached channel data.
2205        server.forbid_connections();
2206        server.disconnect_client(client_b.current_user_id(&cx_b));
2207        while !matches!(
2208            status_b.recv().await,
2209            Some(client::Status::ReconnectionError { .. })
2210        ) {}
2211
2212        channels_b.read_with(&cx_b, |channels, _| {
2213            assert_eq!(
2214                channels.available_channels().unwrap(),
2215                [ChannelDetails {
2216                    id: channel_id.to_proto(),
2217                    name: "test-channel".to_string()
2218                }]
2219            )
2220        });
2221        channel_b.read_with(&cx_b, |channel, _| {
2222            assert_eq!(
2223                channel_messages(channel),
2224                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2225            )
2226        });
2227
2228        // Send a message from client B while it is disconnected.
2229        channel_b
2230            .update(&mut cx_b, |channel, cx| {
2231                let task = channel
2232                    .send_message("can you see this?".to_string(), cx)
2233                    .unwrap();
2234                assert_eq!(
2235                    channel_messages(channel),
2236                    &[
2237                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2238                        ("user_b".to_string(), "can you see this?".to_string(), true)
2239                    ]
2240                );
2241                task
2242            })
2243            .await
2244            .unwrap_err();
2245
2246        // Send a message from client A while B is disconnected.
2247        channel_a
2248            .update(&mut cx_a, |channel, cx| {
2249                channel
2250                    .send_message("oh, hi B.".to_string(), cx)
2251                    .unwrap()
2252                    .detach();
2253                let task = channel.send_message("sup".to_string(), cx).unwrap();
2254                assert_eq!(
2255                    channel_messages(channel),
2256                    &[
2257                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2258                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2259                        ("user_a".to_string(), "sup".to_string(), true)
2260                    ]
2261                );
2262                task
2263            })
2264            .await
2265            .unwrap();
2266
2267        // Give client B a chance to reconnect.
2268        server.allow_connections();
2269        cx_b.foreground().advance_clock(Duration::from_secs(10));
2270
2271        // Verify that B sees the new messages upon reconnection, as well as the message client B
2272        // sent while offline.
2273        channel_b
2274            .condition(&cx_b, |channel, _| {
2275                channel_messages(channel)
2276                    == [
2277                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2278                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2279                        ("user_a".to_string(), "sup".to_string(), false),
2280                        ("user_b".to_string(), "can you see this?".to_string(), false),
2281                    ]
2282            })
2283            .await;
2284
2285        // Ensure client A and B can communicate normally after reconnection.
2286        channel_a
2287            .update(&mut cx_a, |channel, cx| {
2288                channel.send_message("you online?".to_string(), cx).unwrap()
2289            })
2290            .await
2291            .unwrap();
2292        channel_b
2293            .condition(&cx_b, |channel, _| {
2294                channel_messages(channel)
2295                    == [
2296                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2297                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2298                        ("user_a".to_string(), "sup".to_string(), false),
2299                        ("user_b".to_string(), "can you see this?".to_string(), false),
2300                        ("user_a".to_string(), "you online?".to_string(), false),
2301                    ]
2302            })
2303            .await;
2304
2305        channel_b
2306            .update(&mut cx_b, |channel, cx| {
2307                channel.send_message("yep".to_string(), cx).unwrap()
2308            })
2309            .await
2310            .unwrap();
2311        channel_a
2312            .condition(&cx_a, |channel, _| {
2313                channel_messages(channel)
2314                    == [
2315                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2316                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2317                        ("user_a".to_string(), "sup".to_string(), false),
2318                        ("user_b".to_string(), "can you see this?".to_string(), false),
2319                        ("user_a".to_string(), "you online?".to_string(), false),
2320                        ("user_b".to_string(), "yep".to_string(), false),
2321                    ]
2322            })
2323            .await;
2324    }
2325
2326    #[gpui::test]
2327    async fn test_contacts(
2328        mut cx_a: TestAppContext,
2329        mut cx_b: TestAppContext,
2330        mut cx_c: TestAppContext,
2331    ) {
2332        cx_a.foreground().forbid_parking();
2333        let lang_registry = Arc::new(LanguageRegistry::new());
2334        let fs = Arc::new(FakeFs::new());
2335
2336        // Connect to a server as 3 clients.
2337        let mut server = TestServer::start().await;
2338        let client_a = server.create_client(&mut cx_a, "user_a").await;
2339        let client_b = server.create_client(&mut cx_b, "user_b").await;
2340        let client_c = server.create_client(&mut cx_c, "user_c").await;
2341
2342        // Share a worktree as client A.
2343        fs.insert_tree(
2344            "/a",
2345            json!({
2346                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2347            }),
2348        )
2349        .await;
2350
2351        let project_a = cx_a.update(|cx| {
2352            Project::local(
2353                client_a.clone(),
2354                client_a.user_store.clone(),
2355                lang_registry.clone(),
2356                fs.clone(),
2357                cx,
2358            )
2359        });
2360        let worktree_a = project_a
2361            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
2362            .await
2363            .unwrap();
2364        worktree_a
2365            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2366            .await;
2367
2368        client_a
2369            .user_store
2370            .condition(&cx_a, |user_store, _| {
2371                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2372            })
2373            .await;
2374        client_b
2375            .user_store
2376            .condition(&cx_b, |user_store, _| {
2377                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2378            })
2379            .await;
2380        client_c
2381            .user_store
2382            .condition(&cx_c, |user_store, _| {
2383                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2384            })
2385            .await;
2386
2387        let project_id = project_a
2388            .update(&mut cx_a, |project, _| project.next_remote_id())
2389            .await;
2390        project_a
2391            .update(&mut cx_a, |project, cx| project.share(cx))
2392            .await
2393            .unwrap();
2394
2395        let _project_b = Project::remote(
2396            project_id,
2397            client_b.clone(),
2398            client_b.user_store.clone(),
2399            lang_registry.clone(),
2400            fs.clone(),
2401            &mut cx_b.to_async(),
2402        )
2403        .await
2404        .unwrap();
2405
2406        client_a
2407            .user_store
2408            .condition(&cx_a, |user_store, _| {
2409                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2410            })
2411            .await;
2412        client_b
2413            .user_store
2414            .condition(&cx_b, |user_store, _| {
2415                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2416            })
2417            .await;
2418        client_c
2419            .user_store
2420            .condition(&cx_c, |user_store, _| {
2421                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2422            })
2423            .await;
2424
2425        project_a
2426            .condition(&cx_a, |project, _| {
2427                project.collaborators().contains_key(&client_b.peer_id)
2428            })
2429            .await;
2430
2431        cx_a.update(move |_| drop(project_a));
2432        client_a
2433            .user_store
2434            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2435            .await;
2436        client_b
2437            .user_store
2438            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2439            .await;
2440        client_c
2441            .user_store
2442            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2443            .await;
2444
2445        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2446            user_store
2447                .contacts()
2448                .iter()
2449                .map(|contact| {
2450                    let worktrees = contact
2451                        .projects
2452                        .iter()
2453                        .map(|p| {
2454                            (
2455                                p.worktree_root_names[0].as_str(),
2456                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
2457                            )
2458                        })
2459                        .collect();
2460                    (contact.user.github_login.as_str(), worktrees)
2461                })
2462                .collect()
2463        }
2464    }
2465
    /// In-process server harness used by the tests in this module.
    struct TestServer {
        /// Peer handed to `Server::new`; it is reset when the harness is dropped.
        peer: Arc<Peer>,
        /// Application state built by `build_app_state` and shared with `server`.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Receiving half of the channel whose sender is passed to `Server::new`;
        /// `condition` waits on it between predicate checks.
        notifications: mpsc::Receiver<()>,
        /// Per-user senders stored by `create_client`; `disconnect_client` sends
        /// `Some(())` through the matching entry.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// When `true`, connection attempts made by test clients fail with an error.
        forbid_connections: Arc<AtomicBool>,
        /// Never read here; presumably held so the test database outlives the
        /// server (TODO confirm against `TestDb`).
        _test_db: TestDb,
    }
2475
2476    impl TestServer {
2477        async fn start() -> Self {
2478            let test_db = TestDb::new();
2479            let app_state = Self::build_app_state(&test_db).await;
2480            let peer = Peer::new();
2481            let notifications = mpsc::channel(128);
2482            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2483            Self {
2484                peer,
2485                app_state,
2486                server,
2487                notifications: notifications.1,
2488                connection_killers: Default::default(),
2489                forbid_connections: Default::default(),
2490                _test_db: test_db,
2491            }
2492        }
2493
2494        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2495            let http = FakeHttpClient::with_404_response();
2496            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2497            let client_name = name.to_string();
2498            let mut client = Client::new(http.clone());
2499            let server = self.server.clone();
2500            let connection_killers = self.connection_killers.clone();
2501            let forbid_connections = self.forbid_connections.clone();
2502            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2503
2504            Arc::get_mut(&mut client)
2505                .unwrap()
2506                .override_authenticate(move |cx| {
2507                    cx.spawn(|_| async move {
2508                        let access_token = "the-token".to_string();
2509                        Ok(Credentials {
2510                            user_id: user_id.0 as u64,
2511                            access_token,
2512                        })
2513                    })
2514                })
2515                .override_establish_connection(move |credentials, cx| {
2516                    assert_eq!(credentials.user_id, user_id.0 as u64);
2517                    assert_eq!(credentials.access_token, "the-token");
2518
2519                    let server = server.clone();
2520                    let connection_killers = connection_killers.clone();
2521                    let forbid_connections = forbid_connections.clone();
2522                    let client_name = client_name.clone();
2523                    let connection_id_tx = connection_id_tx.clone();
2524                    cx.spawn(move |cx| async move {
2525                        if forbid_connections.load(SeqCst) {
2526                            Err(EstablishConnectionError::other(anyhow!(
2527                                "server is forbidding connections"
2528                            )))
2529                        } else {
2530                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2531                            connection_killers.lock().insert(user_id, kill_conn);
2532                            cx.background()
2533                                .spawn(server.handle_connection(
2534                                    server_conn,
2535                                    client_name,
2536                                    user_id,
2537                                    Some(connection_id_tx),
2538                                ))
2539                                .detach();
2540                            Ok(client_conn)
2541                        }
2542                    })
2543                });
2544
2545            client
2546                .authenticate_and_connect(&cx.to_async())
2547                .await
2548                .unwrap();
2549
2550            let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2551            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2552            let mut authed_user =
2553                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2554            while authed_user.recv().await.unwrap().is_none() {}
2555
2556            TestClient {
2557                client,
2558                peer_id,
2559                user_store,
2560            }
2561        }
2562
2563        fn disconnect_client(&self, user_id: UserId) {
2564            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2565                let _ = kill_conn.try_send(Some(()));
2566            }
2567        }
2568
2569        fn forbid_connections(&self) {
2570            self.forbid_connections.store(true, SeqCst);
2571        }
2572
2573        fn allow_connections(&self) {
2574            self.forbid_connections.store(false, SeqCst);
2575        }
2576
2577        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2578            let mut config = Config::default();
2579            config.session_secret = "a".repeat(32);
2580            config.database_url = test_db.url.clone();
2581            let github_client = github::AppClient::test();
2582            Arc::new(AppState {
2583                db: test_db.db().clone(),
2584                handlebars: Default::default(),
2585                auth_client: auth::build_client("", ""),
2586                repo_client: github::RepoClient::test(&github_client),
2587                github_client,
2588                config,
2589            })
2590        }
2591
2592        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2593            self.server.store.read()
2594        }
2595
2596        async fn condition<F>(&mut self, mut predicate: F)
2597        where
2598            F: FnMut(&Store) -> bool,
2599        {
2600            async_std::future::timeout(Duration::from_millis(500), async {
2601                while !(predicate)(&*self.server.store.read()) {
2602                    self.notifications.recv().await;
2603                }
2604            })
2605            .await
2606            .expect("condition timed out");
2607        }
2608    }
2609
2610    impl Drop for TestServer {
2611        fn drop(&mut self) {
2612            task::block_on(self.peer.reset());
2613        }
2614    }
2615
    /// A connected client as returned by `TestServer::create_client`.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id reported when the client connected.
        pub peer_id: PeerId,
        /// User store model created for this client.
        pub user_store: ModelHandle<UserStore>,
    }
2621
2622    impl Deref for TestClient {
2623        type Target = Arc<Client>;
2624
2625        fn deref(&self) -> &Self::Target {
2626            &self.client
2627        }
2628    }
2629
2630    impl TestClient {
2631        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2632            UserId::from_proto(
2633                self.user_store
2634                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2635            )
2636        }
2637    }
2638
2639    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2640        channel
2641            .messages()
2642            .cursor::<()>()
2643            .map(|m| {
2644                (
2645                    m.sender.github_login.clone(),
2646                    m.body.clone(),
2647                    m.is_pending(),
2648                )
2649            })
2650            .collect()
2651    }
2652
    /// A view with no state that renders an empty element.
    struct EmptyView;
2654
    impl gpui::Entity for EmptyView {
        // The event type is the unit type.
        type Event = ();
    }
2658
2659    impl gpui::View for EmptyView {
2660        fn ui_name() -> &'static str {
2661            "empty view"
2662        }
2663
2664        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2665            gpui::Element::boxed(gpui::elements::Empty)
2666        }
2667    }
2668}