rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use futures::{future::BoxFuture, FutureExt};
  12use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  13use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{
  20    any::TypeId,
  21    collections::{HashMap, HashSet},
  22    future::Future,
  23    mem,
  24    sync::Arc,
  25    time::Instant,
  26};
  27use store::{Store, Worktree};
  28use surf::StatusCode;
  29use tide::log;
  30use tide::{
  31    http::headers::{HeaderName, CONNECTION, UPGRADE},
  32    Request, Response,
  33};
  34use time::OffsetDateTime;
  35
  36type MessageHandler = Box<
  37    dyn Send
  38        + Sync
  39        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  40>;
  41
  42pub struct Server {
  43    peer: Arc<Peer>,
  44    store: RwLock<Store>,
  45    app_state: Arc<AppState>,
  46    handlers: HashMap<TypeId, MessageHandler>,
  47    notifications: Option<mpsc::Sender<()>>,
  48}
  49
  50const MESSAGE_COUNT_PER_PAGE: usize = 100;
  51const MAX_MESSAGE_LEN: usize = 1024;
  52
  53impl Server {
  54    pub fn new(
  55        app_state: Arc<AppState>,
  56        peer: Arc<Peer>,
  57        notifications: Option<mpsc::Sender<()>>,
  58    ) -> Arc<Self> {
  59        let mut server = Self {
  60            peer,
  61            app_state,
  62            store: Default::default(),
  63            handlers: Default::default(),
  64            notifications,
  65        };
  66
  67        server
  68            .add_handler(Server::ping)
  69            .add_handler(Server::open_worktree)
  70            .add_handler(Server::close_worktree)
  71            .add_handler(Server::share_worktree)
  72            .add_handler(Server::unshare_worktree)
  73            .add_handler(Server::join_worktree)
  74            .add_handler(Server::leave_worktree)
  75            .add_handler(Server::update_worktree)
  76            .add_handler(Server::open_buffer)
  77            .add_handler(Server::close_buffer)
  78            .add_handler(Server::update_buffer)
  79            .add_handler(Server::buffer_saved)
  80            .add_handler(Server::save_buffer)
  81            .add_handler(Server::get_channels)
  82            .add_handler(Server::get_users)
  83            .add_handler(Server::join_channel)
  84            .add_handler(Server::leave_channel)
  85            .add_handler(Server::send_channel_message)
  86            .add_handler(Server::get_channel_messages);
  87
  88        Arc::new(server)
  89    }
  90
  91    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  92    where
  93        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  94        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  95        M: EnvelopedMessage,
  96    {
  97        let prev_handler = self.handlers.insert(
  98            TypeId::of::<M>(),
  99            Box::new(move |server, envelope| {
 100                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 101                (handler)(server, *envelope).boxed()
 102            }),
 103        );
 104        if prev_handler.is_some() {
 105            panic!("registered a handler for the same message twice");
 106        }
 107        self
 108    }
 109
 110    pub fn handle_connection(
 111        self: &Arc<Self>,
 112        connection: Connection,
 113        addr: String,
 114        user_id: UserId,
 115        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 116    ) -> impl Future<Output = ()> {
 117        let mut this = self.clone();
 118        async move {
 119            let (connection_id, handle_io, mut incoming_rx) =
 120                this.peer.add_connection(connection).await;
 121
 122            if let Some(send_connection_id) = send_connection_id.as_mut() {
 123                let _ = send_connection_id.send(connection_id).await;
 124            }
 125
 126            this.state_mut().add_connection(connection_id, user_id);
 127            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 128                log::error!("error updating contacts for {:?}: {}", user_id, err);
 129            }
 130
 131            let handle_io = handle_io.fuse();
 132            futures::pin_mut!(handle_io);
 133            loop {
 134                let next_message = incoming_rx.recv().fuse();
 135                futures::pin_mut!(next_message);
 136                futures::select_biased! {
 137                    message = next_message => {
 138                        if let Some(message) = message {
 139                            let start_time = Instant::now();
 140                            log::info!("RPC message received: {}", message.payload_type_name());
 141                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 142                                if let Err(err) = (handler)(this.clone(), message).await {
 143                                    log::error!("error handling message: {:?}", err);
 144                                } else {
 145                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 146                                }
 147
 148                                if let Some(mut notifications) = this.notifications.clone() {
 149                                    let _ = notifications.send(()).await;
 150                                }
 151                            } else {
 152                                log::warn!("unhandled message: {}", message.payload_type_name());
 153                            }
 154                        } else {
 155                            log::info!("rpc connection closed {:?}", addr);
 156                            break;
 157                        }
 158                    }
 159                    handle_io = handle_io => {
 160                        if let Err(err) = handle_io {
 161                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 162                        }
 163                        break;
 164                    }
 165                }
 166            }
 167
 168            if let Err(err) = this.sign_out(connection_id).await {
 169                log::error!("error signing out connection {:?} - {:?}", addr, err);
 170            }
 171        }
 172    }
 173
 174    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 175        self.peer.disconnect(connection_id).await;
 176        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 177
 178        for (worktree_id, worktree) in removed_connection.hosted_worktrees {
 179            if let Some(share) = worktree.share {
 180                broadcast(
 181                    connection_id,
 182                    share.guests.keys().copied().collect(),
 183                    |conn_id| {
 184                        self.peer
 185                            .send(conn_id, proto::UnshareWorktree { worktree_id })
 186                    },
 187                )
 188                .await?;
 189            }
 190        }
 191
 192        for (worktree_id, peer_ids) in removed_connection.guest_worktree_ids {
 193            broadcast(connection_id, peer_ids, |conn_id| {
 194                self.peer.send(
 195                    conn_id,
 196                    proto::RemoveCollaborator {
 197                        worktree_id,
 198                        peer_id: connection_id.0,
 199                    },
 200                )
 201            })
 202            .await?;
 203        }
 204
 205        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 206            .await?;
 207
 208        Ok(())
 209    }
 210
 211    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 212        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 213        Ok(())
 214    }
 215
 216    async fn open_worktree(
 217        mut self: Arc<Server>,
 218        request: TypedEnvelope<proto::OpenWorktree>,
 219    ) -> tide::Result<()> {
 220        let receipt = request.receipt();
 221        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 222
 223        let mut contact_user_ids = HashSet::new();
 224        contact_user_ids.insert(host_user_id);
 225        for github_login in request.payload.authorized_logins {
 226            match self.app_state.db.create_user(&github_login, false).await {
 227                Ok(contact_user_id) => {
 228                    contact_user_ids.insert(contact_user_id);
 229                }
 230                Err(err) => {
 231                    let message = err.to_string();
 232                    self.peer
 233                        .respond_with_error(receipt, proto::Error { message })
 234                        .await?;
 235                    return Ok(());
 236                }
 237            }
 238        }
 239
 240        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 241        let worktree_id = self.state_mut().add_worktree(Worktree {
 242            host_connection_id: request.sender_id,
 243            host_user_id,
 244            authorized_user_ids: contact_user_ids.clone(),
 245            root_name: request.payload.root_name,
 246            share: None,
 247        });
 248
 249        self.peer
 250            .respond(receipt, proto::OpenWorktreeResponse { worktree_id })
 251            .await?;
 252        self.update_contacts_for_users(&contact_user_ids).await?;
 253
 254        Ok(())
 255    }
 256
 257    async fn close_worktree(
 258        mut self: Arc<Server>,
 259        request: TypedEnvelope<proto::CloseWorktree>,
 260    ) -> tide::Result<()> {
 261        let worktree_id = request.payload.worktree_id;
 262        let worktree = self
 263            .state_mut()
 264            .remove_worktree(worktree_id, request.sender_id)?;
 265
 266        if let Some(share) = worktree.share {
 267            broadcast(
 268                request.sender_id,
 269                share.guests.keys().copied().collect(),
 270                |conn_id| {
 271                    self.peer
 272                        .send(conn_id, proto::UnshareWorktree { worktree_id })
 273                },
 274            )
 275            .await?;
 276        }
 277        self.update_contacts_for_users(&worktree.authorized_user_ids)
 278            .await?;
 279        Ok(())
 280    }
 281
 282    async fn share_worktree(
 283        mut self: Arc<Server>,
 284        mut request: TypedEnvelope<proto::ShareWorktree>,
 285    ) -> tide::Result<()> {
 286        let worktree = request
 287            .payload
 288            .worktree
 289            .as_mut()
 290            .ok_or_else(|| anyhow!("missing worktree"))?;
 291        let entries = mem::take(&mut worktree.entries)
 292            .into_iter()
 293            .map(|entry| (entry.id, entry))
 294            .collect();
 295
 296        let contact_user_ids =
 297            self.state_mut()
 298                .share_worktree(worktree.id, request.sender_id, entries);
 299        if let Some(contact_user_ids) = contact_user_ids {
 300            self.peer
 301                .respond(request.receipt(), proto::ShareWorktreeResponse {})
 302                .await?;
 303            self.update_contacts_for_users(&contact_user_ids).await?;
 304        } else {
 305            self.peer
 306                .respond_with_error(
 307                    request.receipt(),
 308                    proto::Error {
 309                        message: "no such worktree".to_string(),
 310                    },
 311                )
 312                .await?;
 313        }
 314        Ok(())
 315    }
 316
 317    async fn unshare_worktree(
 318        mut self: Arc<Server>,
 319        request: TypedEnvelope<proto::UnshareWorktree>,
 320    ) -> tide::Result<()> {
 321        let worktree_id = request.payload.worktree_id;
 322        let worktree = self
 323            .state_mut()
 324            .unshare_worktree(worktree_id, request.sender_id)?;
 325
 326        broadcast(request.sender_id, worktree.connection_ids, |conn_id| {
 327            self.peer
 328                .send(conn_id, proto::UnshareWorktree { worktree_id })
 329        })
 330        .await?;
 331        self.update_contacts_for_users(&worktree.authorized_user_ids)
 332            .await?;
 333
 334        Ok(())
 335    }
 336
 337    async fn join_worktree(
 338        mut self: Arc<Server>,
 339        request: TypedEnvelope<proto::JoinWorktree>,
 340    ) -> tide::Result<()> {
 341        let worktree_id = request.payload.worktree_id;
 342
 343        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 344        let response_data = self
 345            .state_mut()
 346            .join_worktree(request.sender_id, user_id, worktree_id)
 347            .and_then(|joined| {
 348                let share = joined.worktree.share()?;
 349                let peer_count = share.guests.len();
 350                let mut collaborators = Vec::with_capacity(peer_count);
 351                collaborators.push(proto::Collaborator {
 352                    peer_id: joined.worktree.host_connection_id.0,
 353                    replica_id: 0,
 354                    user_id: joined.worktree.host_user_id.to_proto(),
 355                });
 356                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 357                    if *peer_conn_id != request.sender_id {
 358                        collaborators.push(proto::Collaborator {
 359                            peer_id: peer_conn_id.0,
 360                            replica_id: *peer_replica_id as u32,
 361                            user_id: peer_user_id.to_proto(),
 362                        });
 363                    }
 364                }
 365                let response = proto::JoinWorktreeResponse {
 366                    worktree: Some(proto::Worktree {
 367                        id: worktree_id,
 368                        root_name: joined.worktree.root_name.clone(),
 369                        entries: share.entries.values().cloned().collect(),
 370                    }),
 371                    replica_id: joined.replica_id as u32,
 372                    collaborators,
 373                };
 374                let connection_ids = joined.worktree.connection_ids();
 375                let contact_user_ids = joined.worktree.authorized_user_ids.clone();
 376                Ok((response, connection_ids, contact_user_ids))
 377            });
 378
 379        match response_data {
 380            Ok((response, connection_ids, contact_user_ids)) => {
 381                broadcast(request.sender_id, connection_ids, |conn_id| {
 382                    self.peer.send(
 383                        conn_id,
 384                        proto::AddCollaborator {
 385                            worktree_id,
 386                            collaborator: Some(proto::Collaborator {
 387                                peer_id: request.sender_id.0,
 388                                replica_id: response.replica_id,
 389                                user_id: user_id.to_proto(),
 390                            }),
 391                        },
 392                    )
 393                })
 394                .await?;
 395                self.peer.respond(request.receipt(), response).await?;
 396                self.update_contacts_for_users(&contact_user_ids).await?;
 397            }
 398            Err(error) => {
 399                self.peer
 400                    .respond_with_error(
 401                        request.receipt(),
 402                        proto::Error {
 403                            message: error.to_string(),
 404                        },
 405                    )
 406                    .await?;
 407            }
 408        }
 409
 410        Ok(())
 411    }
 412
 413    async fn leave_worktree(
 414        mut self: Arc<Server>,
 415        request: TypedEnvelope<proto::LeaveWorktree>,
 416    ) -> tide::Result<()> {
 417        let sender_id = request.sender_id;
 418        let worktree_id = request.payload.worktree_id;
 419        let worktree = self.state_mut().leave_worktree(sender_id, worktree_id);
 420        if let Some(worktree) = worktree {
 421            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 422                self.peer.send(
 423                    conn_id,
 424                    proto::RemoveCollaborator {
 425                        worktree_id,
 426                        peer_id: sender_id.0,
 427                    },
 428                )
 429            })
 430            .await?;
 431            self.update_contacts_for_users(&worktree.authorized_user_ids)
 432                .await?;
 433        }
 434        Ok(())
 435    }
 436
 437    async fn update_worktree(
 438        mut self: Arc<Server>,
 439        request: TypedEnvelope<proto::UpdateWorktree>,
 440    ) -> tide::Result<()> {
 441        let connection_ids = self.state_mut().update_worktree(
 442            request.sender_id,
 443            request.payload.worktree_id,
 444            &request.payload.removed_entries,
 445            &request.payload.updated_entries,
 446        )?;
 447
 448        broadcast(request.sender_id, connection_ids, |connection_id| {
 449            self.peer
 450                .forward_send(request.sender_id, connection_id, request.payload.clone())
 451        })
 452        .await?;
 453
 454        Ok(())
 455    }
 456
 457    async fn open_buffer(
 458        self: Arc<Server>,
 459        request: TypedEnvelope<proto::OpenBuffer>,
 460    ) -> tide::Result<()> {
 461        let receipt = request.receipt();
 462        let host_connection_id = self
 463            .state()
 464            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 465        let response = self
 466            .peer
 467            .forward_request(request.sender_id, host_connection_id, request.payload)
 468            .await?;
 469        self.peer.respond(receipt, response).await?;
 470        Ok(())
 471    }
 472
 473    async fn close_buffer(
 474        self: Arc<Server>,
 475        request: TypedEnvelope<proto::CloseBuffer>,
 476    ) -> tide::Result<()> {
 477        let host_connection_id = self
 478            .state()
 479            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 480        self.peer
 481            .forward_send(request.sender_id, host_connection_id, request.payload)
 482            .await?;
 483        Ok(())
 484    }
 485
 486    async fn save_buffer(
 487        self: Arc<Server>,
 488        request: TypedEnvelope<proto::SaveBuffer>,
 489    ) -> tide::Result<()> {
 490        let host;
 491        let guests;
 492        {
 493            let state = self.state();
 494            host = state
 495                .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 496            guests = state
 497                .worktree_guest_connection_ids(request.sender_id, request.payload.worktree_id)?;
 498        }
 499
 500        let sender = request.sender_id;
 501        let receipt = request.receipt();
 502        let response = self
 503            .peer
 504            .forward_request(sender, host, request.payload.clone())
 505            .await?;
 506
 507        broadcast(host, guests, |conn_id| {
 508            let response = response.clone();
 509            let peer = &self.peer;
 510            async move {
 511                if conn_id == sender {
 512                    peer.respond(receipt, response).await
 513                } else {
 514                    peer.forward_send(host, conn_id, response).await
 515                }
 516            }
 517        })
 518        .await?;
 519
 520        Ok(())
 521    }
 522
 523    async fn update_buffer(
 524        self: Arc<Server>,
 525        request: TypedEnvelope<proto::UpdateBuffer>,
 526    ) -> tide::Result<()> {
 527        let receiver_ids = self
 528            .state()
 529            .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
 530        broadcast(request.sender_id, receiver_ids, |connection_id| {
 531            self.peer
 532                .forward_send(request.sender_id, connection_id, request.payload.clone())
 533        })
 534        .await?;
 535        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 536        Ok(())
 537    }
 538
 539    async fn buffer_saved(
 540        self: Arc<Server>,
 541        request: TypedEnvelope<proto::BufferSaved>,
 542    ) -> tide::Result<()> {
 543        let receiver_ids = self
 544            .state()
 545            .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
 546        broadcast(request.sender_id, receiver_ids, |connection_id| {
 547            self.peer
 548                .forward_send(request.sender_id, connection_id, request.payload.clone())
 549        })
 550        .await?;
 551        Ok(())
 552    }
 553
 554    async fn get_channels(
 555        self: Arc<Server>,
 556        request: TypedEnvelope<proto::GetChannels>,
 557    ) -> tide::Result<()> {
 558        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 559        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 560        self.peer
 561            .respond(
 562                request.receipt(),
 563                proto::GetChannelsResponse {
 564                    channels: channels
 565                        .into_iter()
 566                        .map(|chan| proto::Channel {
 567                            id: chan.id.to_proto(),
 568                            name: chan.name,
 569                        })
 570                        .collect(),
 571                },
 572            )
 573            .await?;
 574        Ok(())
 575    }
 576
 577    async fn get_users(
 578        self: Arc<Server>,
 579        request: TypedEnvelope<proto::GetUsers>,
 580    ) -> tide::Result<()> {
 581        let receipt = request.receipt();
 582        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 583        let users = self
 584            .app_state
 585            .db
 586            .get_users_by_ids(user_ids)
 587            .await?
 588            .into_iter()
 589            .map(|user| proto::User {
 590                id: user.id.to_proto(),
 591                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 592                github_login: user.github_login,
 593            })
 594            .collect();
 595        self.peer
 596            .respond(receipt, proto::GetUsersResponse { users })
 597            .await?;
 598        Ok(())
 599    }
 600
 601    async fn update_contacts_for_users<'a>(
 602        self: &Arc<Server>,
 603        user_ids: impl IntoIterator<Item = &'a UserId>,
 604    ) -> tide::Result<()> {
 605        let mut send_futures = Vec::new();
 606
 607        {
 608            let state = self.state();
 609            for user_id in user_ids {
 610                let contacts = state.contacts_for_user(*user_id);
 611                for connection_id in state.connection_ids_for_user(*user_id) {
 612                    send_futures.push(self.peer.send(
 613                        connection_id,
 614                        proto::UpdateContacts {
 615                            contacts: contacts.clone(),
 616                        },
 617                    ));
 618                }
 619            }
 620        }
 621        futures::future::try_join_all(send_futures).await?;
 622
 623        Ok(())
 624    }
 625
 626    async fn join_channel(
 627        mut self: Arc<Self>,
 628        request: TypedEnvelope<proto::JoinChannel>,
 629    ) -> tide::Result<()> {
 630        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 631        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 632        if !self
 633            .app_state
 634            .db
 635            .can_user_access_channel(user_id, channel_id)
 636            .await?
 637        {
 638            Err(anyhow!("access denied"))?;
 639        }
 640
 641        self.state_mut().join_channel(request.sender_id, channel_id);
 642        let messages = self
 643            .app_state
 644            .db
 645            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 646            .await?
 647            .into_iter()
 648            .map(|msg| proto::ChannelMessage {
 649                id: msg.id.to_proto(),
 650                body: msg.body,
 651                timestamp: msg.sent_at.unix_timestamp() as u64,
 652                sender_id: msg.sender_id.to_proto(),
 653                nonce: Some(msg.nonce.as_u128().into()),
 654            })
 655            .collect::<Vec<_>>();
 656        self.peer
 657            .respond(
 658                request.receipt(),
 659                proto::JoinChannelResponse {
 660                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 661                    messages,
 662                },
 663            )
 664            .await?;
 665        Ok(())
 666    }
 667
 668    async fn leave_channel(
 669        mut self: Arc<Self>,
 670        request: TypedEnvelope<proto::LeaveChannel>,
 671    ) -> tide::Result<()> {
 672        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 673        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 674        if !self
 675            .app_state
 676            .db
 677            .can_user_access_channel(user_id, channel_id)
 678            .await?
 679        {
 680            Err(anyhow!("access denied"))?;
 681        }
 682
 683        self.state_mut()
 684            .leave_channel(request.sender_id, channel_id);
 685
 686        Ok(())
 687    }
 688
 689    async fn send_channel_message(
 690        self: Arc<Self>,
 691        request: TypedEnvelope<proto::SendChannelMessage>,
 692    ) -> tide::Result<()> {
 693        let receipt = request.receipt();
 694        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 695        let user_id;
 696        let connection_ids;
 697        {
 698            let state = self.state();
 699            user_id = state.user_id_for_connection(request.sender_id)?;
 700            if let Some(ids) = state.channel_connection_ids(channel_id) {
 701                connection_ids = ids;
 702            } else {
 703                return Ok(());
 704            }
 705        }
 706
 707        // Validate the message body.
 708        let body = request.payload.body.trim().to_string();
 709        if body.len() > MAX_MESSAGE_LEN {
 710            self.peer
 711                .respond_with_error(
 712                    receipt,
 713                    proto::Error {
 714                        message: "message is too long".to_string(),
 715                    },
 716                )
 717                .await?;
 718            return Ok(());
 719        }
 720        if body.is_empty() {
 721            self.peer
 722                .respond_with_error(
 723                    receipt,
 724                    proto::Error {
 725                        message: "message can't be blank".to_string(),
 726                    },
 727                )
 728                .await?;
 729            return Ok(());
 730        }
 731
 732        let timestamp = OffsetDateTime::now_utc();
 733        let nonce = if let Some(nonce) = request.payload.nonce {
 734            nonce
 735        } else {
 736            self.peer
 737                .respond_with_error(
 738                    receipt,
 739                    proto::Error {
 740                        message: "nonce can't be blank".to_string(),
 741                    },
 742                )
 743                .await?;
 744            return Ok(());
 745        };
 746
 747        let message_id = self
 748            .app_state
 749            .db
 750            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 751            .await?
 752            .to_proto();
 753        let message = proto::ChannelMessage {
 754            sender_id: user_id.to_proto(),
 755            id: message_id,
 756            body,
 757            timestamp: timestamp.unix_timestamp() as u64,
 758            nonce: Some(nonce),
 759        };
 760        broadcast(request.sender_id, connection_ids, |conn_id| {
 761            self.peer.send(
 762                conn_id,
 763                proto::ChannelMessageSent {
 764                    channel_id: channel_id.to_proto(),
 765                    message: Some(message.clone()),
 766                },
 767            )
 768        })
 769        .await?;
 770        self.peer
 771            .respond(
 772                receipt,
 773                proto::SendChannelMessageResponse {
 774                    message: Some(message),
 775                },
 776            )
 777            .await?;
 778        Ok(())
 779    }
 780
 781    async fn get_channel_messages(
 782        self: Arc<Self>,
 783        request: TypedEnvelope<proto::GetChannelMessages>,
 784    ) -> tide::Result<()> {
 785        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 786        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 787        if !self
 788            .app_state
 789            .db
 790            .can_user_access_channel(user_id, channel_id)
 791            .await?
 792        {
 793            Err(anyhow!("access denied"))?;
 794        }
 795
 796        let messages = self
 797            .app_state
 798            .db
 799            .get_channel_messages(
 800                channel_id,
 801                MESSAGE_COUNT_PER_PAGE,
 802                Some(MessageId::from_proto(request.payload.before_message_id)),
 803            )
 804            .await?
 805            .into_iter()
 806            .map(|msg| proto::ChannelMessage {
 807                id: msg.id.to_proto(),
 808                body: msg.body,
 809                timestamp: msg.sent_at.unix_timestamp() as u64,
 810                sender_id: msg.sender_id.to_proto(),
 811                nonce: Some(msg.nonce.as_u128().into()),
 812            })
 813            .collect::<Vec<_>>();
 814        self.peer
 815            .respond(
 816                request.receipt(),
 817                proto::GetChannelMessagesResponse {
 818                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 819                    messages,
 820                },
 821            )
 822            .await?;
 823        Ok(())
 824    }
 825
 826    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 827        self.store.read()
 828    }
 829
 830    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 831        self.store.write()
 832    }
 833}
 834
 835pub async fn broadcast<F, T>(
 836    sender_id: ConnectionId,
 837    receiver_ids: Vec<ConnectionId>,
 838    mut f: F,
 839) -> anyhow::Result<()>
 840where
 841    F: FnMut(ConnectionId) -> T,
 842    T: Future<Output = anyhow::Result<()>>,
 843{
 844    let futures = receiver_ids
 845        .into_iter()
 846        .filter(|id| *id != sender_id)
 847        .map(|id| f(id));
 848    futures::future::try_join_all(futures).await?;
 849    Ok(())
 850}
 851
 852pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 853    let server = Server::new(app.state().clone(), rpc.clone(), None);
 854    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 855        let server = server.clone();
 856        async move {
 857            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 858
 859            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 860            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 861            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 862            let client_protocol_version: Option<u32> = request
 863                .header("X-Zed-Protocol-Version")
 864                .and_then(|v| v.as_str().parse().ok());
 865
 866            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 867                return Ok(Response::new(StatusCode::UpgradeRequired));
 868            }
 869
 870            let header = match request.header("Sec-Websocket-Key") {
 871                Some(h) => h.as_str(),
 872                None => return Err(anyhow!("expected sec-websocket-key"))?,
 873            };
 874
 875            let user_id = process_auth_header(&request).await?;
 876
 877            let mut response = Response::new(StatusCode::SwitchingProtocols);
 878            response.insert_header(UPGRADE, "websocket");
 879            response.insert_header(CONNECTION, "Upgrade");
 880            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 881            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 882            response.insert_header("Sec-Websocket-Version", "13");
 883
 884            let http_res: &mut tide::http::Response = response.as_mut();
 885            let upgrade_receiver = http_res.recv_upgrade().await;
 886            let addr = request.remote().unwrap_or("unknown").to_string();
 887            task::spawn(async move {
 888                if let Some(stream) = upgrade_receiver.await {
 889                    server
 890                        .handle_connection(
 891                            Connection::new(
 892                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 893                            ),
 894                            addr,
 895                            user_id,
 896                            None,
 897                        )
 898                        .await;
 899                }
 900            });
 901
 902            Ok(response)
 903        }
 904    });
 905}
 906
 907fn header_contains_ignore_case<T>(
 908    request: &tide::Request<T>,
 909    header_name: HeaderName,
 910    value: &str,
 911) -> bool {
 912    request
 913        .header(header_name)
 914        .map(|h| {
 915            h.as_str()
 916                .split(',')
 917                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 918        })
 919        .unwrap_or(false)
 920}
 921
 922#[cfg(test)]
 923mod tests {
 924    use super::*;
 925    use crate::{
 926        auth,
 927        db::{tests::TestDb, UserId},
 928        github, AppState, Config,
 929    };
 930    use ::rpc::Peer;
 931    use async_std::task;
 932    use gpui::{ModelHandle, TestAppContext};
 933    use parking_lot::Mutex;
 934    use postage::{mpsc, watch};
 935    use rpc::PeerId;
 936    use serde_json::json;
 937    use sqlx::types::time::OffsetDateTime;
 938    use std::{
 939        ops::Deref,
 940        path::Path,
 941        sync::{
 942            atomic::{AtomicBool, Ordering::SeqCst},
 943            Arc,
 944        },
 945        time::Duration,
 946    };
 947    use zed::{
 948        client::{
 949            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
 950            EstablishConnectionError, UserStore,
 951        },
 952        contacts_panel::JoinWorktree,
 953        editor::{Editor, EditorSettings, Input},
 954        fs::{FakeFs, Fs as _},
 955        language::{
 956            tree_sitter_rust, Diagnostic, Language, LanguageConfig, LanguageRegistry,
 957            LanguageServerConfig, Point,
 958        },
 959        lsp,
 960        project::{ProjectPath, Worktree},
 961        test::test_app_state,
 962        workspace::Workspace,
 963    };
 964
    // End-to-end check of sharing a worktree between a host (client A) and a guest
    // (client B): collaborator lists, buffer contents, selection sets and edits made by
    // the guest are observed on the host, and the guest leaving is observed as well.
    #[gpui::test]
    async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        cx_a.foreground().forbid_parking();

        // Share a local worktree as client A. The `.zed.toml` file lists user_b as a
        // collaborator.
        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let worktree_a = Worktree::open_local(
            client_a.clone(),
            client_a.user_store.clone(),
            "/a".as_ref(),
            fs,
            lang_registry.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();
        // Wait for the initial scan before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Join that worktree as client B.
        let worktree_b = Worktree::open_remote(
            client_b.clone(),
            worktree_id,
            lang_registry.clone(),
            client_b.user_store.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must see A among its collaborators under the login "user_a"; remember B's
        // replica id so it can be compared with what A observes.
        let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            tree.replica_id()
        });
        // A must eventually see B as a collaborator with that replica id and login.
        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A. Once B has opened it, A's worktree
        // is asserted to already have the buffer open, before A opens it explicitly.
        let buffer_b = worktree_b
            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
            .await
            .unwrap();
        buffer_b.read_with(&cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
            .await
            .unwrap();

        // Create a selection set as client B (by creating an editor over B's buffer) and
        // see that selection set as client A.
        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, |cx| EditorSettings::test(cx), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
            .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // Remove the selection set as client B (by dropping the editor), see those
        // selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
            .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        worktree_a
            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
            .await;

        // Dropping the worktree removes client B from client A's collaborators.
        cx_b.update(move |_| drop(worktree_b));
        worktree_a
            .condition(&cx_a, |tree, _| tree.collaborators().is_empty())
            .await;
    }
1085
    // Checks that when the host (client A) unshares a worktree, the guest's (client B)
    // workspace drops that worktree and no longer has an active item.
    #[gpui::test]
    async fn test_unshare_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_b.update(zed::contacts_panel::init);
        let mut app_state_a = cx_a.update(test_app_state);
        let mut app_state_b = cx_b.update(test_app_state);

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        // Replace the client and user store in each test app state with the ones
        // connected to the test server.
        Arc::get_mut(&mut app_state_a).unwrap().client = client_a.clone();
        Arc::get_mut(&mut app_state_a).unwrap().user_store = client_a.user_store.clone();
        Arc::get_mut(&mut app_state_b).unwrap().client = client_b.clone();
        Arc::get_mut(&mut app_state_b).unwrap().user_store = client_b.user_store.clone();

        cx_a.foreground().forbid_parking();

        // Share a local worktree as client A
        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let worktree_a = Worktree::open_local(
            app_state_a.client.clone(),
            app_state_a.user_store.clone(),
            "/a".as_ref(),
            fs,
            app_state_a.languages.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        let remote_worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Join the shared worktree as client B by dispatching the `JoinWorktree` action
        // on a workspace window, then wait until the workspace has exactly one worktree.
        let (window_b, workspace_b) =
            cx_b.add_window(|cx| Workspace::new(&app_state_b.as_ref().into(), cx));
        cx_b.update(|cx| {
            cx.dispatch_action(
                window_b,
                vec![workspace_b.id()],
                &JoinWorktree(remote_worktree_id),
            );
        });
        workspace_b
            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 1)
            .await;

        // Nothing is open yet; grab the id B's workspace uses for the joined worktree.
        let local_worktree_id_b = workspace_b.read_with(&cx_b, |workspace, cx| {
            let active_pane = workspace.active_pane().read(cx);
            assert!(active_pane.active_item().is_none());
            workspace.worktrees(cx).first().unwrap().id()
        });
        // Open a.txt as client B, which should give the active pane an active item.
        workspace_b
            .update(&mut cx_b, |workspace, cx| {
                workspace.open_entry(
                    ProjectPath {
                        worktree_id: local_worktree_id_b,
                        path: Path::new("a.txt").into(),
                    },
                    cx,
                )
            })
            .unwrap()
            .await;
        workspace_b.read_with(&cx_b, |workspace, cx| {
            let active_pane = workspace.active_pane().read(cx);
            assert!(active_pane.active_item().is_some());
        });

        // Unshare as client A; B's workspace should lose the worktree and its active item.
        worktree_a.update(&mut cx_a, |tree, cx| {
            tree.as_local_mut().unwrap().unshare(cx);
        });
        workspace_b
            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 0)
            .await;
        workspace_b.read_with(&cx_b, |workspace, cx| {
            let active_pane = workspace.active_pane().read(cx);
            assert!(active_pane.active_item().is_none());
        });
    }
