rpc.rs

   1mod store;
   2
   3use super::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::{sync::RwLock, task};
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use futures::{future::BoxFuture, FutureExt};
  12use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  13use sha1::{Digest as _, Sha1};
  14use std::{
  15    any::TypeId,
  16    collections::{HashMap, HashSet},
  17    future::Future,
  18    mem,
  19    sync::Arc,
  20    time::Instant,
  21};
  22use store::{JoinedWorktree, Store, Worktree};
  23use surf::StatusCode;
  24use tide::log;
  25use tide::{
  26    http::headers::{HeaderName, CONNECTION, UPGRADE},
  27    Request, Response,
  28};
  29use time::OffsetDateTime;
  30use zrpc::{
  31    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  32    Connection, ConnectionId, Peer, TypedEnvelope,
  33};
  34
  35type MessageHandler = Box<
  36    dyn Send
  37        + Sync
  38        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  39>;
  40
  41pub struct Server {
  42    peer: Arc<Peer>,
  43    store: RwLock<Store>,
  44    app_state: Arc<AppState>,
  45    handlers: HashMap<TypeId, MessageHandler>,
  46    notifications: Option<mpsc::Sender<()>>,
  47}
  48
  49const MESSAGE_COUNT_PER_PAGE: usize = 100;
  50const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::Sender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_handler(Server::ping)
  68            .add_handler(Server::open_worktree)
  69            .add_handler(Server::close_worktree)
  70            .add_handler(Server::share_worktree)
  71            .add_handler(Server::unshare_worktree)
  72            .add_handler(Server::join_worktree)
  73            .add_handler(Server::leave_worktree)
  74            .add_handler(Server::update_worktree)
  75            .add_handler(Server::open_buffer)
  76            .add_handler(Server::close_buffer)
  77            .add_handler(Server::update_buffer)
  78            .add_handler(Server::buffer_saved)
  79            .add_handler(Server::save_buffer)
  80            .add_handler(Server::get_channels)
  81            .add_handler(Server::get_users)
  82            .add_handler(Server::join_channel)
  83            .add_handler(Server::leave_channel)
  84            .add_handler(Server::send_channel_message)
  85            .add_handler(Server::get_channel_messages);
  86
  87        Arc::new(server)
  88    }
  89
  90    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  91    where
  92        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  93        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  94        M: EnvelopedMessage,
  95    {
  96        let prev_handler = self.handlers.insert(
  97            TypeId::of::<M>(),
  98            Box::new(move |server, envelope| {
  99                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 100                (handler)(server, *envelope).boxed()
 101            }),
 102        );
 103        if prev_handler.is_some() {
 104            panic!("registered a handler for the same message twice");
 105        }
 106        self
 107    }
 108
 109    pub fn handle_connection(
 110        self: &Arc<Self>,
 111        connection: Connection,
 112        addr: String,
 113        user_id: UserId,
 114    ) -> impl Future<Output = ()> {
 115        let this = self.clone();
 116        async move {
 117            let (connection_id, handle_io, mut incoming_rx) =
 118                this.peer.add_connection(connection).await;
 119            this.store
 120                .write()
 121                .await
 122                .add_connection(connection_id, user_id);
 123            if let Err(err) = this.update_collaborators_for_users(&[user_id]).await {
 124                log::error!("error updating collaborators for {:?}: {}", user_id, err);
 125            }
 126
 127            let handle_io = handle_io.fuse();
 128            futures::pin_mut!(handle_io);
 129            loop {
 130                let next_message = incoming_rx.recv().fuse();
 131                futures::pin_mut!(next_message);
 132                futures::select_biased! {
 133                    message = next_message => {
 134                        if let Some(message) = message {
 135                            let start_time = Instant::now();
 136                            log::info!("RPC message received: {}", message.payload_type_name());
 137                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 138                                if let Err(err) = (handler)(this.clone(), message).await {
 139                                    log::error!("error handling message: {:?}", err);
 140                                } else {
 141                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 142                                }
 143
 144                                if let Some(mut notifications) = this.notifications.clone() {
 145                                    let _ = notifications.send(()).await;
 146                                }
 147                            } else {
 148                                log::warn!("unhandled message: {}", message.payload_type_name());
 149                            }
 150                        } else {
 151                            log::info!("rpc connection closed {:?}", addr);
 152                            break;
 153                        }
 154                    }
 155                    handle_io = handle_io => {
 156                        if let Err(err) = handle_io {
 157                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 158                        }
 159                        break;
 160                    }
 161                }
 162            }
 163
 164            if let Err(err) = this.sign_out(connection_id).await {
 165                log::error!("error signing out connection {:?} - {:?}", addr, err);
 166            }
 167        }
 168    }
 169
 170    async fn sign_out(self: &Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 171        self.peer.disconnect(connection_id).await;
 172        let removed_connection = self.store.write().await.remove_connection(connection_id)?;
 173
 174        for (worktree_id, worktree) in removed_connection.hosted_worktrees {
 175            if let Some(share) = worktree.share {
 176                broadcast(
 177                    connection_id,
 178                    share.guest_connection_ids.keys().copied().collect(),
 179                    |conn_id| {
 180                        self.peer
 181                            .send(conn_id, proto::UnshareWorktree { worktree_id })
 182                    },
 183                )
 184                .await?;
 185            }
 186        }
 187
 188        for (worktree_id, peer_ids) in removed_connection.guest_worktree_ids {
 189            broadcast(connection_id, peer_ids, |conn_id| {
 190                self.peer.send(
 191                    conn_id,
 192                    proto::RemovePeer {
 193                        worktree_id,
 194                        peer_id: connection_id.0,
 195                    },
 196                )
 197            })
 198            .await?;
 199        }
 200
 201        self.update_collaborators_for_users(removed_connection.collaborator_ids.iter())
 202            .await?;
 203
 204        Ok(())
 205    }
 206
 207    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 208        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 209        Ok(())
 210    }
 211
 212    async fn open_worktree(
 213        self: Arc<Server>,
 214        request: TypedEnvelope<proto::OpenWorktree>,
 215    ) -> tide::Result<()> {
 216        let receipt = request.receipt();
 217        let host_user_id = self
 218            .store
 219            .read()
 220            .await
 221            .user_id_for_connection(request.sender_id)?;
 222
 223        let mut collaborator_user_ids = HashSet::new();
 224        collaborator_user_ids.insert(host_user_id);
 225        for github_login in request.payload.collaborator_logins {
 226            match self.app_state.db.create_user(&github_login, false).await {
 227                Ok(collaborator_user_id) => {
 228                    collaborator_user_ids.insert(collaborator_user_id);
 229                }
 230                Err(err) => {
 231                    let message = err.to_string();
 232                    self.peer
 233                        .respond_with_error(receipt, proto::Error { message })
 234                        .await?;
 235                    return Ok(());
 236                }
 237            }
 238        }
 239
 240        let collaborator_user_ids = collaborator_user_ids.into_iter().collect::<Vec<_>>();
 241        let worktree_id = self.store.write().await.add_worktree(Worktree {
 242            host_connection_id: request.sender_id,
 243            collaborator_user_ids: collaborator_user_ids.clone(),
 244            root_name: request.payload.root_name,
 245            share: None,
 246        });
 247
 248        self.peer
 249            .respond(receipt, proto::OpenWorktreeResponse { worktree_id })
 250            .await?;
 251        self.update_collaborators_for_users(&collaborator_user_ids)
 252            .await?;
 253
 254        Ok(())
 255    }
 256
 257    async fn close_worktree(
 258        self: Arc<Server>,
 259        request: TypedEnvelope<proto::CloseWorktree>,
 260    ) -> tide::Result<()> {
 261        let worktree_id = request.payload.worktree_id;
 262        let worktree = self
 263            .store
 264            .write()
 265            .await
 266            .remove_worktree(worktree_id, request.sender_id)?;
 267
 268        if let Some(share) = worktree.share {
 269            broadcast(
 270                request.sender_id,
 271                share.guest_connection_ids.keys().copied().collect(),
 272                |conn_id| {
 273                    self.peer
 274                        .send(conn_id, proto::UnshareWorktree { worktree_id })
 275                },
 276            )
 277            .await?;
 278        }
 279        self.update_collaborators_for_users(&worktree.collaborator_user_ids)
 280            .await?;
 281        Ok(())
 282    }
 283
 284    async fn share_worktree(
 285        self: Arc<Server>,
 286        mut request: TypedEnvelope<proto::ShareWorktree>,
 287    ) -> tide::Result<()> {
 288        let worktree = request
 289            .payload
 290            .worktree
 291            .as_mut()
 292            .ok_or_else(|| anyhow!("missing worktree"))?;
 293        let entries = mem::take(&mut worktree.entries)
 294            .into_iter()
 295            .map(|entry| (entry.id, entry))
 296            .collect();
 297
 298        let collaborator_user_ids =
 299            self.store
 300                .write()
 301                .await
 302                .share_worktree(worktree.id, request.sender_id, entries);
 303        if let Some(collaborator_user_ids) = collaborator_user_ids {
 304            self.peer
 305                .respond(request.receipt(), proto::ShareWorktreeResponse {})
 306                .await?;
 307            self.update_collaborators_for_users(&collaborator_user_ids)
 308                .await?;
 309        } else {
 310            self.peer
 311                .respond_with_error(
 312                    request.receipt(),
 313                    proto::Error {
 314                        message: "no such worktree".to_string(),
 315                    },
 316                )
 317                .await?;
 318        }
 319        Ok(())
 320    }
 321
 322    async fn unshare_worktree(
 323        self: Arc<Server>,
 324        request: TypedEnvelope<proto::UnshareWorktree>,
 325    ) -> tide::Result<()> {
 326        let worktree_id = request.payload.worktree_id;
 327        let worktree = self
 328            .store
 329            .write()
 330            .await
 331            .unshare_worktree(worktree_id, request.sender_id)?;
 332
 333        broadcast(request.sender_id, worktree.connection_ids, |conn_id| {
 334            self.peer
 335                .send(conn_id, proto::UnshareWorktree { worktree_id })
 336        })
 337        .await?;
 338        self.update_collaborators_for_users(&worktree.collaborator_ids)
 339            .await?;
 340
 341        Ok(())
 342    }
 343
 344    async fn join_worktree(
 345        self: Arc<Server>,
 346        request: TypedEnvelope<proto::JoinWorktree>,
 347    ) -> tide::Result<()> {
 348        let worktree_id = request.payload.worktree_id;
 349        let user_id = self
 350            .store
 351            .read()
 352            .await
 353            .user_id_for_connection(request.sender_id)?;
 354
 355        let response;
 356        let connection_ids;
 357        let collaborator_user_ids;
 358        let mut state = self.store.write().await;
 359        match state.join_worktree(request.sender_id, user_id, worktree_id) {
 360            Ok(JoinedWorktree {
 361                replica_id,
 362                worktree,
 363            }) => {
 364                let share = worktree.share()?;
 365                let peer_count = share.guest_connection_ids.len();
 366                let mut peers = Vec::with_capacity(peer_count);
 367                peers.push(proto::Peer {
 368                    peer_id: worktree.host_connection_id.0,
 369                    replica_id: 0,
 370                });
 371                for (peer_conn_id, peer_replica_id) in &share.guest_connection_ids {
 372                    if *peer_conn_id != request.sender_id {
 373                        peers.push(proto::Peer {
 374                            peer_id: peer_conn_id.0,
 375                            replica_id: *peer_replica_id as u32,
 376                        });
 377                    }
 378                }
 379                response = proto::JoinWorktreeResponse {
 380                    worktree: Some(proto::Worktree {
 381                        id: worktree_id,
 382                        root_name: worktree.root_name.clone(),
 383                        entries: share.entries.values().cloned().collect(),
 384                    }),
 385                    replica_id: replica_id as u32,
 386                    peers,
 387                };
 388                connection_ids = worktree.connection_ids();
 389                collaborator_user_ids = worktree.collaborator_user_ids.clone();
 390            }
 391            Err(error) => {
 392                self.peer
 393                    .respond_with_error(
 394                        request.receipt(),
 395                        proto::Error {
 396                            message: error.to_string(),
 397                        },
 398                    )
 399                    .await?;
 400                return Ok(());
 401            }
 402        }
 403
 404        drop(state);
 405        broadcast(request.sender_id, connection_ids, |conn_id| {
 406            self.peer.send(
 407                conn_id,
 408                proto::AddPeer {
 409                    worktree_id,
 410                    peer: Some(proto::Peer {
 411                        peer_id: request.sender_id.0,
 412                        replica_id: response.replica_id,
 413                    }),
 414                },
 415            )
 416        })
 417        .await?;
 418        self.peer.respond(request.receipt(), response).await?;
 419        self.update_collaborators_for_users(&collaborator_user_ids)
 420            .await?;
 421
 422        Ok(())
 423    }
 424
 425    async fn leave_worktree(
 426        self: Arc<Server>,
 427        request: TypedEnvelope<proto::LeaveWorktree>,
 428    ) -> tide::Result<()> {
 429        let sender_id = request.sender_id;
 430        let worktree_id = request.payload.worktree_id;
 431
 432        if let Some(worktree) = self
 433            .store
 434            .write()
 435            .await
 436            .leave_worktree(sender_id, worktree_id)
 437        {
 438            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 439                self.peer.send(
 440                    conn_id,
 441                    proto::RemovePeer {
 442                        worktree_id,
 443                        peer_id: sender_id.0,
 444                    },
 445                )
 446            })
 447            .await?;
 448            self.update_collaborators_for_users(&worktree.collaborator_ids)
 449                .await?;
 450        }
 451        Ok(())
 452    }
 453
 454    async fn update_worktree(
 455        self: Arc<Server>,
 456        request: TypedEnvelope<proto::UpdateWorktree>,
 457    ) -> tide::Result<()> {
 458        let connection_ids = self.store.write().await.update_worktree(
 459            request.sender_id,
 460            request.payload.worktree_id,
 461            &request.payload.removed_entries,
 462            &request.payload.updated_entries,
 463        )?;
 464
 465        broadcast(request.sender_id, connection_ids, |connection_id| {
 466            self.peer
 467                .forward_send(request.sender_id, connection_id, request.payload.clone())
 468        })
 469        .await?;
 470
 471        Ok(())
 472    }
 473
 474    async fn open_buffer(
 475        self: Arc<Server>,
 476        request: TypedEnvelope<proto::OpenBuffer>,
 477    ) -> tide::Result<()> {
 478        let receipt = request.receipt();
 479        let host_connection_id = self
 480            .store
 481            .read()
 482            .await
 483            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 484        let response = self
 485            .peer
 486            .forward_request(request.sender_id, host_connection_id, request.payload)
 487            .await?;
 488        self.peer.respond(receipt, response).await?;
 489        Ok(())
 490    }
 491
 492    async fn close_buffer(
 493        self: Arc<Server>,
 494        request: TypedEnvelope<proto::CloseBuffer>,
 495    ) -> tide::Result<()> {
 496        let host_connection_id = self
 497            .store
 498            .read()
 499            .await
 500            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 501        self.peer
 502            .forward_send(request.sender_id, host_connection_id, request.payload)
 503            .await?;
 504        Ok(())
 505    }
 506
 507    async fn save_buffer(
 508        self: Arc<Server>,
 509        request: TypedEnvelope<proto::SaveBuffer>,
 510    ) -> tide::Result<()> {
 511        let host;
 512        let guests;
 513        {
 514            let state = self.store.read().await;
 515            host = state
 516                .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 517            guests = state
 518                .worktree_guest_connection_ids(request.sender_id, request.payload.worktree_id)?;
 519        }
 520
 521        let sender = request.sender_id;
 522        let receipt = request.receipt();
 523        let response = self
 524            .peer
 525            .forward_request(sender, host, request.payload.clone())
 526            .await?;
 527
 528        broadcast(host, guests, |conn_id| {
 529            let response = response.clone();
 530            let peer = &self.peer;
 531            async move {
 532                if conn_id == sender {
 533                    peer.respond(receipt, response).await
 534                } else {
 535                    peer.forward_send(host, conn_id, response).await
 536                }
 537            }
 538        })
 539        .await?;
 540
 541        Ok(())
 542    }
 543
 544    async fn update_buffer(
 545        self: Arc<Server>,
 546        request: TypedEnvelope<proto::UpdateBuffer>,
 547    ) -> tide::Result<()> {
 548        broadcast(
 549            request.sender_id,
 550            self.store
 551                .read()
 552                .await
 553                .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?,
 554            |connection_id| {
 555                self.peer
 556                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 557            },
 558        )
 559        .await?;
 560        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 561        Ok(())
 562    }
 563
 564    async fn buffer_saved(
 565        self: Arc<Server>,
 566        request: TypedEnvelope<proto::BufferSaved>,
 567    ) -> tide::Result<()> {
 568        broadcast(
 569            request.sender_id,
 570            self.store
 571                .read()
 572                .await
 573                .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?,
 574            |connection_id| {
 575                self.peer
 576                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 577            },
 578        )
 579        .await?;
 580        Ok(())
 581    }
 582
 583    async fn get_channels(
 584        self: Arc<Server>,
 585        request: TypedEnvelope<proto::GetChannels>,
 586    ) -> tide::Result<()> {
 587        let user_id = self
 588            .store
 589            .read()
 590            .await
 591            .user_id_for_connection(request.sender_id)?;
 592        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 593        self.peer
 594            .respond(
 595                request.receipt(),
 596                proto::GetChannelsResponse {
 597                    channels: channels
 598                        .into_iter()
 599                        .map(|chan| proto::Channel {
 600                            id: chan.id.to_proto(),
 601                            name: chan.name,
 602                        })
 603                        .collect(),
 604                },
 605            )
 606            .await?;
 607        Ok(())
 608    }
 609
 610    async fn get_users(
 611        self: Arc<Server>,
 612        request: TypedEnvelope<proto::GetUsers>,
 613    ) -> tide::Result<()> {
 614        let receipt = request.receipt();
 615        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 616        let users = self
 617            .app_state
 618            .db
 619            .get_users_by_ids(user_ids)
 620            .await?
 621            .into_iter()
 622            .map(|user| proto::User {
 623                id: user.id.to_proto(),
 624                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 625                github_login: user.github_login,
 626            })
 627            .collect();
 628        self.peer
 629            .respond(receipt, proto::GetUsersResponse { users })
 630            .await?;
 631        Ok(())
 632    }
 633
 634    async fn update_collaborators_for_users<'a>(
 635        self: &Arc<Server>,
 636        user_ids: impl IntoIterator<Item = &'a UserId>,
 637    ) -> tide::Result<()> {
 638        let mut send_futures = Vec::new();
 639
 640        let state = self.store.read().await;
 641        for user_id in user_ids {
 642            let collaborators = state.collaborators_for_user(*user_id);
 643            for connection_id in state.connection_ids_for_user(*user_id) {
 644                send_futures.push(self.peer.send(
 645                    connection_id,
 646                    proto::UpdateCollaborators {
 647                        collaborators: collaborators.clone(),
 648                    },
 649                ));
 650            }
 651        }
 652
 653        drop(state);
 654        futures::future::try_join_all(send_futures).await?;
 655
 656        Ok(())
 657    }
 658
 659    async fn join_channel(
 660        self: Arc<Self>,
 661        request: TypedEnvelope<proto::JoinChannel>,
 662    ) -> tide::Result<()> {
 663        let user_id = self
 664            .store
 665            .read()
 666            .await
 667            .user_id_for_connection(request.sender_id)?;
 668        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 669        if !self
 670            .app_state
 671            .db
 672            .can_user_access_channel(user_id, channel_id)
 673            .await?
 674        {
 675            Err(anyhow!("access denied"))?;
 676        }
 677
 678        self.store
 679            .write()
 680            .await
 681            .join_channel(request.sender_id, channel_id);
 682        let messages = self
 683            .app_state
 684            .db
 685            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 686            .await?
 687            .into_iter()
 688            .map(|msg| proto::ChannelMessage {
 689                id: msg.id.to_proto(),
 690                body: msg.body,
 691                timestamp: msg.sent_at.unix_timestamp() as u64,
 692                sender_id: msg.sender_id.to_proto(),
 693                nonce: Some(msg.nonce.as_u128().into()),
 694            })
 695            .collect::<Vec<_>>();
 696        self.peer
 697            .respond(
 698                request.receipt(),
 699                proto::JoinChannelResponse {
 700                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 701                    messages,
 702                },
 703            )
 704            .await?;
 705        Ok(())
 706    }
 707
 708    async fn leave_channel(
 709        self: Arc<Self>,
 710        request: TypedEnvelope<proto::LeaveChannel>,
 711    ) -> tide::Result<()> {
 712        let user_id = self
 713            .store
 714            .read()
 715            .await
 716            .user_id_for_connection(request.sender_id)?;
 717        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 718        if !self
 719            .app_state
 720            .db
 721            .can_user_access_channel(user_id, channel_id)
 722            .await?
 723        {
 724            Err(anyhow!("access denied"))?;
 725        }
 726
 727        self.store
 728            .write()
 729            .await
 730            .leave_channel(request.sender_id, channel_id);
 731
 732        Ok(())
 733    }
 734
 735    async fn send_channel_message(
 736        self: Arc<Self>,
 737        request: TypedEnvelope<proto::SendChannelMessage>,
 738    ) -> tide::Result<()> {
 739        let receipt = request.receipt();
 740        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 741        let user_id;
 742        let connection_ids;
 743        {
 744            let state = self.store.read().await;
 745            user_id = state.user_id_for_connection(request.sender_id)?;
 746            if let Some(ids) = state.channel_connection_ids(channel_id) {
 747                connection_ids = ids;
 748            } else {
 749                return Ok(());
 750            }
 751        }
 752
 753        // Validate the message body.
 754        let body = request.payload.body.trim().to_string();
 755        if body.len() > MAX_MESSAGE_LEN {
 756            self.peer
 757                .respond_with_error(
 758                    receipt,
 759                    proto::Error {
 760                        message: "message is too long".to_string(),
 761                    },
 762                )
 763                .await?;
 764            return Ok(());
 765        }
 766        if body.is_empty() {
 767            self.peer
 768                .respond_with_error(
 769                    receipt,
 770                    proto::Error {
 771                        message: "message can't be blank".to_string(),
 772                    },
 773                )
 774                .await?;
 775            return Ok(());
 776        }
 777
 778        let timestamp = OffsetDateTime::now_utc();
 779        let nonce = if let Some(nonce) = request.payload.nonce {
 780            nonce
 781        } else {
 782            self.peer
 783                .respond_with_error(
 784                    receipt,
 785                    proto::Error {
 786                        message: "nonce can't be blank".to_string(),
 787                    },
 788                )
 789                .await?;
 790            return Ok(());
 791        };
 792
 793        let message_id = self
 794            .app_state
 795            .db
 796            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 797            .await?
 798            .to_proto();
 799        let message = proto::ChannelMessage {
 800            sender_id: user_id.to_proto(),
 801            id: message_id,
 802            body,
 803            timestamp: timestamp.unix_timestamp() as u64,
 804            nonce: Some(nonce),
 805        };
 806        broadcast(request.sender_id, connection_ids, |conn_id| {
 807            self.peer.send(
 808                conn_id,
 809                proto::ChannelMessageSent {
 810                    channel_id: channel_id.to_proto(),
 811                    message: Some(message.clone()),
 812                },
 813            )
 814        })
 815        .await?;
 816        self.peer
 817            .respond(
 818                receipt,
 819                proto::SendChannelMessageResponse {
 820                    message: Some(message),
 821                },
 822            )
 823            .await?;
 824        Ok(())
 825    }
 826
 827    async fn get_channel_messages(
 828        self: Arc<Self>,
 829        request: TypedEnvelope<proto::GetChannelMessages>,
 830    ) -> tide::Result<()> {
 831        let user_id = self
 832            .store
 833            .read()
 834            .await
 835            .user_id_for_connection(request.sender_id)?;
 836        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 837        if !self
 838            .app_state
 839            .db
 840            .can_user_access_channel(user_id, channel_id)
 841            .await?
 842        {
 843            Err(anyhow!("access denied"))?;
 844        }
 845
 846        let messages = self
 847            .app_state
 848            .db
 849            .get_channel_messages(
 850                channel_id,
 851                MESSAGE_COUNT_PER_PAGE,
 852                Some(MessageId::from_proto(request.payload.before_message_id)),
 853            )
 854            .await?
 855            .into_iter()
 856            .map(|msg| proto::ChannelMessage {
 857                id: msg.id.to_proto(),
 858                body: msg.body,
 859                timestamp: msg.sent_at.unix_timestamp() as u64,
 860                sender_id: msg.sender_id.to_proto(),
 861                nonce: Some(msg.nonce.as_u128().into()),
 862            })
 863            .collect::<Vec<_>>();
 864        self.peer
 865            .respond(
 866                request.receipt(),
 867                proto::GetChannelMessagesResponse {
 868                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 869                    messages,
 870                },
 871            )
 872            .await?;
 873        Ok(())
 874    }
 875}
 876
 877pub async fn broadcast<F, T>(
 878    sender_id: ConnectionId,
 879    receiver_ids: Vec<ConnectionId>,
 880    mut f: F,
 881) -> anyhow::Result<()>
 882where
 883    F: FnMut(ConnectionId) -> T,
 884    T: Future<Output = anyhow::Result<()>>,
 885{
 886    let futures = receiver_ids
 887        .into_iter()
 888        .filter(|id| *id != sender_id)
 889        .map(|id| f(id));
 890    futures::future::try_join_all(futures).await?;
 891    Ok(())
 892}
 893
 894pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 895    let server = Server::new(app.state().clone(), rpc.clone(), None);
 896    app.at("/rpc").with(auth::VerifyToken).get(move |request: Request<Arc<AppState>>| {
 897        let user_id = request.ext::<UserId>().copied();
 898        let server = server.clone();
 899        async move {
 900            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 901
 902            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 903            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 904            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 905
 906            if !upgrade_requested {
 907                return Ok(Response::new(StatusCode::UpgradeRequired));
 908            }
 909
 910            let header = match request.header("Sec-Websocket-Key") {
 911                Some(h) => h.as_str(),
 912                None => return Err(anyhow!("expected sec-websocket-key"))?,
 913            };
 914
 915            let mut response = Response::new(StatusCode::SwitchingProtocols);
 916            response.insert_header(UPGRADE, "websocket");
 917            response.insert_header(CONNECTION, "Upgrade");
 918            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 919            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 920            response.insert_header("Sec-Websocket-Version", "13");
 921
 922            let http_res: &mut tide::http::Response = response.as_mut();
 923            let upgrade_receiver = http_res.recv_upgrade().await;
 924            let addr = request.remote().unwrap_or("unknown").to_string();
 925            let user_id = user_id.ok_or_else(|| anyhow!("user_id is not present on request. ensure auth::VerifyToken middleware is present"))?;
 926            task::spawn(async move {
 927                if let Some(stream) = upgrade_receiver.await {
 928                    server.handle_connection(Connection::new(WebSocketStream::from_raw_socket(stream, Role::Server, None).await), addr, user_id).await;
 929                }
 930            });
 931
 932            Ok(response)
 933        }
 934    });
 935}
 936
 937fn header_contains_ignore_case<T>(
 938    request: &tide::Request<T>,
 939    header_name: HeaderName,
 940    value: &str,
 941) -> bool {
 942    request
 943        .header(header_name)
 944        .map(|h| {
 945            h.as_str()
 946                .split(',')
 947                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 948        })
 949        .unwrap_or(false)
 950}
 951
 952#[cfg(test)]
 953mod tests {
 954    use super::*;
 955    use crate::{
 956        auth,
 957        db::{tests::TestDb, UserId},
 958        github, AppState, Config,
 959    };
 960    use async_std::{sync::RwLockReadGuard, task};
 961    use gpui::{ModelHandle, TestAppContext};
 962    use parking_lot::Mutex;
 963    use postage::{mpsc, watch};
 964    use serde_json::json;
 965    use sqlx::types::time::OffsetDateTime;
 966    use std::{
 967        path::Path,
 968        sync::{
 969            atomic::{AtomicBool, Ordering::SeqCst},
 970            Arc,
 971        },
 972        time::Duration,
 973    };
 974    use zed::{
 975        channel::{Channel, ChannelDetails, ChannelList},
 976        editor::{Editor, Insert},
 977        fs::{FakeFs, Fs as _},
 978        language::LanguageRegistry,
 979        rpc::{self, Client, Credentials, EstablishConnectionError},
 980        settings,
 981        test::FakeHttpClient,
 982        user::UserStore,
 983        worktree::Worktree,
 984    };
 985    use zrpc::Peer;
 986
 987    #[gpui::test]
 988    async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
 989        let (window_b, _) = cx_b.add_window(|_| EmptyView);
 990        let settings = cx_b.read(settings::test).1;
 991        let lang_registry = Arc::new(LanguageRegistry::new());
 992
 993        // Connect to a server as 2 clients.
 994        let mut server = TestServer::start().await;
 995        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
 996        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
 997
 998        cx_a.foreground().forbid_parking();
 999
1000        // Share a local worktree as client A
1001        let fs = Arc::new(FakeFs::new());
1002        fs.insert_tree(
1003            "/a",
1004            json!({
1005                ".zed.toml": r#"collaborators = ["user_b"]"#,
1006                "a.txt": "a-contents",
1007                "b.txt": "b-contents",
1008            }),
1009        )
1010        .await;
1011        let worktree_a = Worktree::open_local(
1012            client_a.clone(),
1013            "/a".as_ref(),
1014            fs,
1015            lang_registry.clone(),
1016            &mut cx_a.to_async(),
1017        )
1018        .await
1019        .unwrap();
1020        worktree_a
1021            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1022            .await;
1023        let worktree_id = worktree_a
1024            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1025            .await
1026            .unwrap();
1027
1028        // Join that worktree as client B, and see that a guest has joined as client A.
1029        let worktree_b = Worktree::open_remote(
1030            client_b.clone(),
1031            worktree_id,
1032            lang_registry.clone(),
1033            &mut cx_b.to_async(),
1034        )
1035        .await
1036        .unwrap();
1037        let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| tree.replica_id());
1038        worktree_a
1039            .condition(&cx_a, |tree, _| {
1040                tree.peers()
1041                    .values()
1042                    .any(|replica_id| *replica_id == replica_id_b)
1043            })
1044            .await;
1045
1046        // Open the same file as client B and client A.
1047        let buffer_b = worktree_b
1048            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1049            .await
1050            .unwrap();
1051        buffer_b.read_with(&cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1052        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1053        let buffer_a = worktree_a
1054            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1055            .await
1056            .unwrap();
1057
1058        // Create a selection set as client B and see that selection set as client A.
1059        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, settings, cx));
1060        buffer_a
1061            .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1062            .await;
1063
1064        // Edit the buffer as client B and see that edit as client A.
1065        editor_b.update(&mut cx_b, |editor, cx| {
1066            editor.insert(&Insert("ok, ".into()), cx)
1067        });
1068        buffer_a
1069            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1070            .await;
1071
1072        // Remove the selection set as client B, see those selections disappear as client A.
1073        cx_b.update(move |_| drop(editor_b));
1074        buffer_a
1075            .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1076            .await;
1077
1078        // Close the buffer as client A, see that the buffer is closed.
1079        cx_a.update(move |_| drop(buffer_a));
1080        worktree_a
1081            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1082            .await;
1083
1084        // Dropping the worktree removes client B from client A's peers.
1085        cx_b.update(move |_| drop(worktree_b));
1086        worktree_a
1087            .condition(&cx_a, |tree, _| tree.peers().is_empty())
1088            .await;
1089    }
1090
1091    #[gpui::test]
1092    async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
1093        mut cx_a: TestAppContext,
1094        mut cx_b: TestAppContext,
1095        mut cx_c: TestAppContext,
1096    ) {
1097        cx_a.foreground().forbid_parking();
1098        let lang_registry = Arc::new(LanguageRegistry::new());
1099
1100        // Connect to a server as 3 clients.
1101        let mut server = TestServer::start().await;
1102        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1103        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1104        let (client_c, _) = server.create_client(&mut cx_c, "user_c").await;
1105
1106        let fs = Arc::new(FakeFs::new());
1107
1108        // Share a worktree as client A.
1109        fs.insert_tree(
1110            "/a",
1111            json!({
1112                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1113                "file1": "",
1114                "file2": ""
1115            }),
1116        )
1117        .await;
1118
1119        let worktree_a = Worktree::open_local(
1120            client_a.clone(),
1121            "/a".as_ref(),
1122            fs.clone(),
1123            lang_registry.clone(),
1124            &mut cx_a.to_async(),
1125        )
1126        .await
1127        .unwrap();
1128        worktree_a
1129            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1130            .await;
1131        let worktree_id = worktree_a
1132            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1133            .await
1134            .unwrap();
1135
1136        // Join that worktree as clients B and C.
1137        let worktree_b = Worktree::open_remote(
1138            client_b.clone(),
1139            worktree_id,
1140            lang_registry.clone(),
1141            &mut cx_b.to_async(),
1142        )
1143        .await
1144        .unwrap();
1145        let worktree_c = Worktree::open_remote(
1146            client_c.clone(),
1147            worktree_id,
1148            lang_registry.clone(),
1149            &mut cx_c.to_async(),
1150        )
1151        .await
1152        .unwrap();
1153
1154        // Open and edit a buffer as both guests B and C.
1155        let buffer_b = worktree_b
1156            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1157            .await
1158            .unwrap();
1159        let buffer_c = worktree_c
1160            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1161            .await
1162            .unwrap();
1163        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1164        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1165
1166        // Open and edit that buffer as the host.
1167        let buffer_a = worktree_a
1168            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1169            .await
1170            .unwrap();
1171
1172        buffer_a
1173            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1174            .await;
1175        buffer_a.update(&mut cx_a, |buf, cx| {
1176            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1177        });
1178
1179        // Wait for edits to propagate
1180        buffer_a
1181            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1182            .await;
1183        buffer_b
1184            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1185            .await;
1186        buffer_c
1187            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1188            .await;
1189
1190        // Edit the buffer as the host and concurrently save as guest B.
1191        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1192        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1193        save_b.await.unwrap();
1194        assert_eq!(
1195            fs.load("/a/file1".as_ref()).await.unwrap(),
1196            "hi-a, i-am-c, i-am-b, i-am-a"
1197        );
1198        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1199        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1200        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1201
1202        // Make changes on host's file system, see those changes on the guests.
1203        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1204            .await
1205            .unwrap();
1206        fs.insert_file(Path::new("/a/file4"), "4".into())
1207            .await
1208            .unwrap();
1209
1210        worktree_b
1211            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1212            .await;
1213        worktree_c
1214            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1215            .await;
1216        worktree_b.read_with(&cx_b, |tree, _| {
1217            assert_eq!(
1218                tree.paths()
1219                    .map(|p| p.to_string_lossy())
1220                    .collect::<Vec<_>>(),
1221                &[".zed.toml", "file1", "file3", "file4"]
1222            )
1223        });
1224        worktree_c.read_with(&cx_c, |tree, _| {
1225            assert_eq!(
1226                tree.paths()
1227                    .map(|p| p.to_string_lossy())
1228                    .collect::<Vec<_>>(),
1229                &[".zed.toml", "file1", "file3", "file4"]
1230            )
1231        });
1232    }
1233
1234    #[gpui::test]
1235    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1236        cx_a.foreground().forbid_parking();
1237        let lang_registry = Arc::new(LanguageRegistry::new());
1238
1239        // Connect to a server as 2 clients.
1240        let mut server = TestServer::start().await;
1241        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1242        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1243
1244        // Share a local worktree as client A
1245        let fs = Arc::new(FakeFs::new());
1246        fs.insert_tree(
1247            "/dir",
1248            json!({
1249                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1250                "a.txt": "a-contents",
1251            }),
1252        )
1253        .await;
1254
1255        let worktree_a = Worktree::open_local(
1256            client_a.clone(),
1257            "/dir".as_ref(),
1258            fs,
1259            lang_registry.clone(),
1260            &mut cx_a.to_async(),
1261        )
1262        .await
1263        .unwrap();
1264        worktree_a
1265            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1266            .await;
1267        let worktree_id = worktree_a
1268            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1269            .await
1270            .unwrap();
1271
1272        // Join that worktree as client B, and see that a guest has joined as client A.
1273        let worktree_b = Worktree::open_remote(
1274            client_b.clone(),
1275            worktree_id,
1276            lang_registry.clone(),
1277            &mut cx_b.to_async(),
1278        )
1279        .await
1280        .unwrap();
1281
1282        let buffer_b = worktree_b
1283            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1284            .await
1285            .unwrap();
1286        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime);
1287
1288        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1289        buffer_b.read_with(&cx_b, |buf, _| {
1290            assert!(buf.is_dirty());
1291            assert!(!buf.has_conflict());
1292        });
1293
1294        buffer_b
1295            .update(&mut cx_b, |buf, cx| buf.save(cx))
1296            .unwrap()
1297            .await
1298            .unwrap();
1299        worktree_b
1300            .condition(&cx_b, |_, cx| {
1301                buffer_b.read(cx).file().unwrap().mtime != mtime
1302            })
1303            .await;
1304        buffer_b.read_with(&cx_b, |buf, _| {
1305            assert!(!buf.is_dirty());
1306            assert!(!buf.has_conflict());
1307        });
1308
1309        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1310        buffer_b.read_with(&cx_b, |buf, _| {
1311            assert!(buf.is_dirty());
1312            assert!(!buf.has_conflict());
1313        });
1314    }
1315
1316    #[gpui::test]
1317    async fn test_editing_while_guest_opens_buffer(
1318        mut cx_a: TestAppContext,
1319        mut cx_b: TestAppContext,
1320    ) {
1321        cx_a.foreground().forbid_parking();
1322        let lang_registry = Arc::new(LanguageRegistry::new());
1323
1324        // Connect to a server as 2 clients.
1325        let mut server = TestServer::start().await;
1326        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1327        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1328
1329        // Share a local worktree as client A
1330        let fs = Arc::new(FakeFs::new());
1331        fs.insert_tree(
1332            "/dir",
1333            json!({
1334                ".zed.toml": r#"collaborators = ["user_b"]"#,
1335                "a.txt": "a-contents",
1336            }),
1337        )
1338        .await;
1339        let worktree_a = Worktree::open_local(
1340            client_a.clone(),
1341            "/dir".as_ref(),
1342            fs,
1343            lang_registry.clone(),
1344            &mut cx_a.to_async(),
1345        )
1346        .await
1347        .unwrap();
1348        worktree_a
1349            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1350            .await;
1351        let worktree_id = worktree_a
1352            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1353            .await
1354            .unwrap();
1355
1356        // Join that worktree as client B, and see that a guest has joined as client A.
1357        let worktree_b = Worktree::open_remote(
1358            client_b.clone(),
1359            worktree_id,
1360            lang_registry.clone(),
1361            &mut cx_b.to_async(),
1362        )
1363        .await
1364        .unwrap();
1365
1366        let buffer_a = worktree_a
1367            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1368            .await
1369            .unwrap();
1370        let buffer_b = cx_b
1371            .background()
1372            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1373
1374        task::yield_now().await;
1375        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1376
1377        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1378        let buffer_b = buffer_b.await.unwrap();
1379        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1380    }
1381
1382    #[gpui::test]
1383    async fn test_peer_disconnection(mut cx_a: TestAppContext, cx_b: TestAppContext) {
1384        cx_a.foreground().forbid_parking();
1385        let lang_registry = Arc::new(LanguageRegistry::new());
1386
1387        // Connect to a server as 2 clients.
1388        let mut server = TestServer::start().await;
1389        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1390        let (client_b, _) = server.create_client(&mut cx_a, "user_b").await;
1391
1392        // Share a local worktree as client A
1393        let fs = Arc::new(FakeFs::new());
1394        fs.insert_tree(
1395            "/a",
1396            json!({
1397                ".zed.toml": r#"collaborators = ["user_b"]"#,
1398                "a.txt": "a-contents",
1399                "b.txt": "b-contents",
1400            }),
1401        )
1402        .await;
1403        let worktree_a = Worktree::open_local(
1404            client_a.clone(),
1405            "/a".as_ref(),
1406            fs,
1407            lang_registry.clone(),
1408            &mut cx_a.to_async(),
1409        )
1410        .await
1411        .unwrap();
1412        worktree_a
1413            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1414            .await;
1415        let worktree_id = worktree_a
1416            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1417            .await
1418            .unwrap();
1419
1420        // Join that worktree as client B, and see that a guest has joined as client A.
1421        let _worktree_b = Worktree::open_remote(
1422            client_b.clone(),
1423            worktree_id,
1424            lang_registry.clone(),
1425            &mut cx_b.to_async(),
1426        )
1427        .await
1428        .unwrap();
1429        worktree_a
1430            .condition(&cx_a, |tree, _| tree.peers().len() == 1)
1431            .await;
1432
1433        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1434        client_b.disconnect(&cx_b.to_async()).await.unwrap();
1435        worktree_a
1436            .condition(&cx_a, |tree, _| tree.peers().len() == 0)
1437            .await;
1438    }
1439
1440    #[gpui::test]
1441    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1442        cx_a.foreground().forbid_parking();
1443
1444        // Connect to a server as 2 clients.
1445        let mut server = TestServer::start().await;
1446        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
1447        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
1448
1449        // Create an org that includes these 2 users.
1450        let db = &server.app_state.db;
1451        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1452        db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
1453            .await
1454            .unwrap();
1455        db.add_org_member(org_id, current_user_id(&user_store_b, &cx_b), false)
1456            .await
1457            .unwrap();
1458
1459        // Create a channel that includes all the users.
1460        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1461        db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
1462            .await
1463            .unwrap();
1464        db.add_channel_member(channel_id, current_user_id(&user_store_b, &cx_b), false)
1465            .await
1466            .unwrap();
1467        db.create_channel_message(
1468            channel_id,
1469            current_user_id(&user_store_b, &cx_b),
1470            "hello A, it's B.",
1471            OffsetDateTime::now_utc(),
1472            1,
1473        )
1474        .await
1475        .unwrap();
1476
1477        let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
1478        channels_a
1479            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1480            .await;
1481        channels_a.read_with(&cx_a, |list, _| {
1482            assert_eq!(
1483                list.available_channels().unwrap(),
1484                &[ChannelDetails {
1485                    id: channel_id.to_proto(),
1486                    name: "test-channel".to_string()
1487                }]
1488            )
1489        });
1490        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1491            this.get_channel(channel_id.to_proto(), cx).unwrap()
1492        });
1493        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1494        channel_a
1495            .condition(&cx_a, |channel, _| {
1496                channel_messages(channel)
1497                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1498            })
1499            .await;
1500
1501        let channels_b = cx_b.add_model(|cx| ChannelList::new(user_store_b, client_b, cx));
1502        channels_b
1503            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1504            .await;
1505        channels_b.read_with(&cx_b, |list, _| {
1506            assert_eq!(
1507                list.available_channels().unwrap(),
1508                &[ChannelDetails {
1509                    id: channel_id.to_proto(),
1510                    name: "test-channel".to_string()
1511                }]
1512            )
1513        });
1514
1515        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1516            this.get_channel(channel_id.to_proto(), cx).unwrap()
1517        });
1518        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1519        channel_b
1520            .condition(&cx_b, |channel, _| {
1521                channel_messages(channel)
1522                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1523            })
1524            .await;
1525
1526        channel_a
1527            .update(&mut cx_a, |channel, cx| {
1528                channel
1529                    .send_message("oh, hi B.".to_string(), cx)
1530                    .unwrap()
1531                    .detach();
1532                let task = channel.send_message("sup".to_string(), cx).unwrap();
1533                assert_eq!(
1534                    channel_messages(channel),
1535                    &[
1536                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1537                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
1538                        ("user_a".to_string(), "sup".to_string(), true)
1539                    ]
1540                );
1541                task
1542            })
1543            .await
1544            .unwrap();
1545
1546        channel_b
1547            .condition(&cx_b, |channel, _| {
1548                channel_messages(channel)
1549                    == [
1550                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1551                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
1552                        ("user_a".to_string(), "sup".to_string(), false),
1553                    ]
1554            })
1555            .await;
1556
1557        assert_eq!(
1558            server
1559                .state()
1560                .await
1561                .channel(channel_id)
1562                .unwrap()
1563                .connection_ids
1564                .len(),
1565            2
1566        );
1567        cx_b.update(|_| drop(channel_b));
1568        server
1569            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1570            .await;
1571
1572        cx_a.update(|_| drop(channel_a));
1573        server
1574            .condition(|state| state.channel(channel_id).is_none())
1575            .await;
1576    }
1577
1578    #[gpui::test]
1579    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
1580        cx_a.foreground().forbid_parking();
1581
1582        let mut server = TestServer::start().await;
1583        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
1584
1585        let db = &server.app_state.db;
1586        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1587        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1588        db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
1589            .await
1590            .unwrap();
1591        db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
1592            .await
1593            .unwrap();
1594
1595        let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
1596        channels_a
1597            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1598            .await;
1599        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1600            this.get_channel(channel_id.to_proto(), cx).unwrap()
1601        });
1602
1603        // Messages aren't allowed to be too long.
1604        channel_a
1605            .update(&mut cx_a, |channel, cx| {
1606                let long_body = "this is long.\n".repeat(1024);
1607                channel.send_message(long_body, cx).unwrap()
1608            })
1609            .await
1610            .unwrap_err();
1611
1612        // Messages aren't allowed to be blank.
1613        channel_a.update(&mut cx_a, |channel, cx| {
1614            channel.send_message(String::new(), cx).unwrap_err()
1615        });
1616
1617        // Leading and trailing whitespace are trimmed.
1618        channel_a
1619            .update(&mut cx_a, |channel, cx| {
1620                channel
1621                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
1622                    .unwrap()
1623            })
1624            .await
1625            .unwrap();
1626        assert_eq!(
1627            db.get_channel_messages(channel_id, 10, None)
1628                .await
1629                .unwrap()
1630                .iter()
1631                .map(|m| &m.body)
1632                .collect::<Vec<_>>(),
1633            &["surrounded by whitespace"]
1634        );
1635    }
1636
1637    #[gpui::test]
1638    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1639        cx_a.foreground().forbid_parking();
1640
1641        // Connect to a server as 2 clients.
1642        let mut server = TestServer::start().await;
1643        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
1644        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
1645        let mut status_b = client_b.status();
1646
1647        // Create an org that includes these 2 users.
1648        let db = &server.app_state.db;
1649        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1650        db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
1651            .await
1652            .unwrap();
1653        db.add_org_member(org_id, current_user_id(&user_store_b, &cx_b), false)
1654            .await
1655            .unwrap();
1656
1657        // Create a channel that includes all the users.
1658        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1659        db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
1660            .await
1661            .unwrap();
1662        db.add_channel_member(channel_id, current_user_id(&user_store_b, &cx_b), false)
1663            .await
1664            .unwrap();
1665        db.create_channel_message(
1666            channel_id,
1667            current_user_id(&user_store_b, &cx_b),
1668            "hello A, it's B.",
1669            OffsetDateTime::now_utc(),
1670            2,
1671        )
1672        .await
1673        .unwrap();
1674
1675        let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
1676        channels_a
1677            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1678            .await;
1679
1680        channels_a.read_with(&cx_a, |list, _| {
1681            assert_eq!(
1682                list.available_channels().unwrap(),
1683                &[ChannelDetails {
1684                    id: channel_id.to_proto(),
1685                    name: "test-channel".to_string()
1686                }]
1687            )
1688        });
1689        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1690            this.get_channel(channel_id.to_proto(), cx).unwrap()
1691        });
1692        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1693        channel_a
1694            .condition(&cx_a, |channel, _| {
1695                channel_messages(channel)
1696                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1697            })
1698            .await;
1699
1700        let channels_b = cx_b.add_model(|cx| ChannelList::new(user_store_b.clone(), client_b, cx));
1701        channels_b
1702            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1703            .await;
1704        channels_b.read_with(&cx_b, |list, _| {
1705            assert_eq!(
1706                list.available_channels().unwrap(),
1707                &[ChannelDetails {
1708                    id: channel_id.to_proto(),
1709                    name: "test-channel".to_string()
1710                }]
1711            )
1712        });
1713
1714        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1715            this.get_channel(channel_id.to_proto(), cx).unwrap()
1716        });
1717        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1718        channel_b
1719            .condition(&cx_b, |channel, _| {
1720                channel_messages(channel)
1721                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1722            })
1723            .await;
1724
1725        // Disconnect client B, ensuring we can still access its cached channel data.
1726        server.forbid_connections();
1727        server.disconnect_client(current_user_id(&user_store_b, &cx_b));
1728        while !matches!(
1729            status_b.recv().await,
1730            Some(rpc::Status::ReconnectionError { .. })
1731        ) {}
1732
1733        channels_b.read_with(&cx_b, |channels, _| {
1734            assert_eq!(
1735                channels.available_channels().unwrap(),
1736                [ChannelDetails {
1737                    id: channel_id.to_proto(),
1738                    name: "test-channel".to_string()
1739                }]
1740            )
1741        });
1742        channel_b.read_with(&cx_b, |channel, _| {
1743            assert_eq!(
1744                channel_messages(channel),
1745                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1746            )
1747        });
1748
1749        // Send a message from client B while it is disconnected.
1750        channel_b
1751            .update(&mut cx_b, |channel, cx| {
1752                let task = channel
1753                    .send_message("can you see this?".to_string(), cx)
1754                    .unwrap();
1755                assert_eq!(
1756                    channel_messages(channel),
1757                    &[
1758                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1759                        ("user_b".to_string(), "can you see this?".to_string(), true)
1760                    ]
1761                );
1762                task
1763            })
1764            .await
1765            .unwrap_err();
1766
1767        // Send a message from client A while B is disconnected.
1768        channel_a
1769            .update(&mut cx_a, |channel, cx| {
1770                channel
1771                    .send_message("oh, hi B.".to_string(), cx)
1772                    .unwrap()
1773                    .detach();
1774                let task = channel.send_message("sup".to_string(), cx).unwrap();
1775                assert_eq!(
1776                    channel_messages(channel),
1777                    &[
1778                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1779                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
1780                        ("user_a".to_string(), "sup".to_string(), true)
1781                    ]
1782                );
1783                task
1784            })
1785            .await
1786            .unwrap();
1787
1788        // Give client B a chance to reconnect.
1789        server.allow_connections();
1790        cx_b.foreground().advance_clock(Duration::from_secs(10));
1791
1792        // Verify that B sees the new messages upon reconnection, as well as the message client B
1793        // sent while offline.
1794        channel_b
1795            .condition(&cx_b, |channel, _| {
1796                channel_messages(channel)
1797                    == [
1798                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1799                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
1800                        ("user_a".to_string(), "sup".to_string(), false),
1801                        ("user_b".to_string(), "can you see this?".to_string(), false),
1802                    ]
1803            })
1804            .await;
1805
1806        // Ensure client A and B can communicate normally after reconnection.
1807        channel_a
1808            .update(&mut cx_a, |channel, cx| {
1809                channel.send_message("you online?".to_string(), cx).unwrap()
1810            })
1811            .await
1812            .unwrap();
1813        channel_b
1814            .condition(&cx_b, |channel, _| {
1815                channel_messages(channel)
1816                    == [
1817                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1818                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
1819                        ("user_a".to_string(), "sup".to_string(), false),
1820                        ("user_b".to_string(), "can you see this?".to_string(), false),
1821                        ("user_a".to_string(), "you online?".to_string(), false),
1822                    ]
1823            })
1824            .await;
1825
1826        channel_b
1827            .update(&mut cx_b, |channel, cx| {
1828                channel.send_message("yep".to_string(), cx).unwrap()
1829            })
1830            .await
1831            .unwrap();
1832        channel_a
1833            .condition(&cx_a, |channel, _| {
1834                channel_messages(channel)
1835                    == [
1836                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1837                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
1838                        ("user_a".to_string(), "sup".to_string(), false),
1839                        ("user_b".to_string(), "can you see this?".to_string(), false),
1840                        ("user_a".to_string(), "you online?".to_string(), false),
1841                        ("user_b".to_string(), "yep".to_string(), false),
1842                    ]
1843            })
1844            .await;
1845    }
1846
1847    #[gpui::test]
1848    async fn test_collaborators(
1849        mut cx_a: TestAppContext,
1850        mut cx_b: TestAppContext,
1851        mut cx_c: TestAppContext,
1852    ) {
1853        cx_a.foreground().forbid_parking();
1854        let lang_registry = Arc::new(LanguageRegistry::new());
1855
1856        // Connect to a server as 3 clients.
1857        let mut server = TestServer::start().await;
1858        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
1859        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
1860        let (_client_c, user_store_c) = server.create_client(&mut cx_c, "user_c").await;
1861
1862        let fs = Arc::new(FakeFs::new());
1863
1864        // Share a worktree as client A.
1865        fs.insert_tree(
1866            "/a",
1867            json!({
1868                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1869            }),
1870        )
1871        .await;
1872
1873        let worktree_a = Worktree::open_local(
1874            client_a.clone(),
1875            "/a".as_ref(),
1876            fs.clone(),
1877            lang_registry.clone(),
1878            &mut cx_a.to_async(),
1879        )
1880        .await
1881        .unwrap();
1882
1883        user_store_a
1884            .condition(&cx_a, |user_store, _| {
1885                collaborators(user_store) == vec![("user_a", vec![("a", vec![])])]
1886            })
1887            .await;
1888        user_store_b
1889            .condition(&cx_b, |user_store, _| {
1890                collaborators(user_store) == vec![("user_a", vec![("a", vec![])])]
1891            })
1892            .await;
1893        user_store_c
1894            .condition(&cx_c, |user_store, _| {
1895                collaborators(user_store) == vec![("user_a", vec![("a", vec![])])]
1896            })
1897            .await;
1898
1899        let worktree_id = worktree_a
1900            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1901            .await
1902            .unwrap();
1903
1904        let _worktree_b = Worktree::open_remote(
1905            client_b.clone(),
1906            worktree_id,
1907            lang_registry.clone(),
1908            &mut cx_b.to_async(),
1909        )
1910        .await
1911        .unwrap();
1912
1913        user_store_a
1914            .condition(&cx_a, |user_store, _| {
1915                collaborators(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
1916            })
1917            .await;
1918        user_store_b
1919            .condition(&cx_b, |user_store, _| {
1920                collaborators(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
1921            })
1922            .await;
1923        user_store_c
1924            .condition(&cx_c, |user_store, _| {
1925                collaborators(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
1926            })
1927            .await;
1928
1929        cx_a.update(move |_| drop(worktree_a));
1930        user_store_a
1931            .condition(&cx_a, |user_store, _| collaborators(user_store) == vec![])
1932            .await;
1933        user_store_b
1934            .condition(&cx_b, |user_store, _| collaborators(user_store) == vec![])
1935            .await;
1936        user_store_c
1937            .condition(&cx_c, |user_store, _| collaborators(user_store) == vec![])
1938            .await;
1939
1940        fn collaborators(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
1941            user_store
1942                .collaborators()
1943                .iter()
1944                .map(|collaborator| {
1945                    let worktrees = collaborator
1946                        .worktrees
1947                        .iter()
1948                        .map(|w| {
1949                            (
1950                                w.root_name.as_str(),
1951                                w.participants
1952                                    .iter()
1953                                    .map(|p| p.github_login.as_str())
1954                                    .collect(),
1955                            )
1956                        })
1957                        .collect();
1958                    (collaborator.user.github_login.as_str(), worktrees)
1959                })
1960                .collect()
1961        }
1962    }
1963
1964    struct TestServer {
1965        peer: Arc<Peer>,
1966        app_state: Arc<AppState>,
1967        server: Arc<Server>,
1968        notifications: mpsc::Receiver<()>,
1969        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
1970        forbid_connections: Arc<AtomicBool>,
1971        _test_db: TestDb,
1972    }
1973
1974    impl TestServer {
1975        async fn start() -> Self {
1976            let test_db = TestDb::new();
1977            let app_state = Self::build_app_state(&test_db).await;
1978            let peer = Peer::new();
1979            let notifications = mpsc::channel(128);
1980            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
1981            Self {
1982                peer,
1983                app_state,
1984                server,
1985                notifications: notifications.1,
1986                connection_killers: Default::default(),
1987                forbid_connections: Default::default(),
1988                _test_db: test_db,
1989            }
1990        }
1991
1992        async fn create_client(
1993            &mut self,
1994            cx: &mut TestAppContext,
1995            name: &str,
1996        ) -> (Arc<Client>, ModelHandle<UserStore>) {
1997            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
1998            let client_name = name.to_string();
1999            let mut client = Client::new();
2000            let server = self.server.clone();
2001            let connection_killers = self.connection_killers.clone();
2002            let forbid_connections = self.forbid_connections.clone();
2003            Arc::get_mut(&mut client)
2004                .unwrap()
2005                .override_authenticate(move |cx| {
2006                    cx.spawn(|_| async move {
2007                        let access_token = "the-token".to_string();
2008                        Ok(Credentials {
2009                            user_id: user_id.0 as u64,
2010                            access_token,
2011                        })
2012                    })
2013                })
2014                .override_establish_connection(move |credentials, cx| {
2015                    assert_eq!(credentials.user_id, user_id.0 as u64);
2016                    assert_eq!(credentials.access_token, "the-token");
2017
2018                    let server = server.clone();
2019                    let connection_killers = connection_killers.clone();
2020                    let forbid_connections = forbid_connections.clone();
2021                    let client_name = client_name.clone();
2022                    cx.spawn(move |cx| async move {
2023                        if forbid_connections.load(SeqCst) {
2024                            Err(EstablishConnectionError::other(anyhow!(
2025                                "server is forbidding connections"
2026                            )))
2027                        } else {
2028                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2029                            connection_killers.lock().insert(user_id, kill_conn);
2030                            cx.background()
2031                                .spawn(server.handle_connection(server_conn, client_name, user_id))
2032                                .detach();
2033                            Ok(client_conn)
2034                        }
2035                    })
2036                });
2037
2038            let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2039            client
2040                .authenticate_and_connect(&cx.to_async())
2041                .await
2042                .unwrap();
2043
2044            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2045            let mut authed_user =
2046                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2047            while authed_user.recv().await.unwrap().is_none() {}
2048
2049            (client, user_store)
2050        }
2051
2052        fn disconnect_client(&self, user_id: UserId) {
2053            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2054                let _ = kill_conn.try_send(Some(()));
2055            }
2056        }
2057
2058        fn forbid_connections(&self) {
2059            self.forbid_connections.store(true, SeqCst);
2060        }
2061
2062        fn allow_connections(&self) {
2063            self.forbid_connections.store(false, SeqCst);
2064        }
2065
2066        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2067            let mut config = Config::default();
2068            config.session_secret = "a".repeat(32);
2069            config.database_url = test_db.url.clone();
2070            let github_client = github::AppClient::test();
2071            Arc::new(AppState {
2072                db: test_db.db().clone(),
2073                handlebars: Default::default(),
2074                auth_client: auth::build_client("", ""),
2075                repo_client: github::RepoClient::test(&github_client),
2076                github_client,
2077                config,
2078            })
2079        }
2080
2081        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2082            self.server.store.read().await
2083        }
2084
2085        async fn condition<F>(&mut self, mut predicate: F)
2086        where
2087            F: FnMut(&Store) -> bool,
2088        {
2089            async_std::future::timeout(Duration::from_millis(500), async {
2090                while !(predicate)(&*self.server.store.read().await) {
2091                    self.notifications.recv().await;
2092                }
2093            })
2094            .await
2095            .expect("condition timed out");
2096        }
2097    }
2098
2099    impl Drop for TestServer {
2100        fn drop(&mut self) {
2101            task::block_on(self.peer.reset());
2102        }
2103    }
2104
2105    fn current_user_id(user_store: &ModelHandle<UserStore>, cx: &TestAppContext) -> UserId {
2106        UserId::from_proto(
2107            user_store.read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2108        )
2109    }
2110
2111    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2112        channel
2113            .messages()
2114            .cursor::<(), ()>()
2115            .map(|m| {
2116                (
2117                    m.sender.github_login.clone(),
2118                    m.body.clone(),
2119                    m.is_pending(),
2120                )
2121            })
2122            .collect()
2123    }
2124
2125    struct EmptyView;
2126
2127    impl gpui::Entity for EmptyView {
2128        type Event = ();
2129    }
2130
2131    impl gpui::View for EmptyView {
2132        fn ui_name() -> &'static str {
2133            "empty view"
2134        }
2135
2136        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2137            gpui::Element::boxed(gpui::elements::Empty)
2138        }
2139    }
2140}