rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use futures::{future::BoxFuture, FutureExt};
  12use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  13use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{
  20    any::TypeId,
  21    collections::{HashMap, HashSet},
  22    future::Future,
  23    mem,
  24    sync::Arc,
  25    time::Instant,
  26};
  27use store::{Store, Worktree};
  28use surf::StatusCode;
  29use tide::log;
  30use tide::{
  31    http::headers::{HeaderName, CONNECTION, UPGRADE},
  32    Request, Response,
  33};
  34use time::OffsetDateTime;
  35
  36type MessageHandler = Box<
  37    dyn Send
  38        + Sync
  39        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  40>;
  41
/// RPC server: owns the peer, the in-memory store, and the table of message handlers.
pub struct Server {
    /// Peer used to send, respond to, and forward messages on connections.
    peer: Arc<Peer>,
    /// In-memory store behind a read/write lock; accessed through `state` / `state_mut`.
    store: RwLock<Store>,
    /// Shared application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type each one accepts.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent `()` after each message with a registered handler
    /// has been processed.
    notifications: Option<mpsc::Sender<()>>,
}
  49
/// Number of channel messages requested from the database per page; a page shorter
/// than this is reported to the client as the last one (`done`).
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on a trimmed chat message body, compared against `String::len` (bytes).
const MAX_MESSAGE_LEN: usize = 1024;
  52
  53impl Server {
  54    pub fn new(
  55        app_state: Arc<AppState>,
  56        peer: Arc<Peer>,
  57        notifications: Option<mpsc::Sender<()>>,
  58    ) -> Arc<Self> {
  59        let mut server = Self {
  60            peer,
  61            app_state,
  62            store: Default::default(),
  63            handlers: Default::default(),
  64            notifications,
  65        };
  66
  67        server
  68            .add_handler(Server::ping)
  69            .add_handler(Server::open_worktree)
  70            .add_handler(Server::close_worktree)
  71            .add_handler(Server::share_worktree)
  72            .add_handler(Server::unshare_worktree)
  73            .add_handler(Server::join_worktree)
  74            .add_handler(Server::leave_worktree)
  75            .add_handler(Server::update_worktree)
  76            .add_handler(Server::open_buffer)
  77            .add_handler(Server::close_buffer)
  78            .add_handler(Server::update_buffer)
  79            .add_handler(Server::buffer_saved)
  80            .add_handler(Server::save_buffer)
  81            .add_handler(Server::get_channels)
  82            .add_handler(Server::get_users)
  83            .add_handler(Server::join_channel)
  84            .add_handler(Server::leave_channel)
  85            .add_handler(Server::send_channel_message)
  86            .add_handler(Server::get_channel_messages);
  87
  88        Arc::new(server)
  89    }
  90
  91    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  92    where
  93        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  94        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  95        M: EnvelopedMessage,
  96    {
  97        let prev_handler = self.handlers.insert(
  98            TypeId::of::<M>(),
  99            Box::new(move |server, envelope| {
 100                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 101                (handler)(server, *envelope).boxed()
 102            }),
 103        );
 104        if prev_handler.is_some() {
 105            panic!("registered a handler for the same message twice");
 106        }
 107        self
 108    }
 109
 110    pub fn handle_connection(
 111        self: &Arc<Self>,
 112        connection: Connection,
 113        addr: String,
 114        user_id: UserId,
 115    ) -> impl Future<Output = ()> {
 116        let mut this = self.clone();
 117        async move {
 118            let (connection_id, handle_io, mut incoming_rx) =
 119                this.peer.add_connection(connection).await;
 120            this.state_mut().add_connection(connection_id, user_id);
 121            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 122                log::error!("error updating contacts for {:?}: {}", user_id, err);
 123            }
 124
 125            let handle_io = handle_io.fuse();
 126            futures::pin_mut!(handle_io);
 127            loop {
 128                let next_message = incoming_rx.recv().fuse();
 129                futures::pin_mut!(next_message);
 130                futures::select_biased! {
 131                    message = next_message => {
 132                        if let Some(message) = message {
 133                            let start_time = Instant::now();
 134                            log::info!("RPC message received: {}", message.payload_type_name());
 135                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 136                                if let Err(err) = (handler)(this.clone(), message).await {
 137                                    log::error!("error handling message: {:?}", err);
 138                                } else {
 139                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 140                                }
 141
 142                                if let Some(mut notifications) = this.notifications.clone() {
 143                                    let _ = notifications.send(()).await;
 144                                }
 145                            } else {
 146                                log::warn!("unhandled message: {}", message.payload_type_name());
 147                            }
 148                        } else {
 149                            log::info!("rpc connection closed {:?}", addr);
 150                            break;
 151                        }
 152                    }
 153                    handle_io = handle_io => {
 154                        if let Err(err) = handle_io {
 155                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 156                        }
 157                        break;
 158                    }
 159                }
 160            }
 161
 162            if let Err(err) = this.sign_out(connection_id).await {
 163                log::error!("error signing out connection {:?} - {:?}", addr, err);
 164            }
 165        }
 166    }
 167
 168    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 169        self.peer.disconnect(connection_id).await;
 170        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 171
 172        for (worktree_id, worktree) in removed_connection.hosted_worktrees {
 173            if let Some(share) = worktree.share {
 174                broadcast(
 175                    connection_id,
 176                    share.guests.keys().copied().collect(),
 177                    |conn_id| {
 178                        self.peer
 179                            .send(conn_id, proto::UnshareWorktree { worktree_id })
 180                    },
 181                )
 182                .await?;
 183            }
 184        }
 185
 186        for (worktree_id, peer_ids) in removed_connection.guest_worktree_ids {
 187            broadcast(connection_id, peer_ids, |conn_id| {
 188                self.peer.send(
 189                    conn_id,
 190                    proto::RemovePeer {
 191                        worktree_id,
 192                        peer_id: connection_id.0,
 193                    },
 194                )
 195            })
 196            .await?;
 197        }
 198
 199        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 200            .await?;
 201
 202        Ok(())
 203    }
 204
 205    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 206        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 207        Ok(())
 208    }
 209
 210    async fn open_worktree(
 211        mut self: Arc<Server>,
 212        request: TypedEnvelope<proto::OpenWorktree>,
 213    ) -> tide::Result<()> {
 214        let receipt = request.receipt();
 215        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 216
 217        let mut contact_user_ids = HashSet::new();
 218        contact_user_ids.insert(host_user_id);
 219        for github_login in request.payload.collaborator_logins {
 220            match self.app_state.db.create_user(&github_login, false).await {
 221                Ok(contact_user_id) => {
 222                    contact_user_ids.insert(contact_user_id);
 223                }
 224                Err(err) => {
 225                    let message = err.to_string();
 226                    self.peer
 227                        .respond_with_error(receipt, proto::Error { message })
 228                        .await?;
 229                    return Ok(());
 230                }
 231            }
 232        }
 233
 234        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 235        let worktree_id = self.state_mut().add_worktree(Worktree {
 236            host_connection_id: request.sender_id,
 237            host_user_id,
 238            contact_user_ids: contact_user_ids.clone(),
 239            root_name: request.payload.root_name,
 240            share: None,
 241        });
 242
 243        self.peer
 244            .respond(receipt, proto::OpenWorktreeResponse { worktree_id })
 245            .await?;
 246        self.update_contacts_for_users(&contact_user_ids).await?;
 247
 248        Ok(())
 249    }
 250
 251    async fn close_worktree(
 252        mut self: Arc<Server>,
 253        request: TypedEnvelope<proto::CloseWorktree>,
 254    ) -> tide::Result<()> {
 255        let worktree_id = request.payload.worktree_id;
 256        let worktree = self
 257            .state_mut()
 258            .remove_worktree(worktree_id, request.sender_id)?;
 259
 260        if let Some(share) = worktree.share {
 261            broadcast(
 262                request.sender_id,
 263                share.guests.keys().copied().collect(),
 264                |conn_id| {
 265                    self.peer
 266                        .send(conn_id, proto::UnshareWorktree { worktree_id })
 267                },
 268            )
 269            .await?;
 270        }
 271        self.update_contacts_for_users(&worktree.contact_user_ids)
 272            .await?;
 273        Ok(())
 274    }
 275
 276    async fn share_worktree(
 277        mut self: Arc<Server>,
 278        mut request: TypedEnvelope<proto::ShareWorktree>,
 279    ) -> tide::Result<()> {
 280        let worktree = request
 281            .payload
 282            .worktree
 283            .as_mut()
 284            .ok_or_else(|| anyhow!("missing worktree"))?;
 285        let entries = mem::take(&mut worktree.entries)
 286            .into_iter()
 287            .map(|entry| (entry.id, entry))
 288            .collect();
 289
 290        let contact_user_ids =
 291            self.state_mut()
 292                .share_worktree(worktree.id, request.sender_id, entries);
 293        if let Some(contact_user_ids) = contact_user_ids {
 294            self.peer
 295                .respond(request.receipt(), proto::ShareWorktreeResponse {})
 296                .await?;
 297            self.update_contacts_for_users(&contact_user_ids).await?;
 298        } else {
 299            self.peer
 300                .respond_with_error(
 301                    request.receipt(),
 302                    proto::Error {
 303                        message: "no such worktree".to_string(),
 304                    },
 305                )
 306                .await?;
 307        }
 308        Ok(())
 309    }
 310
 311    async fn unshare_worktree(
 312        mut self: Arc<Server>,
 313        request: TypedEnvelope<proto::UnshareWorktree>,
 314    ) -> tide::Result<()> {
 315        let worktree_id = request.payload.worktree_id;
 316        let worktree = self
 317            .state_mut()
 318            .unshare_worktree(worktree_id, request.sender_id)?;
 319
 320        broadcast(request.sender_id, worktree.connection_ids, |conn_id| {
 321            self.peer
 322                .send(conn_id, proto::UnshareWorktree { worktree_id })
 323        })
 324        .await?;
 325        self.update_contacts_for_users(&worktree.contact_ids)
 326            .await?;
 327
 328        Ok(())
 329    }
 330
 331    async fn join_worktree(
 332        mut self: Arc<Server>,
 333        request: TypedEnvelope<proto::JoinWorktree>,
 334    ) -> tide::Result<()> {
 335        let worktree_id = request.payload.worktree_id;
 336
 337        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 338        let response_data = self
 339            .state_mut()
 340            .join_worktree(request.sender_id, user_id, worktree_id)
 341            .and_then(|joined| {
 342                let share = joined.worktree.share()?;
 343                let peer_count = share.guests.len();
 344                let mut peers = Vec::with_capacity(peer_count);
 345                peers.push(proto::Peer {
 346                    peer_id: joined.worktree.host_connection_id.0,
 347                    replica_id: 0,
 348                    user_id: joined.worktree.host_user_id.to_proto(),
 349                });
 350                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 351                    if *peer_conn_id != request.sender_id {
 352                        peers.push(proto::Peer {
 353                            peer_id: peer_conn_id.0,
 354                            replica_id: *peer_replica_id as u32,
 355                            user_id: peer_user_id.to_proto(),
 356                        });
 357                    }
 358                }
 359                let response = proto::JoinWorktreeResponse {
 360                    worktree: Some(proto::Worktree {
 361                        id: worktree_id,
 362                        root_name: joined.worktree.root_name.clone(),
 363                        entries: share.entries.values().cloned().collect(),
 364                    }),
 365                    replica_id: joined.replica_id as u32,
 366                    peers,
 367                };
 368                let connection_ids = joined.worktree.connection_ids();
 369                let contact_user_ids = joined.worktree.contact_user_ids.clone();
 370                Ok((response, connection_ids, contact_user_ids))
 371            });
 372
 373        match response_data {
 374            Ok((response, connection_ids, contact_user_ids)) => {
 375                broadcast(request.sender_id, connection_ids, |conn_id| {
 376                    self.peer.send(
 377                        conn_id,
 378                        proto::AddPeer {
 379                            worktree_id,
 380                            peer: Some(proto::Peer {
 381                                peer_id: request.sender_id.0,
 382                                replica_id: response.replica_id,
 383                                user_id: user_id.to_proto(),
 384                            }),
 385                        },
 386                    )
 387                })
 388                .await?;
 389                self.peer.respond(request.receipt(), response).await?;
 390                self.update_contacts_for_users(&contact_user_ids).await?;
 391            }
 392            Err(error) => {
 393                self.peer
 394                    .respond_with_error(
 395                        request.receipt(),
 396                        proto::Error {
 397                            message: error.to_string(),
 398                        },
 399                    )
 400                    .await?;
 401            }
 402        }
 403
 404        Ok(())
 405    }
 406
 407    async fn leave_worktree(
 408        mut self: Arc<Server>,
 409        request: TypedEnvelope<proto::LeaveWorktree>,
 410    ) -> tide::Result<()> {
 411        let sender_id = request.sender_id;
 412        let worktree_id = request.payload.worktree_id;
 413        let worktree = self.state_mut().leave_worktree(sender_id, worktree_id);
 414        if let Some(worktree) = worktree {
 415            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 416                self.peer.send(
 417                    conn_id,
 418                    proto::RemovePeer {
 419                        worktree_id,
 420                        peer_id: sender_id.0,
 421                    },
 422                )
 423            })
 424            .await?;
 425            self.update_contacts_for_users(&worktree.contact_ids)
 426                .await?;
 427        }
 428        Ok(())
 429    }
 430
 431    async fn update_worktree(
 432        mut self: Arc<Server>,
 433        request: TypedEnvelope<proto::UpdateWorktree>,
 434    ) -> tide::Result<()> {
 435        let connection_ids = self.state_mut().update_worktree(
 436            request.sender_id,
 437            request.payload.worktree_id,
 438            &request.payload.removed_entries,
 439            &request.payload.updated_entries,
 440        )?;
 441
 442        broadcast(request.sender_id, connection_ids, |connection_id| {
 443            self.peer
 444                .forward_send(request.sender_id, connection_id, request.payload.clone())
 445        })
 446        .await?;
 447
 448        Ok(())
 449    }
 450
 451    async fn open_buffer(
 452        self: Arc<Server>,
 453        request: TypedEnvelope<proto::OpenBuffer>,
 454    ) -> tide::Result<()> {
 455        let receipt = request.receipt();
 456        let host_connection_id = self
 457            .state()
 458            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 459        let response = self
 460            .peer
 461            .forward_request(request.sender_id, host_connection_id, request.payload)
 462            .await?;
 463        self.peer.respond(receipt, response).await?;
 464        Ok(())
 465    }
 466
 467    async fn close_buffer(
 468        self: Arc<Server>,
 469        request: TypedEnvelope<proto::CloseBuffer>,
 470    ) -> tide::Result<()> {
 471        let host_connection_id = self
 472            .state()
 473            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 474        self.peer
 475            .forward_send(request.sender_id, host_connection_id, request.payload)
 476            .await?;
 477        Ok(())
 478    }
 479
 480    async fn save_buffer(
 481        self: Arc<Server>,
 482        request: TypedEnvelope<proto::SaveBuffer>,
 483    ) -> tide::Result<()> {
 484        let host;
 485        let guests;
 486        {
 487            let state = self.state();
 488            host = state
 489                .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 490            guests = state
 491                .worktree_guest_connection_ids(request.sender_id, request.payload.worktree_id)?;
 492        }
 493
 494        let sender = request.sender_id;
 495        let receipt = request.receipt();
 496        let response = self
 497            .peer
 498            .forward_request(sender, host, request.payload.clone())
 499            .await?;
 500
 501        broadcast(host, guests, |conn_id| {
 502            let response = response.clone();
 503            let peer = &self.peer;
 504            async move {
 505                if conn_id == sender {
 506                    peer.respond(receipt, response).await
 507                } else {
 508                    peer.forward_send(host, conn_id, response).await
 509                }
 510            }
 511        })
 512        .await?;
 513
 514        Ok(())
 515    }
 516
 517    async fn update_buffer(
 518        self: Arc<Server>,
 519        request: TypedEnvelope<proto::UpdateBuffer>,
 520    ) -> tide::Result<()> {
 521        let receiver_ids = self
 522            .state()
 523            .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
 524        broadcast(request.sender_id, receiver_ids, |connection_id| {
 525            self.peer
 526                .forward_send(request.sender_id, connection_id, request.payload.clone())
 527        })
 528        .await?;
 529        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 530        Ok(())
 531    }
 532
 533    async fn buffer_saved(
 534        self: Arc<Server>,
 535        request: TypedEnvelope<proto::BufferSaved>,
 536    ) -> tide::Result<()> {
 537        let receiver_ids = self
 538            .state()
 539            .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
 540        broadcast(request.sender_id, receiver_ids, |connection_id| {
 541            self.peer
 542                .forward_send(request.sender_id, connection_id, request.payload.clone())
 543        })
 544        .await?;
 545        Ok(())
 546    }
 547
 548    async fn get_channels(
 549        self: Arc<Server>,
 550        request: TypedEnvelope<proto::GetChannels>,
 551    ) -> tide::Result<()> {
 552        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 553        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 554        self.peer
 555            .respond(
 556                request.receipt(),
 557                proto::GetChannelsResponse {
 558                    channels: channels
 559                        .into_iter()
 560                        .map(|chan| proto::Channel {
 561                            id: chan.id.to_proto(),
 562                            name: chan.name,
 563                        })
 564                        .collect(),
 565                },
 566            )
 567            .await?;
 568        Ok(())
 569    }
 570
 571    async fn get_users(
 572        self: Arc<Server>,
 573        request: TypedEnvelope<proto::GetUsers>,
 574    ) -> tide::Result<()> {
 575        let receipt = request.receipt();
 576        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 577        let users = self
 578            .app_state
 579            .db
 580            .get_users_by_ids(user_ids)
 581            .await?
 582            .into_iter()
 583            .map(|user| proto::User {
 584                id: user.id.to_proto(),
 585                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 586                github_login: user.github_login,
 587            })
 588            .collect();
 589        self.peer
 590            .respond(receipt, proto::GetUsersResponse { users })
 591            .await?;
 592        Ok(())
 593    }
 594
 595    async fn update_contacts_for_users<'a>(
 596        self: &Arc<Server>,
 597        user_ids: impl IntoIterator<Item = &'a UserId>,
 598    ) -> tide::Result<()> {
 599        let mut send_futures = Vec::new();
 600
 601        {
 602            let state = self.state();
 603            for user_id in user_ids {
 604                let contacts = state.contacts_for_user(*user_id);
 605                for connection_id in state.connection_ids_for_user(*user_id) {
 606                    send_futures.push(self.peer.send(
 607                        connection_id,
 608                        proto::UpdateContacts {
 609                            contacts: contacts.clone(),
 610                        },
 611                    ));
 612                }
 613            }
 614        }
 615        futures::future::try_join_all(send_futures).await?;
 616
 617        Ok(())
 618    }
 619
 620    async fn join_channel(
 621        mut self: Arc<Self>,
 622        request: TypedEnvelope<proto::JoinChannel>,
 623    ) -> tide::Result<()> {
 624        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 625        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 626        if !self
 627            .app_state
 628            .db
 629            .can_user_access_channel(user_id, channel_id)
 630            .await?
 631        {
 632            Err(anyhow!("access denied"))?;
 633        }
 634
 635        self.state_mut().join_channel(request.sender_id, channel_id);
 636        let messages = self
 637            .app_state
 638            .db
 639            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 640            .await?
 641            .into_iter()
 642            .map(|msg| proto::ChannelMessage {
 643                id: msg.id.to_proto(),
 644                body: msg.body,
 645                timestamp: msg.sent_at.unix_timestamp() as u64,
 646                sender_id: msg.sender_id.to_proto(),
 647                nonce: Some(msg.nonce.as_u128().into()),
 648            })
 649            .collect::<Vec<_>>();
 650        self.peer
 651            .respond(
 652                request.receipt(),
 653                proto::JoinChannelResponse {
 654                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 655                    messages,
 656                },
 657            )
 658            .await?;
 659        Ok(())
 660    }
 661
 662    async fn leave_channel(
 663        mut self: Arc<Self>,
 664        request: TypedEnvelope<proto::LeaveChannel>,
 665    ) -> tide::Result<()> {
 666        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 667        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 668        if !self
 669            .app_state
 670            .db
 671            .can_user_access_channel(user_id, channel_id)
 672            .await?
 673        {
 674            Err(anyhow!("access denied"))?;
 675        }
 676
 677        self.state_mut()
 678            .leave_channel(request.sender_id, channel_id);
 679
 680        Ok(())
 681    }
 682
 683    async fn send_channel_message(
 684        self: Arc<Self>,
 685        request: TypedEnvelope<proto::SendChannelMessage>,
 686    ) -> tide::Result<()> {
 687        let receipt = request.receipt();
 688        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 689        let user_id;
 690        let connection_ids;
 691        {
 692            let state = self.state();
 693            user_id = state.user_id_for_connection(request.sender_id)?;
 694            if let Some(ids) = state.channel_connection_ids(channel_id) {
 695                connection_ids = ids;
 696            } else {
 697                return Ok(());
 698            }
 699        }
 700
 701        // Validate the message body.
 702        let body = request.payload.body.trim().to_string();
 703        if body.len() > MAX_MESSAGE_LEN {
 704            self.peer
 705                .respond_with_error(
 706                    receipt,
 707                    proto::Error {
 708                        message: "message is too long".to_string(),
 709                    },
 710                )
 711                .await?;
 712            return Ok(());
 713        }
 714        if body.is_empty() {
 715            self.peer
 716                .respond_with_error(
 717                    receipt,
 718                    proto::Error {
 719                        message: "message can't be blank".to_string(),
 720                    },
 721                )
 722                .await?;
 723            return Ok(());
 724        }
 725
 726        let timestamp = OffsetDateTime::now_utc();
 727        let nonce = if let Some(nonce) = request.payload.nonce {
 728            nonce
 729        } else {
 730            self.peer
 731                .respond_with_error(
 732                    receipt,
 733                    proto::Error {
 734                        message: "nonce can't be blank".to_string(),
 735                    },
 736                )
 737                .await?;
 738            return Ok(());
 739        };
 740
 741        let message_id = self
 742            .app_state
 743            .db
 744            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 745            .await?
 746            .to_proto();
 747        let message = proto::ChannelMessage {
 748            sender_id: user_id.to_proto(),
 749            id: message_id,
 750            body,
 751            timestamp: timestamp.unix_timestamp() as u64,
 752            nonce: Some(nonce),
 753        };
 754        broadcast(request.sender_id, connection_ids, |conn_id| {
 755            self.peer.send(
 756                conn_id,
 757                proto::ChannelMessageSent {
 758                    channel_id: channel_id.to_proto(),
 759                    message: Some(message.clone()),
 760                },
 761            )
 762        })
 763        .await?;
 764        self.peer
 765            .respond(
 766                receipt,
 767                proto::SendChannelMessageResponse {
 768                    message: Some(message),
 769                },
 770            )
 771            .await?;
 772        Ok(())
 773    }
 774
 775    async fn get_channel_messages(
 776        self: Arc<Self>,
 777        request: TypedEnvelope<proto::GetChannelMessages>,
 778    ) -> tide::Result<()> {
 779        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 780        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 781        if !self
 782            .app_state
 783            .db
 784            .can_user_access_channel(user_id, channel_id)
 785            .await?
 786        {
 787            Err(anyhow!("access denied"))?;
 788        }
 789
 790        let messages = self
 791            .app_state
 792            .db
 793            .get_channel_messages(
 794                channel_id,
 795                MESSAGE_COUNT_PER_PAGE,
 796                Some(MessageId::from_proto(request.payload.before_message_id)),
 797            )
 798            .await?
 799            .into_iter()
 800            .map(|msg| proto::ChannelMessage {
 801                id: msg.id.to_proto(),
 802                body: msg.body,
 803                timestamp: msg.sent_at.unix_timestamp() as u64,
 804                sender_id: msg.sender_id.to_proto(),
 805                nonce: Some(msg.nonce.as_u128().into()),
 806            })
 807            .collect::<Vec<_>>();
 808        self.peer
 809            .respond(
 810                request.receipt(),
 811                proto::GetChannelMessagesResponse {
 812                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 813                    messages,
 814                },
 815            )
 816            .await?;
 817        Ok(())
 818    }
 819
 820    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 821        self.store.read()
 822    }
 823
 824    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 825        self.store.write()
 826    }
 827}
 828
 829pub async fn broadcast<F, T>(
 830    sender_id: ConnectionId,
 831    receiver_ids: Vec<ConnectionId>,
 832    mut f: F,
 833) -> anyhow::Result<()>
 834where
 835    F: FnMut(ConnectionId) -> T,
 836    T: Future<Output = anyhow::Result<()>>,
 837{
 838    let futures = receiver_ids
 839        .into_iter()
 840        .filter(|id| *id != sender_id)
 841        .map(|id| f(id));
 842    futures::future::try_join_all(futures).await?;
 843    Ok(())
 844}
 845
 846pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 847    let server = Server::new(app.state().clone(), rpc.clone(), None);
 848    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 849        let server = server.clone();
 850        async move {
 851            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 852
 853            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 854            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 855            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 856            let client_protocol_version: Option<u32> = request
 857                .header("X-Zed-Protocol-Version")
 858                .and_then(|v| v.as_str().parse().ok());
 859
 860            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 861                return Ok(Response::new(StatusCode::UpgradeRequired));
 862            }
 863
 864            let header = match request.header("Sec-Websocket-Key") {
 865                Some(h) => h.as_str(),
 866                None => return Err(anyhow!("expected sec-websocket-key"))?,
 867            };
 868
 869            let user_id = process_auth_header(&request).await?;
 870
 871            let mut response = Response::new(StatusCode::SwitchingProtocols);
 872            response.insert_header(UPGRADE, "websocket");
 873            response.insert_header(CONNECTION, "Upgrade");
 874            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 875            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 876            response.insert_header("Sec-Websocket-Version", "13");
 877
 878            let http_res: &mut tide::http::Response = response.as_mut();
 879            let upgrade_receiver = http_res.recv_upgrade().await;
 880            let addr = request.remote().unwrap_or("unknown").to_string();
 881            task::spawn(async move {
 882                if let Some(stream) = upgrade_receiver.await {
 883                    server
 884                        .handle_connection(
 885                            Connection::new(
 886                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 887                            ),
 888                            addr,
 889                            user_id,
 890                        )
 891                        .await;
 892                }
 893            });
 894
 895            Ok(response)
 896        }
 897    });
 898}
 899
 900fn header_contains_ignore_case<T>(
 901    request: &tide::Request<T>,
 902    header_name: HeaderName,
 903    value: &str,
 904) -> bool {
 905    request
 906        .header(header_name)
 907        .map(|h| {
 908            h.as_str()
 909                .split(',')
 910                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 911        })
 912        .unwrap_or(false)
 913}
 914
 915#[cfg(test)]
 916mod tests {
 917    use super::*;
 918    use crate::{
 919        auth,
 920        db::{tests::TestDb, UserId},
 921        github, AppState, Config,
 922    };
 923    use ::rpc::Peer;
 924    use async_std::task;
 925    use gpui::{ModelHandle, TestAppContext};
 926    use parking_lot::Mutex;
 927    use postage::{mpsc, watch};
 928    use serde_json::json;
 929    use sqlx::types::time::OffsetDateTime;
 930    use std::{
 931        path::Path,
 932        sync::{
 933            atomic::{AtomicBool, Ordering::SeqCst},
 934            Arc,
 935        },
 936        time::Duration,
 937    };
 938    use zed::{
 939        client::{
 940            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
 941            EstablishConnectionError, UserStore,
 942        },
 943        editor::{Editor, EditorSettings, Input},
 944        fs::{FakeFs, Fs as _},
 945        language::{
 946            tree_sitter_rust, Diagnostic, Language, LanguageConfig, LanguageRegistry,
 947            LanguageServerConfig, Point,
 948        },
 949        lsp,
 950        people_panel::JoinWorktree,
 951        project::{ProjectPath, Worktree},
 952        test::test_app_state,
 953        workspace::Workspace,
 954    };
 955
 956    #[gpui::test]
        // Exercises the basic sharing flow between two clients: A shares `/a`, B joins,
        // and B's buffer contents, selection sets and edits, plus buffer closure and
        // B leaving the worktree, are all observed from client A's side.
 957    async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
 958        let (window_b, _) = cx_b.add_window(|_| EmptyView);
 959        let lang_registry = Arc::new(LanguageRegistry::new());
 960
 961        // Connect to a server as 2 clients.
 962        let mut server = TestServer::start().await;
 963        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
 964        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
 965
 966        cx_a.foreground().forbid_parking();
 967
 968        // Share a local worktree as client A
 969        let fs = Arc::new(FakeFs::new());
 970        fs.insert_tree(
 971            "/a",
 972            json!({
 973                ".zed.toml": r#"collaborators = ["user_b"]"#,
 974                "a.txt": "a-contents",
 975                "b.txt": "b-contents",
 976            }),
 977        )
 978        .await;
 979        let worktree_a = Worktree::open_local(
 980            client_a.clone(),
 981            "/a".as_ref(),
 982            fs,
 983            lang_registry.clone(),
 984            &mut cx_a.to_async(),
 985        )
 986        .await
 987        .unwrap();
 988        worktree_a
 989            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
 990            .await;
 991        let worktree_id = worktree_a
 992            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
 993            .await
 994            .unwrap();
 995
 996        // Join that worktree as client B, and see that a guest has joined as client A.
 997        let worktree_b = Worktree::open_remote(
 998            client_b.clone(),
 999            worktree_id,
1000            lang_registry.clone(),
1001            &mut cx_b.to_async(),
1002        )
1003        .await
1004        .unwrap();
1005        let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| tree.replica_id());
1006        worktree_a
1007            .condition(&cx_a, |tree, _| {
1008                tree.peers()
1009                    .values()
1010                    .any(|replica_id| *replica_id == replica_id_b)
1011            })
1012            .await;
1013
1014        // Open the same file as client B and client A.
1015        let buffer_b = worktree_b
1016            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1017            .await
1018            .unwrap();
1019        buffer_b.read_with(&cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
            // The host is expected to already have the buffer open once the guest has opened it.
1020        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1021        let buffer_a = worktree_a
1022            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1023            .await
1024            .unwrap();
1025
1026        // Create a selection set as client B and see that selection set as client A.
1027        let editor_b = cx_b.add_view(window_b, |cx| {
1028            Editor::for_buffer(buffer_b, |cx| EditorSettings::test(cx), cx)
1029        });
1030        buffer_a
1031            .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1032            .await;
1033
1034        // Edit the buffer as client B and see that edit as client A.
1035        editor_b.update(&mut cx_b, |editor, cx| {
1036            editor.handle_input(&Input("ok, ".into()), cx)
1037        });
1038        buffer_a
1039            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1040            .await;
1041
1042        // Remove the selection set as client B, see those selections disappear as client A.
1043        cx_b.update(move |_| drop(editor_b));
1044        buffer_a
1045            .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1046            .await;
1047
1048        // Close the buffer as client A, see that the buffer is closed.
1049        cx_a.update(move |_| drop(buffer_a));
1050        worktree_a
1051            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1052            .await;
1053
1054        // Dropping the worktree removes client B from client A's peers.
1055        cx_b.update(move |_| drop(worktree_b));
1056        worktree_a
1057            .condition(&cx_a, |tree, _| tree.peers().is_empty())
1058            .await;
1059    }
1060
1061    #[gpui::test]
        // Client B joins A's shared worktree through the `JoinWorktree` action and opens
        // `a.txt`. After A unshares, B's workspace must have no worktrees and no active item.