1179
    // With one host (A) and two guests (B, C) on the same worktree, checks that edits from
    // all three converge, that a save issued by a guest is written to the host's file
    // system and clears the dirty state everywhere, and that file-system changes on the
    // host show up in the guests' worktrees.
    #[gpui::test]
    async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());

        // Connect to a server as 3 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        let fs = Arc::new(FakeFs::new());

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;

        let worktree_a = Worktree::open_local(
            client_a.clone(),
            client_a.user_store.clone(),
            "/a".as_ref(),
            fs.clone(),
            lang_registry.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let worktree_b = Worktree::open_remote(
            client_b.clone(),
            worktree_id,
            lang_registry.clone(),
            client_b.user_store.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_c = Worktree::open_remote(
            client_c.clone(),
            worktree_id,
            lang_registry.clone(),
            client_c.user_store.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();

        // Open and edit a buffer as both guests B and C.
        let buffer_b = worktree_b
            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        let buffer_c = worktree_c
            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();

        // Wait until the host's buffer contains both guests' insertions before appending
        // the host's own text at the end.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B. The file on the
        // host's file system is expected to include the host's concurrent edit, and no
        // buffer should remain dirty afterwards.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on the guests.
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // After the rename and the insertion the tree holds four files:
        // .zed.toml, file1, file3 and file4.
        worktree_b
            .condition(&cx_b, |tree, _| tree.file_count() == 4)
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| tree.file_count() == 4)
            .await;
        worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1", "file3", "file4"]
            )
        });
        worktree_c.read_with(&cx_c, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1", "file3", "file4"]
            )
        });
    }