1062    async fn test_unshare_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1063        cx_b.update(zed::people_panel::init);
1064        let mut app_state_a = cx_a.update(test_app_state);
1065        let mut app_state_b = cx_b.update(test_app_state);
1066
1067        // Connect to a server as 2 clients.
1068        let mut server = TestServer::start().await;
1069        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
1070        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
            // Swap the test app states' client and user store for the ones connected to the test server.
1071        Arc::get_mut(&mut app_state_a).unwrap().client = client_a;
1072        Arc::get_mut(&mut app_state_a).unwrap().user_store = user_store_a;
1073        Arc::get_mut(&mut app_state_b).unwrap().client = client_b;
1074        Arc::get_mut(&mut app_state_b).unwrap().user_store = user_store_b;
1075
1076        cx_a.foreground().forbid_parking();
1077
1078        // Share a local worktree as client A
1079        let fs = Arc::new(FakeFs::new());
1080        fs.insert_tree(
1081            "/a",
1082            json!({
1083                ".zed.toml": r#"collaborators = ["user_b"]"#,
1084                "a.txt": "a-contents",
1085                "b.txt": "b-contents",
1086            }),
1087        )
1088        .await;
1089        let worktree_a = Worktree::open_local(
1090            app_state_a.client.clone(),
1091            "/a".as_ref(),
1092            fs,
1093            app_state_a.languages.clone(),
1094            &mut cx_a.to_async(),
1095        )
1096        .await
1097        .unwrap();
1098        worktree_a
1099            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1100            .await;
1101
1102        let remote_worktree_id = worktree_a
1103            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1104            .await
1105            .unwrap();
1106
            // Join the shared worktree from client B's workspace by dispatching `JoinWorktree`.
1107        let (window_b, workspace_b) =
1108            cx_b.add_window(|cx| Workspace::new(&app_state_b.as_ref().into(), cx));
1109        cx_b.update(|cx| {
1110            cx.dispatch_action(
1111                window_b,
1112                vec![workspace_b.id()],
1113                &JoinWorktree(remote_worktree_id),
1114            );
1115        });
1116        workspace_b
1117            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 1)
1118            .await;
1119
            // Nothing is open yet; open `a.txt` in client B's workspace and expect an active item.
1120        let local_worktree_id_b = workspace_b.read_with(&cx_b, |workspace, cx| {
1121            let active_pane = workspace.active_pane().read(cx);
1122            assert!(active_pane.active_item().is_none());
1123            workspace.worktrees(cx).first().unwrap().id()
1124        });
1125        workspace_b
1126            .update(&mut cx_b, |workspace, cx| {
1127                workspace.open_entry(
1128                    ProjectPath {
1129                        worktree_id: local_worktree_id_b,
1130                        path: Path::new("a.txt").into(),
1131                    },
1132                    cx,
1133                )
1134            })
1135            .unwrap()
1136            .await;
1137        workspace_b.read_with(&cx_b, |workspace, cx| {
1138            let active_pane = workspace.active_pane().read(cx);
1139            assert!(active_pane.active_item().is_some());
1140        });
1141
            // Unshare as client A; client B's workspace should lose the worktree and its open item.