1325
    // Checks the dirty/conflict flags of a guest's buffer around a save: after the guest
    // saves and observes the file's new mtime the buffer is clean and conflict-free, and
    // a further edit makes it dirty again without reporting a conflict.
    #[gpui::test]
    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a local worktree as client A
        // NOTE(review): "user_c" is listed as a collaborator below, but this test only
        // creates clients for user_a and user_b — confirm whether that is intentional.
        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let worktree_a = Worktree::open_local(
            client_a.clone(),
            client_a.user_store.clone(),
            "/dir".as_ref(),
            fs,
            lang_registry.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Join that worktree as client B.
        let worktree_b = Worktree::open_remote(
            client_b.clone(),
            worktree_id,
            lang_registry.clone(),
            client_b.user_store.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a.txt as client B and remember the file's mtime before any save.
        let buffer_b = worktree_b
            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
            .await
            .unwrap();
        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());

        // An unsaved edit makes the buffer dirty, but not conflicted.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Save as client B and wait until B's buffer reports a different mtime.
        buffer_b
            .update(&mut cx_b, |buf, cx| buf.save(cx))
            .unwrap()
            .await
            .unwrap();
        worktree_b
            .condition(&cx_b, |_, cx| {
                buffer_b.read(cx).file().unwrap().mtime() != mtime
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Editing again after the save must dirty the buffer without flagging a conflict.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1409
    // Checks that a guest's buffer converges to the host's text when the host edits the
    // buffer while the guest's request to open it is still outstanding.
    #[gpui::test]
    async fn test_editing_while_guest_opens_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a local worktree as client A
        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let worktree_a = Worktree::open_local(
            client_a.clone(),
            client_a.user_store.clone(),
            "/dir".as_ref(),
            fs,
            lang_registry.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Join that worktree as client B.
        let worktree_b = Worktree::open_remote(
            client_b.clone(),
            worktree_id,
            lang_registry.clone(),
            client_b.user_store.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open the buffer as the host, then start opening it as the guest on the
        // background executor without awaiting the result yet.
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
            .await
            .unwrap();
        let buffer_b = cx_b
            .background()
            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));

        // Yield once (presumably to let the guest's open request make progress), then
        // edit the buffer as the host while that request is outstanding.
        task::yield_now().await;
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));

        // The guest's buffer must end up with the host's text, including the edit.
        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1477
    // Checks that the host stops seeing the guest as a collaborator when the guest drops
    // its worktree while a request to open a buffer has been started but not awaited.
    #[gpui::test]
    async fn test_leaving_worktree_while_opening_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a local worktree as client A
        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let worktree_a = Worktree::open_local(
            client_a.clone(),
            client_a.user_store.clone(),
            "/dir".as_ref(),
            fs,
            lang_registry.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Join that worktree as client B, and wait until client A sees one collaborator.
        let worktree_b = Worktree::open_remote(
            client_b.clone(),
            worktree_id,
            lang_registry.clone(),
            client_b.user_store.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        worktree_a
            .condition(&cx_a, |tree, _| tree.collaborators().len() == 1)
            .await;

        // Start opening a buffer as client B, then drop both the worktree and the pending
        // open task without ever awaiting the buffer.
        let buffer_b = cx_b
            .background()
            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
        cx_b.update(|_| drop(worktree_b));
        drop(buffer_b);
        // Client A must eventually observe that it has no collaborators left.
        worktree_a
            .condition(&cx_a, |tree, _| tree.collaborators().len() == 0)
            .await;
    }
1542
    // Checks that the host stops seeing the guest as a collaborator once the guest's
    // client disconnects from the server.
    #[gpui::test]
    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a local worktree as client A
        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let worktree_a = Worktree::open_local(
            client_a.clone(),
            client_a.user_store.clone(),
            "/a".as_ref(),
            fs,
            lang_registry.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Join that worktree as client B, and wait until client A sees one collaborator.
        // The handle is bound to `_worktree_b` so that it stays alive until the end of the
        // test; the collaborator is removed below by disconnecting, not by dropping it.
        let _worktree_b = Worktree::open_remote(
            client_b.clone(),
            worktree_id,
            lang_registry.clone(),
            client_b.user_store.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        worktree_a
            .condition(&cx_a, |tree, _| tree.collaborators().len() == 1)
            .await;

        // Drop client B's connection and ensure client A observes client B leaving the worktree.
        client_b.disconnect(&cx_b.to_async()).await.unwrap();
        worktree_a
            .condition(&cx_a, |tree, _| tree.collaborators().len() == 0)
            .await;
    }
1602
1603    #[gpui::test]
1604    async fn test_collaborating_with_diagnostics(
1605        mut cx_a: TestAppContext,
1606        mut cx_b: TestAppContext,
1607    ) {
1608        cx_a.foreground().forbid_parking();
1609        let (language_server_config, mut fake_language_server) =
1610            LanguageServerConfig::fake(cx_a.background()).await;
1611        let mut lang_registry = LanguageRegistry::new();
1612        lang_registry.add(Arc::new(Language::new(
1613            LanguageConfig {
1614                name: "Rust".to_string(),
1615                path_suffixes: vec!["rs".to_string()],
1616                language_server: Some(language_server_config),
1617                ..Default::default()
1618            },
1619            tree_sitter_rust::language(),
1620        )));
1621
1622        let lang_registry = Arc::new(lang_registry);
1623
1624        // Connect to a server as 2 clients.
1625        let mut server = TestServer::start().await;
1626        let client_a = server.create_client(&mut cx_a, "user_a").await;
1627        let client_b = server.create_client(&mut cx_b, "user_b").await;
1628
1629        // Share a local worktree as client A
1630        let fs = Arc::new(FakeFs::new());
1631        fs.insert_tree(
1632            "/a",
1633            json!({
1634                ".zed.toml": r#"collaborators = ["user_b"]"#,
1635                "a.rs": "let one = two",
1636                "other.rs": "",
1637            }),
1638        )
1639        .await;
1640        let worktree_a = Worktree::open_local(
1641            client_a.clone(),
1642            client_a.user_store.clone(),
1643            "/a".as_ref(),
1644            fs,
1645            lang_registry.clone(),
1646            &mut cx_a.to_async(),
1647        )
1648        .await
1649        .unwrap();
1650        worktree_a
1651            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1652            .await;
1653        let worktree_id = worktree_a
1654            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1655            .await
1656            .unwrap();
1657
1658        // Cause language server to start.
1659        let _ = cx_a
1660            .background()
1661            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1662                worktree.open_buffer("other.rs", cx)
1663            }))
1664            .await
1665            .unwrap();
1666
1667        // Simulate a language server reporting errors for a file.
1668        fake_language_server
1669            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1670                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1671                version: None,
1672                diagnostics: vec![
1673                    lsp::Diagnostic {
1674                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1675                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1676                        message: "message 1".to_string(),
1677                        ..Default::default()
1678                    },
1679                    lsp::Diagnostic {
1680                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1681                        range: lsp::Range::new(
1682                            lsp::Position::new(0, 10),
1683                            lsp::Position::new(0, 13),
1684                        ),
1685                        message: "message 2".to_string(),
1686                        ..Default::default()
1687                    },
1688                ],
1689            })
1690            .await;
1691
1692        // Join the worktree as client B.
1693        let worktree_b = Worktree::open_remote(
1694            client_b.clone(),
1695            worktree_id,
1696            lang_registry.clone(),
1697            client_b.user_store.clone(),
1698            &mut cx_b.to_async(),
1699        )
1700        .await
1701        .unwrap();
1702
1703        // Open the file with the errors.
1704        let buffer_b = cx_b
1705            .background()
1706            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1707            .await
1708            .unwrap();
1709
1710        buffer_b.read_with(&cx_b, |buffer, _| {
1711            assert_eq!(
1712                buffer
1713                    .diagnostics_in_range(0..buffer.len())
1714                    .collect::<Vec<_>>(),
1715                &[
1716                    (
1717                        Point::new(0, 4)..Point::new(0, 7),
1718                        &Diagnostic {
1719                            group_id: 0,
1720                            message: "message 1".to_string(),
1721                            severity: lsp::DiagnosticSeverity::ERROR,
1722                            is_primary: true
1723                        }
1724                    ),
1725                    (
1726                        Point::new(0, 10)..Point::new(0, 13),
1727                        &Diagnostic {
1728                            group_id: 1,
1729                            severity: lsp::DiagnosticSeverity::WARNING,
1730                            message: "message 2".to_string(),
1731                            is_primary: true
1732                        }
1733                    )
1734                ]
1735            );
1736        });
1737    }
1738
1739    #[gpui::test]
1740    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1741        cx_a.foreground().forbid_parking();
1742
1743        // Connect to a server as 2 clients.
1744        let mut server = TestServer::start().await;
1745        let client_a = server.create_client(&mut cx_a, "user_a").await;
1746        let client_b = server.create_client(&mut cx_b, "user_b").await;
1747
1748        // Create an org that includes these 2 users.
1749        let db = &server.app_state.db;
1750        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1751        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1752            .await
1753            .unwrap();
1754        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1755            .await
1756            .unwrap();
1757
1758        // Create a channel that includes all the users.
1759        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1760        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1761            .await
1762            .unwrap();
1763        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1764            .await
1765            .unwrap();
1766        db.create_channel_message(
1767            channel_id,
1768            client_b.current_user_id(&cx_b),
1769            "hello A, it's B.",
1770            OffsetDateTime::now_utc(),
1771            1,
1772        )
1773        .await
1774        .unwrap();
1775
1776        let channels_a = cx_a
1777            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1778        channels_a
1779            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1780            .await;
1781        channels_a.read_with(&cx_a, |list, _| {
1782            assert_eq!(
1783                list.available_channels().unwrap(),
1784                &[ChannelDetails {
1785                    id: channel_id.to_proto(),
1786                    name: "test-channel".to_string()
1787                }]
1788            )
1789        });
1790        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1791            this.get_channel(channel_id.to_proto(), cx).unwrap()
1792        });
1793        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1794        channel_a
1795            .condition(&cx_a, |channel, _| {
1796                channel_messages(channel)
1797                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1798            })
1799            .await;
1800
1801        let channels_b = cx_b
1802            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1803        channels_b
1804            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1805            .await;
1806        channels_b.read_with(&cx_b, |list, _| {
1807            assert_eq!(
1808                list.available_channels().unwrap(),
1809                &[ChannelDetails {
1810                    id: channel_id.to_proto(),
1811                    name: "test-channel".to_string()
1812                }]
1813            )
1814        });
1815
1816        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1817            this.get_channel(channel_id.to_proto(), cx).unwrap()
1818        });
1819        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1820        channel_b
1821            .condition(&cx_b, |channel, _| {
1822                channel_messages(channel)
1823                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1824            })
1825            .await;
1826
1827        channel_a
1828            .update(&mut cx_a, |channel, cx| {
1829                channel
1830                    .send_message("oh, hi B.".to_string(), cx)
1831                    .unwrap()
1832                    .detach();
1833                let task = channel.send_message("sup".to_string(), cx).unwrap();
1834                assert_eq!(
1835                    channel_messages(channel),
1836                    &[
1837                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1838                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
1839                        ("user_a".to_string(), "sup".to_string(), true)
1840                    ]
1841                );
1842                task
1843            })
1844            .await
1845            .unwrap();
1846
1847        channel_b
1848            .condition(&cx_b, |channel, _| {
1849                channel_messages(channel)
1850                    == [
1851                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1852                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
1853                        ("user_a".to_string(), "sup".to_string(), false),
1854                    ]
1855            })
1856            .await;
1857
1858        assert_eq!(
1859            server
1860                .state()
1861                .await
1862                .channel(channel_id)
1863                .unwrap()
1864                .connection_ids
1865                .len(),
1866            2
1867        );
1868        cx_b.update(|_| drop(channel_b));
1869        server
1870            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1871            .await;
1872
1873        cx_a.update(|_| drop(channel_a));
1874        server
1875            .condition(|state| state.channel(channel_id).is_none())
1876            .await;
1877    }
1878
1879    #[gpui::test]
1880    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
1881        cx_a.foreground().forbid_parking();
1882
1883        let mut server = TestServer::start().await;
1884        let client_a = server.create_client(&mut cx_a, "user_a").await;
1885
1886        let db = &server.app_state.db;
1887        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1888        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1889        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1890            .await
1891            .unwrap();
1892        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1893            .await
1894            .unwrap();
1895
1896        let channels_a = cx_a
1897            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1898        channels_a
1899            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1900            .await;
1901        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1902            this.get_channel(channel_id.to_proto(), cx).unwrap()
1903        });
1904
1905        // Messages aren't allowed to be too long.
1906        channel_a
1907            .update(&mut cx_a, |channel, cx| {
1908                let long_body = "this is long.\n".repeat(1024);
1909                channel.send_message(long_body, cx).unwrap()
1910            })
1911            .await
1912            .unwrap_err();
1913
1914        // Messages aren't allowed to be blank.
1915        channel_a.update(&mut cx_a, |channel, cx| {
1916            channel.send_message(String::new(), cx).unwrap_err()
1917        });
1918
1919        // Leading and trailing whitespace are trimmed.
1920        channel_a
1921            .update(&mut cx_a, |channel, cx| {
1922                channel
1923                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
1924                    .unwrap()
1925            })
1926            .await
1927            .unwrap();
1928        assert_eq!(
1929            db.get_channel_messages(channel_id, 10, None)
1930                .await
1931                .unwrap()
1932                .iter()
1933                .map(|m| &m.body)
1934                .collect::<Vec<_>>(),
1935            &["surrounded by whitespace"]
1936        );
1937    }
1938
1939    #[gpui::test]
1940    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1941        cx_a.foreground().forbid_parking();
1942
1943        // Connect to a server as 2 clients.
1944        let mut server = TestServer::start().await;
1945        let client_a = server.create_client(&mut cx_a, "user_a").await;
1946        let client_b = server.create_client(&mut cx_b, "user_b").await;
1947        let mut status_b = client_b.status();
1948
1949        // Create an org that includes these 2 users.
1950        let db = &server.app_state.db;
1951        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1952        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1953            .await
1954            .unwrap();
1955        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1956            .await
1957            .unwrap();
1958
1959        // Create a channel that includes all the users.
1960        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1961        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1962            .await
1963            .unwrap();
1964        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1965            .await
1966            .unwrap();
1967        db.create_channel_message(
1968            channel_id,
1969            client_b.current_user_id(&cx_b),
1970            "hello A, it's B.",
1971            OffsetDateTime::now_utc(),
1972            2,
1973        )
1974        .await
1975        .unwrap();
1976
1977        let channels_a = cx_a
1978            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1979        channels_a
1980            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1981            .await;
1982
1983        channels_a.read_with(&cx_a, |list, _| {
1984            assert_eq!(
1985                list.available_channels().unwrap(),
1986                &[ChannelDetails {
1987                    id: channel_id.to_proto(),
1988                    name: "test-channel".to_string()
1989                }]
1990            )
1991        });
1992        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1993            this.get_channel(channel_id.to_proto(), cx).unwrap()
1994        });
1995        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1996        channel_a
1997            .condition(&cx_a, |channel, _| {
1998                channel_messages(channel)
1999                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2000            })
2001            .await;
2002
2003        let channels_b = cx_b
2004            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2005        channels_b
2006            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2007            .await;
2008        channels_b.read_with(&cx_b, |list, _| {
2009            assert_eq!(
2010                list.available_channels().unwrap(),
2011                &[ChannelDetails {
2012                    id: channel_id.to_proto(),
2013                    name: "test-channel".to_string()
2014                }]
2015            )
2016        });
2017
2018        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2019            this.get_channel(channel_id.to_proto(), cx).unwrap()
2020        });
2021        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2022        channel_b
2023            .condition(&cx_b, |channel, _| {
2024                channel_messages(channel)
2025                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2026            })
2027            .await;
2028
2029        // Disconnect client B, ensuring we can still access its cached channel data.
2030        server.forbid_connections();
2031        server.disconnect_client(client_b.current_user_id(&cx_b));
2032        while !matches!(
2033            status_b.recv().await,
2034            Some(client::Status::ReconnectionError { .. })
2035        ) {}
2036
2037        channels_b.read_with(&cx_b, |channels, _| {
2038            assert_eq!(
2039                channels.available_channels().unwrap(),
2040                [ChannelDetails {
2041                    id: channel_id.to_proto(),
2042                    name: "test-channel".to_string()
2043                }]
2044            )
2045        });
2046        channel_b.read_with(&cx_b, |channel, _| {
2047            assert_eq!(
2048                channel_messages(channel),
2049                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2050            )
2051        });
2052
2053        // Send a message from client B while it is disconnected.
2054        channel_b
2055            .update(&mut cx_b, |channel, cx| {
2056                let task = channel
2057                    .send_message("can you see this?".to_string(), cx)
2058                    .unwrap();
2059                assert_eq!(
2060                    channel_messages(channel),
2061                    &[
2062                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2063                        ("user_b".to_string(), "can you see this?".to_string(), true)
2064                    ]
2065                );
2066                task
2067            })
2068            .await
2069            .unwrap_err();
2070
2071        // Send a message from client A while B is disconnected.
2072        channel_a
2073            .update(&mut cx_a, |channel, cx| {
2074                channel
2075                    .send_message("oh, hi B.".to_string(), cx)
2076                    .unwrap()
2077                    .detach();
2078                let task = channel.send_message("sup".to_string(), cx).unwrap();
2079                assert_eq!(
2080                    channel_messages(channel),
2081                    &[
2082                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2083                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2084                        ("user_a".to_string(), "sup".to_string(), true)
2085                    ]
2086                );
2087                task
2088            })
2089            .await
2090            .unwrap();
2091
2092        // Give client B a chance to reconnect.
2093        server.allow_connections();
2094        cx_b.foreground().advance_clock(Duration::from_secs(10));
2095
2096        // Verify that B sees the new messages upon reconnection, as well as the message client B
2097        // sent while offline.
2098        channel_b
2099            .condition(&cx_b, |channel, _| {
2100                channel_messages(channel)
2101                    == [
2102                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2103                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2104                        ("user_a".to_string(), "sup".to_string(), false),
2105                        ("user_b".to_string(), "can you see this?".to_string(), false),
2106                    ]
2107            })
2108            .await;
2109
2110        // Ensure client A and B can communicate normally after reconnection.
2111        channel_a
2112            .update(&mut cx_a, |channel, cx| {
2113                channel.send_message("you online?".to_string(), cx).unwrap()
2114            })
2115            .await
2116            .unwrap();
2117        channel_b
2118            .condition(&cx_b, |channel, _| {
2119                channel_messages(channel)
2120                    == [
2121                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2122                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2123                        ("user_a".to_string(), "sup".to_string(), false),
2124                        ("user_b".to_string(), "can you see this?".to_string(), false),
2125                        ("user_a".to_string(), "you online?".to_string(), false),
2126                    ]
2127            })
2128            .await;
2129
2130        channel_b
2131            .update(&mut cx_b, |channel, cx| {
2132                channel.send_message("yep".to_string(), cx).unwrap()
2133            })
2134            .await
2135            .unwrap();
2136        channel_a
2137            .condition(&cx_a, |channel, _| {
2138                channel_messages(channel)
2139                    == [
2140                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2141                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2142                        ("user_a".to_string(), "sup".to_string(), false),
2143                        ("user_b".to_string(), "can you see this?".to_string(), false),
2144                        ("user_a".to_string(), "you online?".to_string(), false),
2145                        ("user_b".to_string(), "yep".to_string(), false),
2146                    ]
2147            })
2148            .await;
2149    }
2150
2151    #[gpui::test]
2152    async fn test_contacts(
2153        mut cx_a: TestAppContext,
2154        mut cx_b: TestAppContext,
2155        mut cx_c: TestAppContext,
2156    ) {
2157        cx_a.foreground().forbid_parking();
2158        let lang_registry = Arc::new(LanguageRegistry::new());
2159
2160        // Connect to a server as 3 clients.
2161        let mut server = TestServer::start().await;
2162        let client_a = server.create_client(&mut cx_a, "user_a").await;
2163        let client_b = server.create_client(&mut cx_b, "user_b").await;
2164        let client_c = server.create_client(&mut cx_c, "user_c").await;
2165
2166        let fs = Arc::new(FakeFs::new());
2167
2168        // Share a worktree as client A.
2169        fs.insert_tree(
2170            "/a",
2171            json!({
2172                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2173            }),
2174        )
2175        .await;
2176
2177        let worktree_a = Worktree::open_local(
2178            client_a.clone(),
2179            client_a.user_store.clone(),
2180            "/a".as_ref(),
2181            fs.clone(),
2182            lang_registry.clone(),
2183            &mut cx_a.to_async(),
2184        )
2185        .await
2186        .unwrap();
2187
2188        client_a
2189            .user_store
2190            .condition(&cx_a, |user_store, _| {
2191                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2192            })
2193            .await;
2194        client_b
2195            .user_store
2196            .condition(&cx_b, |user_store, _| {
2197                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2198            })
2199            .await;
2200        client_c
2201            .user_store
2202            .condition(&cx_c, |user_store, _| {
2203                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2204            })
2205            .await;
2206
2207        let worktree_id = worktree_a
2208            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
2209            .await
2210            .unwrap();
2211
2212        let _worktree_b = Worktree::open_remote(
2213            client_b.clone(),
2214            worktree_id,
2215            lang_registry.clone(),
2216            client_b.user_store.clone(),
2217            &mut cx_b.to_async(),
2218        )
2219        .await
2220        .unwrap();
2221
2222        client_a
2223            .user_store
2224            .condition(&cx_a, |user_store, _| {
2225                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2226            })
2227            .await;
2228        client_b
2229            .user_store
2230            .condition(&cx_b, |user_store, _| {
2231                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2232            })
2233            .await;
2234        client_c
2235            .user_store
2236            .condition(&cx_c, |user_store, _| {
2237                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2238            })
2239            .await;
2240
2241        cx_a.update(move |_| drop(worktree_a));
2242        client_a
2243            .user_store
2244            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2245            .await;
2246        client_b
2247            .user_store
2248            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2249            .await;
2250        client_c
2251            .user_store
2252            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2253            .await;
2254
2255        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2256            user_store
2257                .contacts()
2258                .iter()
2259                .map(|contact| {
2260                    let worktrees = contact
2261                        .worktrees
2262                        .iter()
2263                        .map(|w| {
2264                            (
2265                                w.root_name.as_str(),
2266                                w.guests.iter().map(|p| p.github_login.as_str()).collect(),
2267                            )
2268                        })
2269                        .collect();
2270                    (contact.user.github_login.as_str(), worktrees)
2271                })
2272                .collect()
2273        }
2274    }
2275
2276    struct TestServer {
2277        peer: Arc<Peer>,
2278        app_state: Arc<AppState>,
2279        server: Arc<Server>,
2280        notifications: mpsc::Receiver<()>,
2281        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
2282        forbid_connections: Arc<AtomicBool>,
2283        _test_db: TestDb,
2284    }
2285
2286    impl TestServer {
2287        async fn start() -> Self {
2288            let test_db = TestDb::new();
2289            let app_state = Self::build_app_state(&test_db).await;
2290            let peer = Peer::new();
2291            let notifications = mpsc::channel(128);
2292            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2293            Self {
2294                peer,
2295                app_state,
2296                server,
2297                notifications: notifications.1,
2298                connection_killers: Default::default(),
2299                forbid_connections: Default::default(),
2300                _test_db: test_db,
2301            }
2302        }
2303
2304        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2305            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2306            let client_name = name.to_string();
2307            let mut client = Client::new();
2308            let server = self.server.clone();
2309            let connection_killers = self.connection_killers.clone();
2310            let forbid_connections = self.forbid_connections.clone();
2311            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2312
2313            Arc::get_mut(&mut client)
2314                .unwrap()
2315                .override_authenticate(move |cx| {
2316                    cx.spawn(|_| async move {
2317                        let access_token = "the-token".to_string();
2318                        Ok(Credentials {
2319                            user_id: user_id.0 as u64,
2320                            access_token,
2321                        })
2322                    })
2323                })
2324                .override_establish_connection(move |credentials, cx| {
2325                    assert_eq!(credentials.user_id, user_id.0 as u64);
2326                    assert_eq!(credentials.access_token, "the-token");
2327
2328                    let server = server.clone();
2329                    let connection_killers = connection_killers.clone();
2330                    let forbid_connections = forbid_connections.clone();
2331                    let client_name = client_name.clone();
2332                    let connection_id_tx = connection_id_tx.clone();
2333                    cx.spawn(move |cx| async move {
2334                        if forbid_connections.load(SeqCst) {
2335                            Err(EstablishConnectionError::other(anyhow!(
2336                                "server is forbidding connections"
2337                            )))
2338                        } else {
2339                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2340                            connection_killers.lock().insert(user_id, kill_conn);
2341                            cx.background()
2342                                .spawn(server.handle_connection(
2343                                    server_conn,
2344                                    client_name,
2345                                    user_id,
2346                                    Some(connection_id_tx),
2347                                ))
2348                                .detach();
2349                            Ok(client_conn)
2350                        }
2351                    })
2352                });
2353
2354            let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2355            client
2356                .authenticate_and_connect(&cx.to_async())
2357                .await
2358                .unwrap();
2359
2360            let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2361            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2362            let mut authed_user =
2363                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2364            while authed_user.recv().await.unwrap().is_none() {}
2365
2366            TestClient {
2367                client,
2368                peer_id,
2369                user_store,
2370            }
2371        }
2372
2373        fn disconnect_client(&self, user_id: UserId) {
2374            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2375                let _ = kill_conn.try_send(Some(()));
2376            }
2377        }
2378
2379        fn forbid_connections(&self) {
2380            self.forbid_connections.store(true, SeqCst);
2381        }
2382
2383        fn allow_connections(&self) {
2384            self.forbid_connections.store(false, SeqCst);
2385        }
2386
2387        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2388            let mut config = Config::default();
2389            config.session_secret = "a".repeat(32);
2390            config.database_url = test_db.url.clone();
2391            let github_client = github::AppClient::test();
2392            Arc::new(AppState {
2393                db: test_db.db().clone(),
2394                handlebars: Default::default(),
2395                auth_client: auth::build_client("", ""),
2396                repo_client: github::RepoClient::test(&github_client),
2397                github_client,
2398                config,
2399            })
2400        }
2401
2402        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2403            self.server.store.read()
2404        }
2405
2406        async fn condition<F>(&mut self, mut predicate: F)
2407        where
2408            F: FnMut(&Store) -> bool,
2409        {
2410            async_std::future::timeout(Duration::from_millis(500), async {
2411                while !(predicate)(&*self.server.store.read()) {
2412                    self.notifications.recv().await;
2413                }
2414            })
2415            .await
2416            .expect("condition timed out");
2417        }
2418    }
2419
2420    impl Drop for TestServer {
2421        fn drop(&mut self) {
2422            task::block_on(self.peer.reset());
2423        }
2424    }
2425
2426    struct TestClient {
2427        client: Arc<Client>,
2428        pub peer_id: PeerId,
2429        pub user_store: ModelHandle<UserStore>,
2430    }
2431
2432    impl Deref for TestClient {
2433        type Target = Arc<Client>;
2434
2435        fn deref(&self) -> &Self::Target {
2436            &self.client
2437        }
2438    }
2439
2440    impl TestClient {
2441        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2442            UserId::from_proto(
2443                self.user_store
2444                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2445            )
2446        }
2447    }
2448
2449    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2450        channel
2451            .messages()
2452            .cursor::<()>()
2453            .map(|m| {
2454                (
2455                    m.sender.github_login.clone(),
2456                    m.body.clone(),
2457                    m.is_pending(),
2458                )
2459            })
2460            .collect()
2461    }
2462
2463    struct EmptyView;
2464
2465    impl gpui::Entity for EmptyView {
2466        type Event = ();
2467    }
2468
2469    impl gpui::View for EmptyView {
2470        fn ui_name() -> &'static str {
2471            "empty view"
2472        }
2473
2474        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2475            gpui::Element::boxed(gpui::elements::Empty)
2476        }
2477    }
2478}