1142        worktree_a.update(&mut cx_a, |tree, cx| {
1143            tree.as_local_mut().unwrap().unshare(cx);
1144        });
1145        workspace_b
1146            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 0)
1147            .await;
1148        workspace_b.read_with(&cx_b, |workspace, cx| {
1149            let active_pane = workspace.active_pane().read(cx);
1150            assert!(active_pane.active_item().is_none());
1151        });
1152    }
1153
1154    #[gpui::test]
        // Three-client scenario: edits from guests B and C and from host A converge on all
        // replicas, a save issued by guest B is written to the host's `FakeFs`, and file-system
        // changes on the host show up in both guests' worktrees.
1155    async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
1156        mut cx_a: TestAppContext,
1157        mut cx_b: TestAppContext,
1158        mut cx_c: TestAppContext,
1159    ) {
1160        cx_a.foreground().forbid_parking();
1161        let lang_registry = Arc::new(LanguageRegistry::new());
1162
1163        // Connect to a server as 3 clients.
1164        let mut server = TestServer::start().await;
1165        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1166        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1167        let (client_c, _) = server.create_client(&mut cx_c, "user_c").await;
1168
1169        let fs = Arc::new(FakeFs::new());
1170
1171        // Share a worktree as client A.
1172        fs.insert_tree(
1173            "/a",
1174            json!({
1175                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1176                "file1": "",
1177                "file2": ""
1178            }),
1179        )
1180        .await;
1181
1182        let worktree_a = Worktree::open_local(
1183            client_a.clone(),
1184            "/a".as_ref(),
1185            fs.clone(),
1186            lang_registry.clone(),
1187            &mut cx_a.to_async(),
1188        )
1189        .await
1190        .unwrap();
1191        worktree_a
1192            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1193            .await;
1194        let worktree_id = worktree_a
1195            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1196            .await
1197            .unwrap();
1198
1199        // Join that worktree as clients B and C.
1200        let worktree_b = Worktree::open_remote(
1201            client_b.clone(),
1202            worktree_id,
1203            lang_registry.clone(),
1204            &mut cx_b.to_async(),
1205        )
1206        .await
1207        .unwrap();
1208        let worktree_c = Worktree::open_remote(
1209            client_c.clone(),
1210            worktree_id,
1211            lang_registry.clone(),
1212            &mut cx_c.to_async(),
1213        )
1214        .await
1215        .unwrap();
1216
1217        // Open and edit a buffer as both guests B and C.
1218        let buffer_b = worktree_b
1219            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1220            .await
1221            .unwrap();
1222        let buffer_c = worktree_c
1223            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1224            .await
1225            .unwrap();
1226        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1227        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1228
1229        // Open and edit that buffer as the host.
1230        let buffer_a = worktree_a
1231            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1232            .await
1233            .unwrap();
1234
            // Wait until both guests' insertions have reached the host before appending the host's edit.
1235        buffer_a
1236            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1237            .await;
1238        buffer_a.update(&mut cx_a, |buf, cx| {
1239            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1240        });
1241
1242        // Wait for edits to propagate
1243        buffer_a
1244            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1245            .await;
1246        buffer_b
1247            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1248            .await;
1249        buffer_c
1250            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1251            .await;
1252
1253        // Edit the buffer as the host and concurrently save as guest B.
            // The file on disk is asserted to already contain the host's concurrent "hi-a, " edit.
1254        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1255        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1256        save_b.await.unwrap();
1257        assert_eq!(
1258            fs.load("/a/file1".as_ref()).await.unwrap(),
1259            "hi-a, i-am-c, i-am-b, i-am-a"
1260        );
1261        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1262        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1263        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1264
1265        // Make changes on host's file system, see those changes on the guests.
1266        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1267            .await
1268            .unwrap();
1269        fs.insert_file(Path::new("/a/file4"), "4".into())
1270            .await
1271            .unwrap();
1272
1273        worktree_b
1274            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1275            .await;
1276        worktree_c
1277            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1278            .await;
1279        worktree_b.read_with(&cx_b, |tree, _| {
1280            assert_eq!(
1281                tree.paths()
1282                    .map(|p| p.to_string_lossy())
1283                    .collect::<Vec<_>>(),
1284                &[".zed.toml", "file1", "file3", "file4"]
1285            )
1286        });
1287        worktree_c.read_with(&cx_c, |tree, _| {
1288            assert_eq!(
1289                tree.paths()
1290                    .map(|p| p.to_string_lossy())
1291                    .collect::<Vec<_>>(),
1292                &[".zed.toml", "file1", "file3", "file4"]
1293            )
1294        });
1295    }
1296
1297    #[gpui::test]
        // Guest B edits and saves `a.txt`. Once the file's mtime has changed, the buffer must be
        // clean and conflict-free, and a further edit must mark it dirty without a conflict.
1298    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1299        cx_a.foreground().forbid_parking();
1300        let lang_registry = Arc::new(LanguageRegistry::new());
1301
1302        // Connect to a server as 2 clients.
1303        let mut server = TestServer::start().await;
1304        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1305        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1306
1307        // Share a local worktree as client A
            // NOTE(review): "user_c" is listed as a collaborator, but this test never creates that client.
1308        let fs = Arc::new(FakeFs::new());
1309        fs.insert_tree(
1310            "/dir",
1311            json!({
1312                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1313                "a.txt": "a-contents",
1314            }),
1315        )
1316        .await;
1317
1318        let worktree_a = Worktree::open_local(
1319            client_a.clone(),
1320            "/dir".as_ref(),
1321            fs,
1322            lang_registry.clone(),
1323            &mut cx_a.to_async(),
1324        )
1325        .await
1326        .unwrap();
1327        worktree_a
1328            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1329            .await;
1330        let worktree_id = worktree_a
1331            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1332            .await
1333            .unwrap();
1334
1335        // Join that worktree as client B.
1336        let worktree_b = Worktree::open_remote(
1337            client_b.clone(),
1338            worktree_id,
1339            lang_registry.clone(),
1340            &mut cx_b.to_async(),
1341        )
1342        .await
1343        .unwrap();
1344
            // Remember the file's mtime before saving so the post-save update can be detected below.
1345        let buffer_b = worktree_b
1346            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1347            .await
1348            .unwrap();
1349        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1350
1351        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1352        buffer_b.read_with(&cx_b, |buf, _| {
1353            assert!(buf.is_dirty());
1354            assert!(!buf.has_conflict());
1355        });
1356
            // Save as client B, then wait until the buffer's file reports a different mtime.
1357        buffer_b
1358            .update(&mut cx_b, |buf, cx| buf.save(cx))
1359            .unwrap()
1360            .await
1361            .unwrap();
1362        worktree_b
1363            .condition(&cx_b, |_, cx| {
1364                buffer_b.read(cx).file().unwrap().mtime() != mtime
1365            })
1366            .await;
1367        buffer_b.read_with(&cx_b, |buf, _| {
1368            assert!(!buf.is_dirty());
1369            assert!(!buf.has_conflict());
1370        });
1371
            // Editing again after the save must dirty the buffer without reporting a conflict.
1372        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1373        buffer_b.read_with(&cx_b, |buf, _| {
1374            assert!(buf.is_dirty());
1375            assert!(!buf.has_conflict());
1376        });
1377    }
1378
1379    #[gpui::test]
        // Host A edits `a.txt` while guest B's request to open the same buffer is still pending.
        // B's buffer must end up with the same text as A's.
1380    async fn test_editing_while_guest_opens_buffer(
1381        mut cx_a: TestAppContext,
1382        mut cx_b: TestAppContext,
1383    ) {
1384        cx_a.foreground().forbid_parking();
1385        let lang_registry = Arc::new(LanguageRegistry::new());
1386
1387        // Connect to a server as 2 clients.
1388        let mut server = TestServer::start().await;
1389        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1390        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1391
1392        // Share a local worktree as client A
1393        let fs = Arc::new(FakeFs::new());
1394        fs.insert_tree(
1395            "/dir",
1396            json!({
1397                ".zed.toml": r#"collaborators = ["user_b"]"#,
1398                "a.txt": "a-contents",
1399            }),
1400        )
1401        .await;
1402        let worktree_a = Worktree::open_local(
1403            client_a.clone(),
1404            "/dir".as_ref(),
1405            fs,
1406            lang_registry.clone(),
1407            &mut cx_a.to_async(),
1408        )
1409        .await
1410        .unwrap();
1411        worktree_a
1412            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1413            .await;
1414        let worktree_id = worktree_a
1415            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1416            .await
1417            .unwrap();
1418
1419        // Join that worktree as client B.
1420        let worktree_b = Worktree::open_remote(
1421            client_b.clone(),
1422            worktree_id,
1423            lang_registry.clone(),
1424            &mut cx_b.to_async(),
1425        )
1426        .await
1427        .unwrap();
1428
            // Open the buffer as the host, and start (but do not yet await) opening it as the guest.
1429        let buffer_a = worktree_a
1430            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1431            .await
1432            .unwrap();
1433        let buffer_b = cx_b
1434            .background()
1435            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1436
            // Yield once, then edit as the host before awaiting the guest's open.
1437        task::yield_now().await;
1438        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1439
1440        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1441        let buffer_b = buffer_b.await.unwrap();
1442        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1443    }
1444
1445    #[gpui::test]
        // Guest B drops its worktree and the pending open-buffer task while the open is still
        // in flight. Host A must then see its peer count return to zero.
1446    async fn test_leaving_worktree_while_opening_buffer(
1447        mut cx_a: TestAppContext,
1448        mut cx_b: TestAppContext,
1449    ) {
1450        cx_a.foreground().forbid_parking();
1451        let lang_registry = Arc::new(LanguageRegistry::new());
1452
1453        // Connect to a server as 2 clients.
1454        let mut server = TestServer::start().await;
1455        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1456        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1457
1458        // Share a local worktree as client A
1459        let fs = Arc::new(FakeFs::new());
1460        fs.insert_tree(
1461            "/dir",
1462            json!({
1463                ".zed.toml": r#"collaborators = ["user_b"]"#,
1464                "a.txt": "a-contents",
1465            }),
1466        )
1467        .await;
1468        let worktree_a = Worktree::open_local(
1469            client_a.clone(),
1470            "/dir".as_ref(),
1471            fs,
1472            lang_registry.clone(),
1473            &mut cx_a.to_async(),
1474        )
1475        .await
1476        .unwrap();
1477        worktree_a
1478            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1479            .await;
1480        let worktree_id = worktree_a
1481            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1482            .await
1483            .unwrap();
1484
1485        // Join that worktree as client B, and see that a guest has joined as client A.
1486        let worktree_b = Worktree::open_remote(
1487            client_b.clone(),
1488            worktree_id,
1489            lang_registry.clone(),
1490            &mut cx_b.to_async(),
1491        )
1492        .await
1493        .unwrap();
1494        worktree_a
1495            .condition(&cx_a, |tree, _| tree.peers().len() == 1)
1496            .await;
1497
            // Start opening a buffer as client B, then drop both the worktree and the pending
            // task without awaiting it; client A should observe the guest leaving.
1498        let buffer_b = cx_b
1499            .background()
1500            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1501        cx_b.update(|_| drop(worktree_b));
1502        drop(buffer_b);
1503        worktree_a
1504            .condition(&cx_a, |tree, _| tree.peers().len() == 0)
1505            .await;
1506    }
1507
1508    #[gpui::test]
        // After guest B has joined A's shared worktree, B's client disconnects and host A must
        // see its peer count drop back to zero.
1509    async fn test_peer_disconnection(mut cx_a: TestAppContext, cx_b: TestAppContext) {
1510        cx_a.foreground().forbid_parking();
1511        let lang_registry = Arc::new(LanguageRegistry::new());
1512
1513        // Connect to a server as 2 clients.
            // NOTE(review): client B is created with `cx_a` here, whereas most other tests in this
            // module pass `cx_b` — confirm this is intentional.
1514        let mut server = TestServer::start().await;
1515        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1516        let (client_b, _) = server.create_client(&mut cx_a, "user_b").await;
1517
1518        // Share a local worktree as client A
1519        let fs = Arc::new(FakeFs::new());
1520        fs.insert_tree(
1521            "/a",
1522            json!({
1523                ".zed.toml": r#"collaborators = ["user_b"]"#,
1524                "a.txt": "a-contents",
1525                "b.txt": "b-contents",
1526            }),
1527        )
1528        .await;
1529        let worktree_a = Worktree::open_local(
1530            client_a.clone(),
1531            "/a".as_ref(),
1532            fs,
1533            lang_registry.clone(),
1534            &mut cx_a.to_async(),
1535        )
1536        .await
1537        .unwrap();
1538        worktree_a
1539            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1540            .await;
1541        let worktree_id = worktree_a
1542            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1543            .await
1544            .unwrap();
1545
1546        // Join that worktree as client B, and see that a guest has joined as client A.
            // The handle is bound to `_worktree_b` so it stays alive until the end of the test.
1547        let _worktree_b = Worktree::open_remote(
1548            client_b.clone(),
1549            worktree_id,
1550            lang_registry.clone(),
1551            &mut cx_b.to_async(),
1552        )
1553        .await
1554        .unwrap();
1555        worktree_a
1556            .condition(&cx_a, |tree, _| tree.peers().len() == 1)
1557            .await;
1558
1559        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1560        client_b.disconnect(&cx_b.to_async()).await.unwrap();
1561        worktree_a
1562            .condition(&cx_a, |tree, _| tree.peers().len() == 0)
1563            .await;
1564    }
1565
1566    #[gpui::test]
        // A fake language server publishes two diagnostics for `a.rs` on the host before the
        // guest joins; the guest's buffer for `a.rs` must then report both diagnostics.
1567    async fn test_collaborating_with_diagnostics(
1568        mut cx_a: TestAppContext,
1569        mut cx_b: TestAppContext,
1570    ) {
1571        cx_a.foreground().forbid_parking();
            // Register a "Rust" language for `.rs` files that is backed by the fake language server.
1572        let (language_server_config, mut fake_language_server) =
1573            LanguageServerConfig::fake(cx_a.background()).await;
1574        let mut lang_registry = LanguageRegistry::new();
1575        lang_registry.add(Arc::new(Language::new(
1576            LanguageConfig {
1577                name: "Rust".to_string(),
1578                path_suffixes: vec!["rs".to_string()],
1579                language_server: Some(language_server_config),
1580                ..Default::default()
1581            },
1582            tree_sitter_rust::language(),
1583        )));
1584
1585        let lang_registry = Arc::new(lang_registry);
1586
1587        // Connect to a server as 2 clients.
            // NOTE(review): client B is created with `cx_a` here, whereas most other tests in this
            // module pass `cx_b` — confirm this is intentional.
1588        let mut server = TestServer::start().await;
1589        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1590        let (client_b, _) = server.create_client(&mut cx_a, "user_b").await;
1591
1592        // Share a local worktree as client A
1593        let fs = Arc::new(FakeFs::new());
1594        fs.insert_tree(
1595            "/a",
1596            json!({
1597                ".zed.toml": r#"collaborators = ["user_b"]"#,
1598                "a.rs": "let one = two",
1599                "other.rs": "",
1600            }),
1601        )
1602        .await;
1603        let worktree_a = Worktree::open_local(
1604            client_a.clone(),
1605            "/a".as_ref(),
1606            fs,
1607            lang_registry.clone(),
1608            &mut cx_a.to_async(),
1609        )
1610        .await
1611        .unwrap();
1612        worktree_a
1613            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1614            .await;
1615        let worktree_id = worktree_a
1616            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1617            .await
1618            .unwrap();
1619
1620        // Cause language server to start.
1621        let _ = cx_a
1622            .background()
1623            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1624                worktree.open_buffer("other.rs", cx)
1625            }))
1626            .await
1627            .unwrap();
1628
1629        // Simulate a language server reporting errors for a file.
1630        fake_language_server
1631            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1632                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1633                version: None,
1634                diagnostics: vec![
1635                    lsp::Diagnostic {
1636                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1637                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1638                        message: "message 1".to_string(),
1639                        ..Default::default()
1640                    },
1641                    lsp::Diagnostic {
1642                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1643                        range: lsp::Range::new(
1644                            lsp::Position::new(0, 10),
1645                            lsp::Position::new(0, 13),
1646                        ),
1647                        message: "message 2".to_string(),
1648                        ..Default::default()
1649                    },
1650                ],
1651            })
1652            .await;
1653
1654        // Join the worktree as client B.
1655        let worktree_b = Worktree::open_remote(
1656            client_b.clone(),
1657            worktree_id,
1658            lang_registry.clone(),
1659            &mut cx_b.to_async(),
1660        )
1661        .await
1662        .unwrap();
1663
1664        // Open the file with the errors.
1665        let buffer_b = cx_b
1666            .background()
1667            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1668            .await
1669            .unwrap();
1670
            // Both diagnostics are expected on the guest, each primary and in its own group (0 and 1).
1671        buffer_b.read_with(&cx_b, |buffer, _| {
1672            assert_eq!(
1673                buffer
1674                    .diagnostics_in_range(0..buffer.len())
1675                    .collect::<Vec<_>>(),
1676                &[
1677                    (
1678                        Point::new(0, 4)..Point::new(0, 7),
1679                        &Diagnostic {
1680                            group_id: 0,
1681                            message: "message 1".to_string(),
1682                            severity: lsp::DiagnosticSeverity::ERROR,
1683                            is_primary: true
1684                        }
1685                    ),
1686                    (
1687                        Point::new(0, 10)..Point::new(0, 13),
1688                        &Diagnostic {
1689                            group_id: 1,
1690                            severity: lsp::DiagnosticSeverity::WARNING,
1691                            message: "message 2".to_string(),
1692                            is_primary: true
1693                        }
1694                    )
1695                ]
1696            );
1697        });
1698    }
1699
    // Two clients open the same chat channel, load its existing history, and exchange messages.
    // The test finishes by checking that the server stops tracking a client's connection for
    // the channel once that client drops its channel handle.
    #[gpui::test]
    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;

        // Create an org that includes these 2 users.
        let db = &server.app_state.db;
        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
        db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
            .await
            .unwrap();
        db.add_org_member(org_id, current_user_id(&user_store_b, &cx_b), false)
            .await
            .unwrap();

        // Create a channel that includes all the users.
        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
        db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
            .await
            .unwrap();
        db.add_channel_member(channel_id, current_user_id(&user_store_b, &cx_b), false)
            .await
            .unwrap();
        // Seed the channel with one message from user B, written straight to the database
        // before either client opens the channel.
        // NOTE(review): the trailing `1` is presumably a per-message nonce — confirm against
        // the db module.
        db.create_channel_message(
            channel_id,
            current_user_id(&user_store_b, &cx_b),
            "hello A, it's B.",
            OffsetDateTime::now_utc(),
            1,
        )
        .await
        .unwrap();

        // Client A: wait for the channel list to load, then open the channel.
        let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
        channels_a
            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
            .await;
        channels_a.read_with(&cx_a, |list, _| {
            assert_eq!(
                list.available_channels().unwrap(),
                &[ChannelDetails {
                    id: channel_id.to_proto(),
                    name: "test-channel".to_string()
                }]
            )
        });
        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
            this.get_channel(channel_id.to_proto(), cx).unwrap()
        });
        // A freshly opened channel has no messages yet; the seeded message shows up later.
        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
        channel_a
            .condition(&cx_a, |channel, _| {
                channel_messages(channel)
                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
            })
            .await;

        // Client B: same sequence as client A.
        let channels_b = cx_b.add_model(|cx| ChannelList::new(user_store_b, client_b, cx));
        channels_b
            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
            .await;
        channels_b.read_with(&cx_b, |list, _| {
            assert_eq!(
                list.available_channels().unwrap(),
                &[ChannelDetails {
                    id: channel_id.to_proto(),
                    name: "test-channel".to_string()
                }]
            )
        });

        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
            this.get_channel(channel_id.to_proto(), cx).unwrap()
        });
        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
        channel_b
            .condition(&cx_b, |channel, _| {
                channel_messages(channel)
                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
            })
            .await;

        // Send two messages from A. Both must be visible locally right away, flagged as
        // pending (third tuple element), before the send task of the second one is awaited.
        channel_a
            .update(&mut cx_a, |channel, cx| {
                channel
                    .send_message("oh, hi B.".to_string(), cx)
                    .unwrap()
                    .detach();
                let task = channel.send_message("sup".to_string(), cx).unwrap();
                assert_eq!(
                    channel_messages(channel),
                    &[
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
                        ("user_a".to_string(), "sup".to_string(), true)
                    ]
                );
                task
            })
            .await
            .unwrap();

        // B eventually receives both messages, in order and not pending.
        channel_b
            .condition(&cx_b, |channel, _| {
                channel_messages(channel)
                    == [
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
                        ("user_a".to_string(), "sup".to_string(), false),
                    ]
            })
            .await;

        // While both clients hold the channel open, the server tracks two connections for it.
        assert_eq!(
            server
                .state()
                .await
                .channel(channel_id)
                .unwrap()
                .connection_ids
                .len(),
            2
        );
        // Dropping B's handle leaves only A's connection registered.
        cx_b.update(|_| drop(channel_b));
        server
            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
            .await;

        // Dropping the last handle removes the channel from the server's state entirely.
        cx_a.update(|_| drop(channel_a));
        server
            .condition(|state| state.channel(channel_id).is_none())
            .await;
    }
1837
    // Checks the validation applied to outgoing chat messages: over-long bodies are rejected,
    // blank bodies are rejected, and surrounding whitespace is trimmed before storage.
    #[gpui::test]
    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
        cx_a.foreground().forbid_parking();

        let mut server = TestServer::start().await;
        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;

        // Create an org and a channel, and make user A a member of both.
        let db = &server.app_state.db;
        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
        db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
            .await
            .unwrap();
        db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
            .await
            .unwrap();

        // Wait for the channel list to load, then open the channel.
        let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
        channels_a
            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
            .await;
        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
            this.get_channel(channel_id.to_proto(), cx).unwrap()
        });

        // Messages aren't allowed to be too long.
        // The body below is 14 bytes repeated 1024 times (14336 bytes), well above the
        // 1024 of `MAX_MESSAGE_LEN`. `send_message` itself succeeds; the error only surfaces
        // when the returned task is awaited.
        channel_a
            .update(&mut cx_a, |channel, cx| {
                let long_body = "this is long.\n".repeat(1024);
                channel.send_message(long_body, cx).unwrap()
            })
            .await
            .unwrap_err();

        // Messages aren't allowed to be blank.
        // Unlike the over-long case, this is rejected synchronously: `send_message` returns
        // an error directly and no task is created.
        channel_a.update(&mut cx_a, |channel, cx| {
            channel.send_message(String::new(), cx).unwrap_err()
        });

        // Leading and trailing whitespace are trimmed.
        channel_a
            .update(&mut cx_a, |channel, cx| {
                channel
                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
                    .unwrap()
            })
            .await
            .unwrap();
        // The database holds exactly one message, the trimmed one, so neither rejected
        // message was stored.
        // NOTE(review): `10` is presumably the maximum number of messages to fetch — confirm.
        assert_eq!(
            db.get_channel_messages(channel_id, 10, None)
                .await
                .unwrap()
                .iter()
                .map(|m| &m.body)
                .collect::<Vec<_>>(),
            &["surrounded by whitespace"]
        );
    }
1896
    // Exercises chat behavior across a dropped connection: client B is disconnected while the
    // server refuses new connections, both clients send messages in the meantime, and after
    // reconnecting B must end up with the complete, ordered history and be able to chat again.
    #[gpui::test]
    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
        // Subscribe to B's connection status now so the later disconnect can be observed.
        let mut status_b = client_b.status();

        // Create an org that includes these 2 users.
        let db = &server.app_state.db;
        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
        db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
            .await
            .unwrap();
        db.add_org_member(org_id, current_user_id(&user_store_b, &cx_b), false)
            .await
            .unwrap();

        // Create a channel that includes all the users.
        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
        db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
            .await
            .unwrap();
        db.add_channel_member(channel_id, current_user_id(&user_store_b, &cx_b), false)
            .await
            .unwrap();
        // Seed the channel with one message from user B, written straight to the database.
        // NOTE(review): the trailing `2` is presumably a per-message nonce — confirm against
        // the db module.
        db.create_channel_message(
            channel_id,
            current_user_id(&user_store_b, &cx_b),
            "hello A, it's B.",
            OffsetDateTime::now_utc(),
            2,
        )
        .await
        .unwrap();

        // Client A: wait for the channel list, open the channel, and wait for its history.
        let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
        channels_a
            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
            .await;

        channels_a.read_with(&cx_a, |list, _| {
            assert_eq!(
                list.available_channels().unwrap(),
                &[ChannelDetails {
                    id: channel_id.to_proto(),
                    name: "test-channel".to_string()
                }]
            )
        });
        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
            this.get_channel(channel_id.to_proto(), cx).unwrap()
        });
        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
        channel_a
            .condition(&cx_a, |channel, _| {
                channel_messages(channel)
                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
            })
            .await;

        // Client B: same sequence. The user store is cloned because it is needed again below
        // to look up B's user id when disconnecting.
        let channels_b = cx_b.add_model(|cx| ChannelList::new(user_store_b.clone(), client_b, cx));
        channels_b
            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
            .await;
        channels_b.read_with(&cx_b, |list, _| {
            assert_eq!(
                list.available_channels().unwrap(),
                &[ChannelDetails {
                    id: channel_id.to_proto(),
                    name: "test-channel".to_string()
                }]
            )
        });

        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
            this.get_channel(channel_id.to_proto(), cx).unwrap()
        });
        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
        channel_b
            .condition(&cx_b, |channel, _| {
                channel_messages(channel)
                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
            })
            .await;

        // Disconnect client B, ensuring we can still access its cached channel data.
        // Connections are forbidden first so that B's reconnection attempt is refused.
        server.forbid_connections();
        server.disconnect_client(current_user_id(&user_store_b, &cx_b));
        // Consume status updates until B reports a failed reconnection attempt.
        while !matches!(
            status_b.recv().await,
            Some(client::Status::ReconnectionError { .. })
        ) {}

        channels_b.read_with(&cx_b, |channels, _| {
            assert_eq!(
                channels.available_channels().unwrap(),
                [ChannelDetails {
                    id: channel_id.to_proto(),
                    name: "test-channel".to_string()
                }]
            )
        });
        channel_b.read_with(&cx_b, |channel, _| {
            assert_eq!(
                channel_messages(channel),
                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
            )
        });

        // Send a message from client B while it is disconnected.
        // The message is shown locally as pending, but the send task resolves to an error.
        channel_b
            .update(&mut cx_b, |channel, cx| {
                let task = channel
                    .send_message("can you see this?".to_string(), cx)
                    .unwrap();
                assert_eq!(
                    channel_messages(channel),
                    &[
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_b".to_string(), "can you see this?".to_string(), true)
                    ]
                );
                task
            })
            .await
            .unwrap_err();

        // Send a message from client A while B is disconnected.
        channel_a
            .update(&mut cx_a, |channel, cx| {
                channel
                    .send_message("oh, hi B.".to_string(), cx)
                    .unwrap()
                    .detach();
                let task = channel.send_message("sup".to_string(), cx).unwrap();
                assert_eq!(
                    channel_messages(channel),
                    &[
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
                        ("user_a".to_string(), "sup".to_string(), true)
                    ]
                );
                task
            })
            .await
            .unwrap();

        // Give client B a chance to reconnect.
        // NOTE(review): the 10 second clock advance is presumably chosen to exceed the
        // client's reconnection delay — confirm against the client implementation.
        server.allow_connections();
        cx_b.foreground().advance_clock(Duration::from_secs(10));

        // Verify that B sees the new messages upon reconnection, as well as the message client B
        // sent while offline.
        channel_b
            .condition(&cx_b, |channel, _| {
                channel_messages(channel)
                    == [
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
                        ("user_a".to_string(), "sup".to_string(), false),
                        ("user_b".to_string(), "can you see this?".to_string(), false),
                    ]
            })
            .await;

        // Ensure client A and B can communicate normally after reconnection.
        channel_a
            .update(&mut cx_a, |channel, cx| {
                channel.send_message("you online?".to_string(), cx).unwrap()
            })
            .await
            .unwrap();
        channel_b
            .condition(&cx_b, |channel, _| {
                channel_messages(channel)
                    == [
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
                        ("user_a".to_string(), "sup".to_string(), false),
                        ("user_b".to_string(), "can you see this?".to_string(), false),
                        ("user_a".to_string(), "you online?".to_string(), false),
                    ]
            })
            .await;

        // And in the other direction: a message from B reaches A.
        channel_b
            .update(&mut cx_b, |channel, cx| {
                channel.send_message("yep".to_string(), cx).unwrap()
            })
            .await
            .unwrap();
        channel_a
            .condition(&cx_a, |channel, _| {
                channel_messages(channel)
                    == [
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
                        ("user_a".to_string(), "sup".to_string(), false),
                        ("user_b".to_string(), "can you see this?".to_string(), false),
                        ("user_a".to_string(), "you online?".to_string(), false),
                        ("user_b".to_string(), "yep".to_string(), false),
                    ]
            })
            .await;
    }
2106
    // Checks that every user's contact list tracks a worktree through its lifecycle: it
    // appears once opened by its owner, gains a guest when another user joins it, and
    // disappears when the owner drops it.
    #[gpui::test]
    async fn test_contacts(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());

        // Connect to a server as 3 clients.
        let mut server = TestServer::start().await;
        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
        let (_client_c, user_store_c) = server.create_client(&mut cx_c, "user_c").await;

        let fs = Arc::new(FakeFs::new());

        // Share a worktree as client A.
        // The worktree's `.zed.toml` names user_b and user_c as collaborators.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
            }),
        )
        .await;

        let worktree_a = Worktree::open_local(
            client_a.clone(),
            "/a".as_ref(),
            fs.clone(),
            lang_registry.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();

        // Once the worktree is open (and before `share` is called below), all three users
        // see user_a as a contact with worktree "a" and no guests.
        user_store_a
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        user_store_b
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        user_store_c
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;

        let worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Join the shared worktree as client B. The handle is kept alive (but otherwise
        // unused) until the end of the test.
        let _worktree_b = Worktree::open_remote(
            client_b.clone(),
            worktree_id,
            lang_registry.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // All three users now see user_b listed as a guest of worktree "a".
        user_store_a
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        user_store_b
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        user_store_c
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;

        // After the owner drops the worktree, every user's contact list becomes empty.
        cx_a.update(move |_| drop(worktree_a));
        user_store_a
            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
            .await;
        user_store_b
            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
            .await;
        user_store_c
            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
            .await;

        // Flattens a user store's contacts into plain tuples that are easy to compare:
        // `(contact's github login, [(worktree root name, [guest github logins])])`.
        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
            user_store
                .contacts()
                .iter()
                .map(|contact| {
                    let worktrees = contact
                        .worktrees
                        .iter()
                        .map(|w| {
                            (
                                w.root_name.as_str(),
                                w.guests.iter().map(|p| p.github_login.as_str()).collect(),
                            )
                        })
                        .collect();
                    (contact.user.github_login.as_str(), worktrees)
                })
                .collect()
        }
    }
2220
    /// In-process server harness used by the tests in this module.
    ///
    /// Bundles a `Server` with the pieces the tests need to drive it: its `Peer`, the shared
    /// `AppState` (for direct database access), and switches for killing or refusing client
    /// connections.
    struct TestServer {
        /// Peer handed to `Server::new`; reset when the harness is dropped.
        peer: Arc<Peer>,
        /// Application state shared with the server; tests reach the database through it.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Receiving half of the channel whose sender is passed to `Server::new`;
        /// `condition` waits on it between predicate checks.
        notifications: mpsc::Receiver<()>,
        /// One kill switch per user, registered when that user's connection is established
        /// and consumed by `disconnect_client`.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// While `true`, attempts to establish a connection fail with an error.
        forbid_connections: Arc<AtomicBool>,
        /// Owns the test database for the lifetime of the harness; never read after
        /// construction.
        /// NOTE(review): presumably dropping it tears the database down — confirm in `TestDb`.
        _test_db: TestDb,
    }
2230
    impl TestServer {
        /// Creates a fresh test database and a `Server` wired to it.
        ///
        /// The sending half of a 128-slot notification channel is given to the server; the
        /// receiving half is kept so `condition` can wait on it.
        async fn start() -> Self {
            let test_db = TestDb::new();
            let app_state = Self::build_app_state(&test_db).await;
            let peer = Peer::new();
            let notifications = mpsc::channel(128);
            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
            Self {
                peer,
                app_state,
                server,
                notifications: notifications.1,
                connection_killers: Default::default(),
                forbid_connections: Default::default(),
                _test_db: test_db,
            }
        }

        /// Creates a database user called `name` and returns a client connected as that
        /// user, together with a `UserStore` whose current user has already been loaded.
        ///
        /// The client's authentication and connection steps are overridden so that
        /// everything happens in memory against this harness's server.
        ///
        /// Panics if the user cannot be created or the client fails to connect.
        async fn create_client(
            &mut self,
            cx: &mut TestAppContext,
            name: &str,
        ) -> (Arc<Client>, ModelHandle<UserStore>) {
            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
            let client_name = name.to_string();
            let mut client = Client::new();
            // Clones that get moved into the connection callback below.
            let server = self.server.clone();
            let connection_killers = self.connection_killers.clone();
            let forbid_connections = self.forbid_connections.clone();
            // `Arc::get_mut` only succeeds while this is the sole reference to the client,
            // so the overrides have to be installed before the client is shared.
            Arc::get_mut(&mut client)
                .unwrap()
                // Authentication always yields the new user's id with a fixed token.
                .override_authenticate(move |cx| {
                    cx.spawn(|_| async move {
                        let access_token = "the-token".to_string();
                        Ok(Credentials {
                            user_id: user_id.0 as u64,
                            access_token,
                        })
                    })
                })
                // Connecting checks those credentials and then links the client to the
                // server through an in-memory connection.
                .override_establish_connection(move |credentials, cx| {
                    assert_eq!(credentials.user_id, user_id.0 as u64);
                    assert_eq!(credentials.access_token, "the-token");

                    // This callback can run more than once (e.g. on reconnection), so each
                    // invocation clones what its spawned future needs to own.
                    let server = server.clone();
                    let connection_killers = connection_killers.clone();
                    let forbid_connections = forbid_connections.clone();
                    let client_name = client_name.clone();
                    cx.spawn(move |cx| async move {
                        if forbid_connections.load(SeqCst) {
                            Err(EstablishConnectionError::other(anyhow!(
                                "server is forbidding connections"
                            )))
                        } else {
                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
                            // Keyed by user id: a newer connection replaces the stored kill
                            // switch of an older one for the same user.
                            connection_killers.lock().insert(user_id, kill_conn);
                            // Serve the server side of the connection in the background.
                            cx.background()
                                .spawn(server.handle_connection(server_conn, client_name, user_id))
                                .detach();
                            Ok(client_conn)
                        }
                    })
                });

            // Every HTTP request made through this fake client gets a 404 response.
            let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
            client
                .authenticate_and_connect(&cx.to_async())
                .await
                .unwrap();

            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
            // Don't return until the user store knows who the current user is.
            let mut authed_user =
                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
            while authed_user.recv().await.unwrap().is_none() {}

            (client, user_store)
        }

        /// Kills the connection registered for `user_id`, if there is one.
        ///
        /// The kill switch is removed from the map, so calling this again without a new
        /// connection having been established does nothing.
        fn disconnect_client(&self, user_id: UserId) {
            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
                // Best effort: a failure to deliver the kill signal is ignored.
                let _ = kill_conn.try_send(Some(()));
            }
        }

        /// Makes every subsequent attempt to establish a connection fail.
        fn forbid_connections(&self) {
            self.forbid_connections.store(true, SeqCst);
        }

        /// Lets connections be established again after `forbid_connections`.
        fn allow_connections(&self) {
            self.forbid_connections.store(false, SeqCst);
        }

        /// Builds the `AppState` for a test server: default config with a dummy 32-character
        /// session secret and the test database's URL, plus test GitHub clients.
        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
            let mut config = Config::default();
            config.session_secret = "a".repeat(32);
            config.database_url = test_db.url.clone();
            let github_client = github::AppClient::test();
            Arc::new(AppState {
                db: test_db.db().clone(),
                handlebars: Default::default(),
                auth_client: auth::build_client("", ""),
                repo_client: github::RepoClient::test(&github_client),
                github_client,
                config,
            })
        }

        /// Returns a read guard over the server's store.
        ///
        /// Declared `async` although the body never awaits, so callers write
        /// `server.state().await`.
        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
            self.server.store.read()
        }

        /// Waits until `predicate` holds for the server's store, re-checking it each time a
        /// notification is received.
        ///
        /// Panics with "condition timed out" if that takes longer than 500 milliseconds.
        async fn condition<F>(&mut self, mut predicate: F)
        where
            F: FnMut(&Store) -> bool,
        {
            async_std::future::timeout(Duration::from_millis(500), async {
                // The read guard is a temporary of the loop condition, so it is released
                // before the next notification is awaited.
                while !(predicate)(&*self.server.store.read()) {
                    self.notifications.recv().await;
                }
            })
            .await
            .expect("condition timed out");
        }
    }
2355
2356    impl Drop for TestServer {
2357        fn drop(&mut self) {
2358            task::block_on(self.peer.reset());
2359        }
2360    }
2361
2362    fn current_user_id(user_store: &ModelHandle<UserStore>, cx: &TestAppContext) -> UserId {
2363        UserId::from_proto(
2364            user_store.read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2365        )
2366    }
2367
2368    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2369        channel
2370            .messages()
2371            .cursor::<()>()
2372            .map(|m| {
2373                (
2374                    m.sender.github_login.clone(),
2375                    m.body.clone(),
2376                    m.is_pending(),
2377                )
2378            })
2379            .collect()
2380    }
2381
    /// Stateless placeholder view for tests; it renders an empty element.
    struct EmptyView;
2383
    impl gpui::Entity for EmptyView {
        // The event type is the unit type: there is no event payload to carry.
        type Event = ();
    }
2387
2388    impl gpui::View for EmptyView {
2389        fn ui_name() -> &'static str {
2390            "empty view"
2391        }
2392
2393        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2394            gpui::Element::boxed(gpui::elements::Empty)
2395        }
2396    }
2397}