rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use futures::{future::BoxFuture, FutureExt};
  12use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  13use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{
  20    any::TypeId,
  21    collections::{HashMap, HashSet},
  22    future::Future,
  23    mem,
  24    sync::Arc,
  25    time::Instant,
  26};
  27use store::{Store, Worktree};
  28use surf::StatusCode;
  29use tide::log;
  30use tide::{
  31    http::headers::{HeaderName, CONNECTION, UPGRADE},
  32    Request, Response,
  33};
  34use time::OffsetDateTime;
  35
  36type MessageHandler = Box<
  37    dyn Send
  38        + Sync
  39        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  40>;
  41
  42pub struct Server {
  43    peer: Arc<Peer>,
  44    store: RwLock<Store>,
  45    app_state: Arc<AppState>,
  46    handlers: HashMap<TypeId, MessageHandler>,
  47    notifications: Option<mpsc::Sender<()>>,
  48}
  49
  50const MESSAGE_COUNT_PER_PAGE: usize = 100;
  51const MAX_MESSAGE_LEN: usize = 1024;
  52
  53impl Server {
  54    pub fn new(
  55        app_state: Arc<AppState>,
  56        peer: Arc<Peer>,
  57        notifications: Option<mpsc::Sender<()>>,
  58    ) -> Arc<Self> {
  59        let mut server = Self {
  60            peer,
  61            app_state,
  62            store: Default::default(),
  63            handlers: Default::default(),
  64            notifications,
  65        };
  66
  67        server
  68            .add_handler(Server::ping)
  69            .add_handler(Server::open_worktree)
  70            .add_handler(Server::close_worktree)
  71            .add_handler(Server::share_worktree)
  72            .add_handler(Server::unshare_worktree)
  73            .add_handler(Server::join_worktree)
  74            .add_handler(Server::leave_worktree)
  75            .add_handler(Server::update_worktree)
  76            .add_handler(Server::open_buffer)
  77            .add_handler(Server::close_buffer)
  78            .add_handler(Server::update_buffer)
  79            .add_handler(Server::buffer_saved)
  80            .add_handler(Server::save_buffer)
  81            .add_handler(Server::get_channels)
  82            .add_handler(Server::get_users)
  83            .add_handler(Server::join_channel)
  84            .add_handler(Server::leave_channel)
  85            .add_handler(Server::send_channel_message)
  86            .add_handler(Server::get_channel_messages);
  87
  88        Arc::new(server)
  89    }
  90
  91    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  92    where
  93        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  94        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  95        M: EnvelopedMessage,
  96    {
  97        let prev_handler = self.handlers.insert(
  98            TypeId::of::<M>(),
  99            Box::new(move |server, envelope| {
 100                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 101                (handler)(server, *envelope).boxed()
 102            }),
 103        );
 104        if prev_handler.is_some() {
 105            panic!("registered a handler for the same message twice");
 106        }
 107        self
 108    }
 109
 110    pub fn handle_connection(
 111        self: &Arc<Self>,
 112        connection: Connection,
 113        addr: String,
 114        user_id: UserId,
 115    ) -> impl Future<Output = ()> {
 116        let mut this = self.clone();
 117        async move {
 118            let (connection_id, handle_io, mut incoming_rx) =
 119                this.peer.add_connection(connection).await;
 120            this.state_mut().add_connection(connection_id, user_id);
 121            if let Err(err) = this.update_collaborators_for_users(&[user_id]).await {
 122                log::error!("error updating collaborators for {:?}: {}", user_id, err);
 123            }
 124
 125            let handle_io = handle_io.fuse();
 126            futures::pin_mut!(handle_io);
 127            loop {
 128                let next_message = incoming_rx.recv().fuse();
 129                futures::pin_mut!(next_message);
 130                futures::select_biased! {
 131                    message = next_message => {
 132                        if let Some(message) = message {
 133                            let start_time = Instant::now();
 134                            log::info!("RPC message received: {}", message.payload_type_name());
 135                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 136                                if let Err(err) = (handler)(this.clone(), message).await {
 137                                    log::error!("error handling message: {:?}", err);
 138                                } else {
 139                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 140                                }
 141
 142                                if let Some(mut notifications) = this.notifications.clone() {
 143                                    let _ = notifications.send(()).await;
 144                                }
 145                            } else {
 146                                log::warn!("unhandled message: {}", message.payload_type_name());
 147                            }
 148                        } else {
 149                            log::info!("rpc connection closed {:?}", addr);
 150                            break;
 151                        }
 152                    }
 153                    handle_io = handle_io => {
 154                        if let Err(err) = handle_io {
 155                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 156                        }
 157                        break;
 158                    }
 159                }
 160            }
 161
 162            if let Err(err) = this.sign_out(connection_id).await {
 163                log::error!("error signing out connection {:?} - {:?}", addr, err);
 164            }
 165        }
 166    }
 167
 168    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 169        self.peer.disconnect(connection_id).await;
 170        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 171
 172        for (worktree_id, worktree) in removed_connection.hosted_worktrees {
 173            if let Some(share) = worktree.share {
 174                broadcast(
 175                    connection_id,
 176                    share.guest_connection_ids.keys().copied().collect(),
 177                    |conn_id| {
 178                        self.peer
 179                            .send(conn_id, proto::UnshareWorktree { worktree_id })
 180                    },
 181                )
 182                .await?;
 183            }
 184        }
 185
 186        for (worktree_id, peer_ids) in removed_connection.guest_worktree_ids {
 187            broadcast(connection_id, peer_ids, |conn_id| {
 188                self.peer.send(
 189                    conn_id,
 190                    proto::RemovePeer {
 191                        worktree_id,
 192                        peer_id: connection_id.0,
 193                    },
 194                )
 195            })
 196            .await?;
 197        }
 198
 199        self.update_collaborators_for_users(removed_connection.collaborator_ids.iter())
 200            .await?;
 201
 202        Ok(())
 203    }
 204
 205    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 206        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 207        Ok(())
 208    }
 209
 210    async fn open_worktree(
 211        mut self: Arc<Server>,
 212        request: TypedEnvelope<proto::OpenWorktree>,
 213    ) -> tide::Result<()> {
 214        let receipt = request.receipt();
 215        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 216
 217        let mut collaborator_user_ids = HashSet::new();
 218        collaborator_user_ids.insert(host_user_id);
 219        for github_login in request.payload.collaborator_logins {
 220            match self.app_state.db.create_user(&github_login, false).await {
 221                Ok(collaborator_user_id) => {
 222                    collaborator_user_ids.insert(collaborator_user_id);
 223                }
 224                Err(err) => {
 225                    let message = err.to_string();
 226                    self.peer
 227                        .respond_with_error(receipt, proto::Error { message })
 228                        .await?;
 229                    return Ok(());
 230                }
 231            }
 232        }
 233
 234        let collaborator_user_ids = collaborator_user_ids.into_iter().collect::<Vec<_>>();
 235        let worktree_id = self.state_mut().add_worktree(Worktree {
 236            host_connection_id: request.sender_id,
 237            collaborator_user_ids: collaborator_user_ids.clone(),
 238            root_name: request.payload.root_name,
 239            share: None,
 240        });
 241
 242        self.peer
 243            .respond(receipt, proto::OpenWorktreeResponse { worktree_id })
 244            .await?;
 245        self.update_collaborators_for_users(&collaborator_user_ids)
 246            .await?;
 247
 248        Ok(())
 249    }
 250
 251    async fn close_worktree(
 252        mut self: Arc<Server>,
 253        request: TypedEnvelope<proto::CloseWorktree>,
 254    ) -> tide::Result<()> {
 255        let worktree_id = request.payload.worktree_id;
 256        let worktree = self
 257            .state_mut()
 258            .remove_worktree(worktree_id, request.sender_id)?;
 259
 260        if let Some(share) = worktree.share {
 261            broadcast(
 262                request.sender_id,
 263                share.guest_connection_ids.keys().copied().collect(),
 264                |conn_id| {
 265                    self.peer
 266                        .send(conn_id, proto::UnshareWorktree { worktree_id })
 267                },
 268            )
 269            .await?;
 270        }
 271        self.update_collaborators_for_users(&worktree.collaborator_user_ids)
 272            .await?;
 273        Ok(())
 274    }
 275
 276    async fn share_worktree(
 277        mut self: Arc<Server>,
 278        mut request: TypedEnvelope<proto::ShareWorktree>,
 279    ) -> tide::Result<()> {
 280        let worktree = request
 281            .payload
 282            .worktree
 283            .as_mut()
 284            .ok_or_else(|| anyhow!("missing worktree"))?;
 285        let entries = mem::take(&mut worktree.entries)
 286            .into_iter()
 287            .map(|entry| (entry.id, entry))
 288            .collect();
 289
 290        let collaborator_user_ids =
 291            self.state_mut()
 292                .share_worktree(worktree.id, request.sender_id, entries);
 293        if let Some(collaborator_user_ids) = collaborator_user_ids {
 294            self.peer
 295                .respond(request.receipt(), proto::ShareWorktreeResponse {})
 296                .await?;
 297            self.update_collaborators_for_users(&collaborator_user_ids)
 298                .await?;
 299        } else {
 300            self.peer
 301                .respond_with_error(
 302                    request.receipt(),
 303                    proto::Error {
 304                        message: "no such worktree".to_string(),
 305                    },
 306                )
 307                .await?;
 308        }
 309        Ok(())
 310    }
 311
 312    async fn unshare_worktree(
 313        mut self: Arc<Server>,
 314        request: TypedEnvelope<proto::UnshareWorktree>,
 315    ) -> tide::Result<()> {
 316        let worktree_id = request.payload.worktree_id;
 317        let worktree = self
 318            .state_mut()
 319            .unshare_worktree(worktree_id, request.sender_id)?;
 320
 321        broadcast(request.sender_id, worktree.connection_ids, |conn_id| {
 322            self.peer
 323                .send(conn_id, proto::UnshareWorktree { worktree_id })
 324        })
 325        .await?;
 326        self.update_collaborators_for_users(&worktree.collaborator_ids)
 327            .await?;
 328
 329        Ok(())
 330    }
 331
 332    async fn join_worktree(
 333        mut self: Arc<Server>,
 334        request: TypedEnvelope<proto::JoinWorktree>,
 335    ) -> tide::Result<()> {
 336        let worktree_id = request.payload.worktree_id;
 337
 338        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 339        let response_data = self
 340            .state_mut()
 341            .join_worktree(request.sender_id, user_id, worktree_id)
 342            .and_then(|joined| {
 343                let share = joined.worktree.share()?;
 344                let peer_count = share.guest_connection_ids.len();
 345                let mut peers = Vec::with_capacity(peer_count);
 346                peers.push(proto::Peer {
 347                    peer_id: joined.worktree.host_connection_id.0,
 348                    replica_id: 0,
 349                });
 350                for (peer_conn_id, peer_replica_id) in &share.guest_connection_ids {
 351                    if *peer_conn_id != request.sender_id {
 352                        peers.push(proto::Peer {
 353                            peer_id: peer_conn_id.0,
 354                            replica_id: *peer_replica_id as u32,
 355                        });
 356                    }
 357                }
 358                let response = proto::JoinWorktreeResponse {
 359                    worktree: Some(proto::Worktree {
 360                        id: worktree_id,
 361                        root_name: joined.worktree.root_name.clone(),
 362                        entries: share.entries.values().cloned().collect(),
 363                    }),
 364                    replica_id: joined.replica_id as u32,
 365                    peers,
 366                };
 367                let connection_ids = joined.worktree.connection_ids();
 368                let collaborator_user_ids = joined.worktree.collaborator_user_ids.clone();
 369                Ok((response, connection_ids, collaborator_user_ids))
 370            });
 371
 372        match response_data {
 373            Ok((response, connection_ids, collaborator_user_ids)) => {
 374                broadcast(request.sender_id, connection_ids, |conn_id| {
 375                    self.peer.send(
 376                        conn_id,
 377                        proto::AddPeer {
 378                            worktree_id,
 379                            peer: Some(proto::Peer {
 380                                peer_id: request.sender_id.0,
 381                                replica_id: response.replica_id,
 382                            }),
 383                        },
 384                    )
 385                })
 386                .await?;
 387                self.peer.respond(request.receipt(), response).await?;
 388                self.update_collaborators_for_users(&collaborator_user_ids)
 389                    .await?;
 390            }
 391            Err(error) => {
 392                self.peer
 393                    .respond_with_error(
 394                        request.receipt(),
 395                        proto::Error {
 396                            message: error.to_string(),
 397                        },
 398                    )
 399                    .await?;
 400            }
 401        }
 402
 403        Ok(())
 404    }
 405
 406    async fn leave_worktree(
 407        mut self: Arc<Server>,
 408        request: TypedEnvelope<proto::LeaveWorktree>,
 409    ) -> tide::Result<()> {
 410        let sender_id = request.sender_id;
 411        let worktree_id = request.payload.worktree_id;
 412        let worktree = self.state_mut().leave_worktree(sender_id, worktree_id);
 413        if let Some(worktree) = worktree {
 414            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 415                self.peer.send(
 416                    conn_id,
 417                    proto::RemovePeer {
 418                        worktree_id,
 419                        peer_id: sender_id.0,
 420                    },
 421                )
 422            })
 423            .await?;
 424            self.update_collaborators_for_users(&worktree.collaborator_ids)
 425                .await?;
 426        }
 427        Ok(())
 428    }
 429
 430    async fn update_worktree(
 431        mut self: Arc<Server>,
 432        request: TypedEnvelope<proto::UpdateWorktree>,
 433    ) -> tide::Result<()> {
 434        let connection_ids = self.state_mut().update_worktree(
 435            request.sender_id,
 436            request.payload.worktree_id,
 437            &request.payload.removed_entries,
 438            &request.payload.updated_entries,
 439        )?;
 440
 441        broadcast(request.sender_id, connection_ids, |connection_id| {
 442            self.peer
 443                .forward_send(request.sender_id, connection_id, request.payload.clone())
 444        })
 445        .await?;
 446
 447        Ok(())
 448    }
 449
 450    async fn open_buffer(
 451        self: Arc<Server>,
 452        request: TypedEnvelope<proto::OpenBuffer>,
 453    ) -> tide::Result<()> {
 454        let receipt = request.receipt();
 455        let host_connection_id = self
 456            .state()
 457            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 458        let response = self
 459            .peer
 460            .forward_request(request.sender_id, host_connection_id, request.payload)
 461            .await?;
 462        self.peer.respond(receipt, response).await?;
 463        Ok(())
 464    }
 465
 466    async fn close_buffer(
 467        self: Arc<Server>,
 468        request: TypedEnvelope<proto::CloseBuffer>,
 469    ) -> tide::Result<()> {
 470        let host_connection_id = self
 471            .state()
 472            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 473        self.peer
 474            .forward_send(request.sender_id, host_connection_id, request.payload)
 475            .await?;
 476        Ok(())
 477    }
 478
 479    async fn save_buffer(
 480        self: Arc<Server>,
 481        request: TypedEnvelope<proto::SaveBuffer>,
 482    ) -> tide::Result<()> {
 483        let host;
 484        let guests;
 485        {
 486            let state = self.state();
 487            host = state
 488                .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 489            guests = state
 490                .worktree_guest_connection_ids(request.sender_id, request.payload.worktree_id)?;
 491        }
 492
 493        let sender = request.sender_id;
 494        let receipt = request.receipt();
 495        let response = self
 496            .peer
 497            .forward_request(sender, host, request.payload.clone())
 498            .await?;
 499
 500        broadcast(host, guests, |conn_id| {
 501            let response = response.clone();
 502            let peer = &self.peer;
 503            async move {
 504                if conn_id == sender {
 505                    peer.respond(receipt, response).await
 506                } else {
 507                    peer.forward_send(host, conn_id, response).await
 508                }
 509            }
 510        })
 511        .await?;
 512
 513        Ok(())
 514    }
 515
 516    async fn update_buffer(
 517        self: Arc<Server>,
 518        request: TypedEnvelope<proto::UpdateBuffer>,
 519    ) -> tide::Result<()> {
 520        let receiver_ids = self
 521            .state()
 522            .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
 523        broadcast(request.sender_id, receiver_ids, |connection_id| {
 524            self.peer
 525                .forward_send(request.sender_id, connection_id, request.payload.clone())
 526        })
 527        .await?;
 528        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 529        Ok(())
 530    }
 531
 532    async fn buffer_saved(
 533        self: Arc<Server>,
 534        request: TypedEnvelope<proto::BufferSaved>,
 535    ) -> tide::Result<()> {
 536        let receiver_ids = self
 537            .state()
 538            .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
 539        broadcast(request.sender_id, receiver_ids, |connection_id| {
 540            self.peer
 541                .forward_send(request.sender_id, connection_id, request.payload.clone())
 542        })
 543        .await?;
 544        Ok(())
 545    }
 546
 547    async fn get_channels(
 548        self: Arc<Server>,
 549        request: TypedEnvelope<proto::GetChannels>,
 550    ) -> tide::Result<()> {
 551        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 552        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 553        self.peer
 554            .respond(
 555                request.receipt(),
 556                proto::GetChannelsResponse {
 557                    channels: channels
 558                        .into_iter()
 559                        .map(|chan| proto::Channel {
 560                            id: chan.id.to_proto(),
 561                            name: chan.name,
 562                        })
 563                        .collect(),
 564                },
 565            )
 566            .await?;
 567        Ok(())
 568    }
 569
 570    async fn get_users(
 571        self: Arc<Server>,
 572        request: TypedEnvelope<proto::GetUsers>,
 573    ) -> tide::Result<()> {
 574        let receipt = request.receipt();
 575        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 576        let users = self
 577            .app_state
 578            .db
 579            .get_users_by_ids(user_ids)
 580            .await?
 581            .into_iter()
 582            .map(|user| proto::User {
 583                id: user.id.to_proto(),
 584                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 585                github_login: user.github_login,
 586            })
 587            .collect();
 588        self.peer
 589            .respond(receipt, proto::GetUsersResponse { users })
 590            .await?;
 591        Ok(())
 592    }
 593
 594    async fn update_collaborators_for_users<'a>(
 595        self: &Arc<Server>,
 596        user_ids: impl IntoIterator<Item = &'a UserId>,
 597    ) -> tide::Result<()> {
 598        let mut send_futures = Vec::new();
 599
 600        {
 601            let state = self.state();
 602            for user_id in user_ids {
 603                let collaborators = state.collaborators_for_user(*user_id);
 604                for connection_id in state.connection_ids_for_user(*user_id) {
 605                    send_futures.push(self.peer.send(
 606                        connection_id,
 607                        proto::UpdateCollaborators {
 608                            collaborators: collaborators.clone(),
 609                        },
 610                    ));
 611                }
 612            }
 613        }
 614        futures::future::try_join_all(send_futures).await?;
 615
 616        Ok(())
 617    }
 618
 619    async fn join_channel(
 620        mut self: Arc<Self>,
 621        request: TypedEnvelope<proto::JoinChannel>,
 622    ) -> tide::Result<()> {
 623        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 624        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 625        if !self
 626            .app_state
 627            .db
 628            .can_user_access_channel(user_id, channel_id)
 629            .await?
 630        {
 631            Err(anyhow!("access denied"))?;
 632        }
 633
 634        self.state_mut().join_channel(request.sender_id, channel_id);
 635        let messages = self
 636            .app_state
 637            .db
 638            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 639            .await?
 640            .into_iter()
 641            .map(|msg| proto::ChannelMessage {
 642                id: msg.id.to_proto(),
 643                body: msg.body,
 644                timestamp: msg.sent_at.unix_timestamp() as u64,
 645                sender_id: msg.sender_id.to_proto(),
 646                nonce: Some(msg.nonce.as_u128().into()),
 647            })
 648            .collect::<Vec<_>>();
 649        self.peer
 650            .respond(
 651                request.receipt(),
 652                proto::JoinChannelResponse {
 653                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 654                    messages,
 655                },
 656            )
 657            .await?;
 658        Ok(())
 659    }
 660
 661    async fn leave_channel(
 662        mut self: Arc<Self>,
 663        request: TypedEnvelope<proto::LeaveChannel>,
 664    ) -> tide::Result<()> {
 665        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 666        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 667        if !self
 668            .app_state
 669            .db
 670            .can_user_access_channel(user_id, channel_id)
 671            .await?
 672        {
 673            Err(anyhow!("access denied"))?;
 674        }
 675
 676        self.state_mut()
 677            .leave_channel(request.sender_id, channel_id);
 678
 679        Ok(())
 680    }
 681
 682    async fn send_channel_message(
 683        self: Arc<Self>,
 684        request: TypedEnvelope<proto::SendChannelMessage>,
 685    ) -> tide::Result<()> {
 686        let receipt = request.receipt();
 687        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 688        let user_id;
 689        let connection_ids;
 690        {
 691            let state = self.state();
 692            user_id = state.user_id_for_connection(request.sender_id)?;
 693            if let Some(ids) = state.channel_connection_ids(channel_id) {
 694                connection_ids = ids;
 695            } else {
 696                return Ok(());
 697            }
 698        }
 699
 700        // Validate the message body.
 701        let body = request.payload.body.trim().to_string();
 702        if body.len() > MAX_MESSAGE_LEN {
 703            self.peer
 704                .respond_with_error(
 705                    receipt,
 706                    proto::Error {
 707                        message: "message is too long".to_string(),
 708                    },
 709                )
 710                .await?;
 711            return Ok(());
 712        }
 713        if body.is_empty() {
 714            self.peer
 715                .respond_with_error(
 716                    receipt,
 717                    proto::Error {
 718                        message: "message can't be blank".to_string(),
 719                    },
 720                )
 721                .await?;
 722            return Ok(());
 723        }
 724
 725        let timestamp = OffsetDateTime::now_utc();
 726        let nonce = if let Some(nonce) = request.payload.nonce {
 727            nonce
 728        } else {
 729            self.peer
 730                .respond_with_error(
 731                    receipt,
 732                    proto::Error {
 733                        message: "nonce can't be blank".to_string(),
 734                    },
 735                )
 736                .await?;
 737            return Ok(());
 738        };
 739
 740        let message_id = self
 741            .app_state
 742            .db
 743            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 744            .await?
 745            .to_proto();
 746        let message = proto::ChannelMessage {
 747            sender_id: user_id.to_proto(),
 748            id: message_id,
 749            body,
 750            timestamp: timestamp.unix_timestamp() as u64,
 751            nonce: Some(nonce),
 752        };
 753        broadcast(request.sender_id, connection_ids, |conn_id| {
 754            self.peer.send(
 755                conn_id,
 756                proto::ChannelMessageSent {
 757                    channel_id: channel_id.to_proto(),
 758                    message: Some(message.clone()),
 759                },
 760            )
 761        })
 762        .await?;
 763        self.peer
 764            .respond(
 765                receipt,
 766                proto::SendChannelMessageResponse {
 767                    message: Some(message),
 768                },
 769            )
 770            .await?;
 771        Ok(())
 772    }
 773
 774    async fn get_channel_messages(
 775        self: Arc<Self>,
 776        request: TypedEnvelope<proto::GetChannelMessages>,
 777    ) -> tide::Result<()> {
 778        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 779        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 780        if !self
 781            .app_state
 782            .db
 783            .can_user_access_channel(user_id, channel_id)
 784            .await?
 785        {
 786            Err(anyhow!("access denied"))?;
 787        }
 788
 789        let messages = self
 790            .app_state
 791            .db
 792            .get_channel_messages(
 793                channel_id,
 794                MESSAGE_COUNT_PER_PAGE,
 795                Some(MessageId::from_proto(request.payload.before_message_id)),
 796            )
 797            .await?
 798            .into_iter()
 799            .map(|msg| proto::ChannelMessage {
 800                id: msg.id.to_proto(),
 801                body: msg.body,
 802                timestamp: msg.sent_at.unix_timestamp() as u64,
 803                sender_id: msg.sender_id.to_proto(),
 804                nonce: Some(msg.nonce.as_u128().into()),
 805            })
 806            .collect::<Vec<_>>();
 807        self.peer
 808            .respond(
 809                request.receipt(),
 810                proto::GetChannelMessagesResponse {
 811                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 812                    messages,
 813                },
 814            )
 815            .await?;
 816        Ok(())
 817    }
 818
 819    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 820        self.store.read()
 821    }
 822
 823    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 824        self.store.write()
 825    }
 826}
 827
 828pub async fn broadcast<F, T>(
 829    sender_id: ConnectionId,
 830    receiver_ids: Vec<ConnectionId>,
 831    mut f: F,
 832) -> anyhow::Result<()>
 833where
 834    F: FnMut(ConnectionId) -> T,
 835    T: Future<Output = anyhow::Result<()>>,
 836{
 837    let futures = receiver_ids
 838        .into_iter()
 839        .filter(|id| *id != sender_id)
 840        .map(|id| f(id));
 841    futures::future::try_join_all(futures).await?;
 842    Ok(())
 843}
 844
 845pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 846    let server = Server::new(app.state().clone(), rpc.clone(), None);
 847    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 848        let server = server.clone();
 849        async move {
 850            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 851
 852            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 853            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 854            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 855            let client_protocol_version: Option<u32> = request
 856                .header("X-Zed-Protocol-Version")
 857                .and_then(|v| v.as_str().parse().ok());
 858
 859            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 860                return Ok(Response::new(StatusCode::UpgradeRequired));
 861            }
 862
 863            let header = match request.header("Sec-Websocket-Key") {
 864                Some(h) => h.as_str(),
 865                None => return Err(anyhow!("expected sec-websocket-key"))?,
 866            };
 867
 868            let user_id = process_auth_header(&request).await?;
 869
 870            let mut response = Response::new(StatusCode::SwitchingProtocols);
 871            response.insert_header(UPGRADE, "websocket");
 872            response.insert_header(CONNECTION, "Upgrade");
 873            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 874            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 875            response.insert_header("Sec-Websocket-Version", "13");
 876
 877            let http_res: &mut tide::http::Response = response.as_mut();
 878            let upgrade_receiver = http_res.recv_upgrade().await;
 879            let addr = request.remote().unwrap_or("unknown").to_string();
 880            task::spawn(async move {
 881                if let Some(stream) = upgrade_receiver.await {
 882                    server
 883                        .handle_connection(
 884                            Connection::new(
 885                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 886                            ),
 887                            addr,
 888                            user_id,
 889                        )
 890                        .await;
 891                }
 892            });
 893
 894            Ok(response)
 895        }
 896    });
 897}
 898
 899fn header_contains_ignore_case<T>(
 900    request: &tide::Request<T>,
 901    header_name: HeaderName,
 902    value: &str,
 903) -> bool {
 904    request
 905        .header(header_name)
 906        .map(|h| {
 907            h.as_str()
 908                .split(',')
 909                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 910        })
 911        .unwrap_or(false)
 912}
 913
 914#[cfg(test)]
 915mod tests {
 916    use super::*;
 917    use crate::{
 918        auth,
 919        db::{tests::TestDb, UserId},
 920        github, AppState, Config,
 921    };
 922    use ::rpc::Peer;
 923    use async_std::task;
 924    use gpui::{ModelHandle, TestAppContext};
 925    use parking_lot::Mutex;
 926    use postage::{mpsc, watch};
 927    use serde_json::json;
 928    use sqlx::types::time::OffsetDateTime;
 929    use std::{
 930        path::Path,
 931        sync::{
 932            atomic::{AtomicBool, Ordering::SeqCst},
 933            Arc,
 934        },
 935        time::Duration,
 936    };
 937    use zed::{
 938        client::{
 939            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
 940            EstablishConnectionError, UserStore,
 941        },
 942        editor::{Editor, EditorSettings, Input},
 943        fs::{FakeFs, Fs as _},
 944        language::{
 945            tree_sitter_rust, Diagnostic, Language, LanguageConfig, LanguageRegistry,
 946            LanguageServerConfig, Point,
 947        },
 948        lsp,
 949        people_panel::JoinWorktree,
 950        project::{ProjectPath, Worktree},
 951        test::test_app_state,
 952        workspace::Workspace,
 953    };
 954
 955    #[gpui::test]
 956    async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
 957        let (window_b, _) = cx_b.add_window(|_| EmptyView);
 958        let lang_registry = Arc::new(LanguageRegistry::new());
 959
 960        // Connect to a server as 2 clients.
 961        let mut server = TestServer::start().await;
 962        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
 963        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
 964
 965        cx_a.foreground().forbid_parking();
 966
 967        // Share a local worktree as client A
 968        let fs = Arc::new(FakeFs::new());
 969        fs.insert_tree(
 970            "/a",
 971            json!({
 972                ".zed.toml": r#"collaborators = ["user_b"]"#,
 973                "a.txt": "a-contents",
 974                "b.txt": "b-contents",
 975            }),
 976        )
 977        .await;
 978        let worktree_a = Worktree::open_local(
 979            client_a.clone(),
 980            "/a".as_ref(),
 981            fs,
 982            lang_registry.clone(),
 983            &mut cx_a.to_async(),
 984        )
 985        .await
 986        .unwrap();
 987        worktree_a
 988            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
 989            .await;
 990        let worktree_id = worktree_a
 991            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
 992            .await
 993            .unwrap();
 994
 995        // Join that worktree as client B, and see that a guest has joined as client A.
 996        let worktree_b = Worktree::open_remote(
 997            client_b.clone(),
 998            worktree_id,
 999            lang_registry.clone(),
1000            &mut cx_b.to_async(),
1001        )
1002        .await
1003        .unwrap();
1004        let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| tree.replica_id());
1005        worktree_a
1006            .condition(&cx_a, |tree, _| {
1007                tree.peers()
1008                    .values()
1009                    .any(|replica_id| *replica_id == replica_id_b)
1010            })
1011            .await;
1012
1013        // Open the same file as client B and client A.
1014        let buffer_b = worktree_b
1015            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1016            .await
1017            .unwrap();
1018        buffer_b.read_with(&cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1019        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1020        let buffer_a = worktree_a
1021            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1022            .await
1023            .unwrap();
1024
1025        // Create a selection set as client B and see that selection set as client A.
1026        let editor_b = cx_b.add_view(window_b, |cx| {
1027            Editor::for_buffer(buffer_b, |cx| EditorSettings::test(cx), cx)
1028        });
1029        buffer_a
1030            .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1031            .await;
1032
1033        // Edit the buffer as client B and see that edit as client A.
1034        editor_b.update(&mut cx_b, |editor, cx| {
1035            editor.handle_input(&Input("ok, ".into()), cx)
1036        });
1037        buffer_a
1038            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1039            .await;
1040
1041        // Remove the selection set as client B, see those selections disappear as client A.
1042        cx_b.update(move |_| drop(editor_b));
1043        buffer_a
1044            .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1045            .await;
1046
1047        // Close the buffer as client A, see that the buffer is closed.
1048        cx_a.update(move |_| drop(buffer_a));
1049        worktree_a
1050            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1051            .await;
1052
1053        // Dropping the worktree removes client B from client A's peers.
1054        cx_b.update(move |_| drop(worktree_b));
1055        worktree_a
1056            .condition(&cx_a, |tree, _| tree.peers().is_empty())
1057            .await;
1058    }
1059
1060    #[gpui::test]
1061    async fn test_unshare_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1062        cx_b.update(zed::people_panel::init);
1063        let mut app_state_a = cx_a.update(test_app_state);
1064        let mut app_state_b = cx_b.update(test_app_state);
1065
1066        // Connect to a server as 2 clients.
1067        let mut server = TestServer::start().await;
1068        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
1069        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
1070        Arc::get_mut(&mut app_state_a).unwrap().client = client_a;
1071        Arc::get_mut(&mut app_state_a).unwrap().user_store = user_store_a;
1072        Arc::get_mut(&mut app_state_b).unwrap().client = client_b;
1073        Arc::get_mut(&mut app_state_b).unwrap().user_store = user_store_b;
1074
1075        cx_a.foreground().forbid_parking();
1076
1077        // Share a local worktree as client A
1078        let fs = Arc::new(FakeFs::new());
1079        fs.insert_tree(
1080            "/a",
1081            json!({
1082                ".zed.toml": r#"collaborators = ["user_b"]"#,
1083                "a.txt": "a-contents",
1084                "b.txt": "b-contents",
1085            }),
1086        )
1087        .await;
1088        let worktree_a = Worktree::open_local(
1089            app_state_a.client.clone(),
1090            "/a".as_ref(),
1091            fs,
1092            app_state_a.languages.clone(),
1093            &mut cx_a.to_async(),
1094        )
1095        .await
1096        .unwrap();
1097        worktree_a
1098            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1099            .await;
1100
1101        let remote_worktree_id = worktree_a
1102            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1103            .await
1104            .unwrap();
1105
1106        let (window_b, workspace_b) =
1107            cx_b.add_window(|cx| Workspace::new(&app_state_b.as_ref().into(), cx));
1108        cx_b.update(|cx| {
1109            cx.dispatch_action(
1110                window_b,
1111                vec![workspace_b.id()],
1112                &JoinWorktree(remote_worktree_id),
1113            );
1114        });
1115        workspace_b
1116            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 1)
1117            .await;
1118
1119        let local_worktree_id_b = workspace_b.read_with(&cx_b, |workspace, cx| {
1120            let active_pane = workspace.active_pane().read(cx);
1121            assert!(active_pane.active_item().is_none());
1122            workspace.worktrees(cx).first().unwrap().id()
1123        });
1124        workspace_b
1125            .update(&mut cx_b, |workspace, cx| {
1126                workspace.open_entry(
1127                    ProjectPath {
1128                        worktree_id: local_worktree_id_b,
1129                        path: Path::new("a.txt").into(),
1130                    },
1131                    cx,
1132                )
1133            })
1134            .unwrap()
1135            .await;
1136        workspace_b.read_with(&cx_b, |workspace, cx| {
1137            let active_pane = workspace.active_pane().read(cx);
1138            assert!(active_pane.active_item().is_some());
1139        });
1140
1141        worktree_a.update(&mut cx_a, |tree, cx| {
1142            tree.as_local_mut().unwrap().unshare(cx);
1143        });
1144        workspace_b
1145            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 0)
1146            .await;
1147        workspace_b.read_with(&cx_b, |workspace, cx| {
1148            let active_pane = workspace.active_pane().read(cx);
1149            assert!(active_pane.active_item().is_none());
1150        });
1151    }
1152
1153    #[gpui::test]
1154    async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
1155        mut cx_a: TestAppContext,
1156        mut cx_b: TestAppContext,
1157        mut cx_c: TestAppContext,
1158    ) {
1159        cx_a.foreground().forbid_parking();
1160        let lang_registry = Arc::new(LanguageRegistry::new());
1161
1162        // Connect to a server as 3 clients.
1163        let mut server = TestServer::start().await;
1164        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1165        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1166        let (client_c, _) = server.create_client(&mut cx_c, "user_c").await;
1167
1168        let fs = Arc::new(FakeFs::new());
1169
1170        // Share a worktree as client A.
1171        fs.insert_tree(
1172            "/a",
1173            json!({
1174                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1175                "file1": "",
1176                "file2": ""
1177            }),
1178        )
1179        .await;
1180
1181        let worktree_a = Worktree::open_local(
1182            client_a.clone(),
1183            "/a".as_ref(),
1184            fs.clone(),
1185            lang_registry.clone(),
1186            &mut cx_a.to_async(),
1187        )
1188        .await
1189        .unwrap();
1190        worktree_a
1191            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1192            .await;
1193        let worktree_id = worktree_a
1194            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1195            .await
1196            .unwrap();
1197
1198        // Join that worktree as clients B and C.
1199        let worktree_b = Worktree::open_remote(
1200            client_b.clone(),
1201            worktree_id,
1202            lang_registry.clone(),
1203            &mut cx_b.to_async(),
1204        )
1205        .await
1206        .unwrap();
1207        let worktree_c = Worktree::open_remote(
1208            client_c.clone(),
1209            worktree_id,
1210            lang_registry.clone(),
1211            &mut cx_c.to_async(),
1212        )
1213        .await
1214        .unwrap();
1215
1216        // Open and edit a buffer as both guests B and C.
1217        let buffer_b = worktree_b
1218            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1219            .await
1220            .unwrap();
1221        let buffer_c = worktree_c
1222            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1223            .await
1224            .unwrap();
1225        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1226        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1227
1228        // Open and edit that buffer as the host.
1229        let buffer_a = worktree_a
1230            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1231            .await
1232            .unwrap();
1233
1234        buffer_a
1235            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1236            .await;
1237        buffer_a.update(&mut cx_a, |buf, cx| {
1238            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1239        });
1240
1241        // Wait for edits to propagate
1242        buffer_a
1243            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1244            .await;
1245        buffer_b
1246            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1247            .await;
1248        buffer_c
1249            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1250            .await;
1251
1252        // Edit the buffer as the host and concurrently save as guest B.
1253        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1254        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1255        save_b.await.unwrap();
1256        assert_eq!(
1257            fs.load("/a/file1".as_ref()).await.unwrap(),
1258            "hi-a, i-am-c, i-am-b, i-am-a"
1259        );
1260        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1261        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1262        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1263
1264        // Make changes on host's file system, see those changes on the guests.
1265        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1266            .await
1267            .unwrap();
1268        fs.insert_file(Path::new("/a/file4"), "4".into())
1269            .await
1270            .unwrap();
1271
1272        worktree_b
1273            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1274            .await;
1275        worktree_c
1276            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1277            .await;
1278        worktree_b.read_with(&cx_b, |tree, _| {
1279            assert_eq!(
1280                tree.paths()
1281                    .map(|p| p.to_string_lossy())
1282                    .collect::<Vec<_>>(),
1283                &[".zed.toml", "file1", "file3", "file4"]
1284            )
1285        });
1286        worktree_c.read_with(&cx_c, |tree, _| {
1287            assert_eq!(
1288                tree.paths()
1289                    .map(|p| p.to_string_lossy())
1290                    .collect::<Vec<_>>(),
1291                &[".zed.toml", "file1", "file3", "file4"]
1292            )
1293        });
1294    }
1295
1296    #[gpui::test]
1297    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1298        cx_a.foreground().forbid_parking();
1299        let lang_registry = Arc::new(LanguageRegistry::new());
1300
1301        // Connect to a server as 2 clients.
1302        let mut server = TestServer::start().await;
1303        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1304        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1305
1306        // Share a local worktree as client A
1307        let fs = Arc::new(FakeFs::new());
1308        fs.insert_tree(
1309            "/dir",
1310            json!({
1311                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1312                "a.txt": "a-contents",
1313            }),
1314        )
1315        .await;
1316
1317        let worktree_a = Worktree::open_local(
1318            client_a.clone(),
1319            "/dir".as_ref(),
1320            fs,
1321            lang_registry.clone(),
1322            &mut cx_a.to_async(),
1323        )
1324        .await
1325        .unwrap();
1326        worktree_a
1327            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1328            .await;
1329        let worktree_id = worktree_a
1330            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1331            .await
1332            .unwrap();
1333
1334        // Join that worktree as client B, and see that a guest has joined as client A.
1335        let worktree_b = Worktree::open_remote(
1336            client_b.clone(),
1337            worktree_id,
1338            lang_registry.clone(),
1339            &mut cx_b.to_async(),
1340        )
1341        .await
1342        .unwrap();
1343
1344        let buffer_b = worktree_b
1345            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1346            .await
1347            .unwrap();
1348        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1349
1350        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1351        buffer_b.read_with(&cx_b, |buf, _| {
1352            assert!(buf.is_dirty());
1353            assert!(!buf.has_conflict());
1354        });
1355
1356        buffer_b
1357            .update(&mut cx_b, |buf, cx| buf.save(cx))
1358            .unwrap()
1359            .await
1360            .unwrap();
1361        worktree_b
1362            .condition(&cx_b, |_, cx| {
1363                buffer_b.read(cx).file().unwrap().mtime() != mtime
1364            })
1365            .await;
1366        buffer_b.read_with(&cx_b, |buf, _| {
1367            assert!(!buf.is_dirty());
1368            assert!(!buf.has_conflict());
1369        });
1370
1371        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1372        buffer_b.read_with(&cx_b, |buf, _| {
1373            assert!(buf.is_dirty());
1374            assert!(!buf.has_conflict());
1375        });
1376    }
1377
1378    #[gpui::test]
1379    async fn test_editing_while_guest_opens_buffer(
1380        mut cx_a: TestAppContext,
1381        mut cx_b: TestAppContext,
1382    ) {
1383        cx_a.foreground().forbid_parking();
1384        let lang_registry = Arc::new(LanguageRegistry::new());
1385
1386        // Connect to a server as 2 clients.
1387        let mut server = TestServer::start().await;
1388        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1389        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1390
1391        // Share a local worktree as client A
1392        let fs = Arc::new(FakeFs::new());
1393        fs.insert_tree(
1394            "/dir",
1395            json!({
1396                ".zed.toml": r#"collaborators = ["user_b"]"#,
1397                "a.txt": "a-contents",
1398            }),
1399        )
1400        .await;
1401        let worktree_a = Worktree::open_local(
1402            client_a.clone(),
1403            "/dir".as_ref(),
1404            fs,
1405            lang_registry.clone(),
1406            &mut cx_a.to_async(),
1407        )
1408        .await
1409        .unwrap();
1410        worktree_a
1411            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1412            .await;
1413        let worktree_id = worktree_a
1414            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1415            .await
1416            .unwrap();
1417
1418        // Join that worktree as client B, and see that a guest has joined as client A.
1419        let worktree_b = Worktree::open_remote(
1420            client_b.clone(),
1421            worktree_id,
1422            lang_registry.clone(),
1423            &mut cx_b.to_async(),
1424        )
1425        .await
1426        .unwrap();
1427
1428        let buffer_a = worktree_a
1429            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1430            .await
1431            .unwrap();
1432        let buffer_b = cx_b
1433            .background()
1434            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1435
1436        task::yield_now().await;
1437        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1438
1439        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1440        let buffer_b = buffer_b.await.unwrap();
1441        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1442    }
1443
1444    #[gpui::test]
1445    async fn test_leaving_worktree_while_opening_buffer(
1446        mut cx_a: TestAppContext,
1447        mut cx_b: TestAppContext,
1448    ) {
1449        cx_a.foreground().forbid_parking();
1450        let lang_registry = Arc::new(LanguageRegistry::new());
1451
1452        // Connect to a server as 2 clients.
1453        let mut server = TestServer::start().await;
1454        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1455        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1456
1457        // Share a local worktree as client A
1458        let fs = Arc::new(FakeFs::new());
1459        fs.insert_tree(
1460            "/dir",
1461            json!({
1462                ".zed.toml": r#"collaborators = ["user_b"]"#,
1463                "a.txt": "a-contents",
1464            }),
1465        )
1466        .await;
1467        let worktree_a = Worktree::open_local(
1468            client_a.clone(),
1469            "/dir".as_ref(),
1470            fs,
1471            lang_registry.clone(),
1472            &mut cx_a.to_async(),
1473        )
1474        .await
1475        .unwrap();
1476        worktree_a
1477            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1478            .await;
1479        let worktree_id = worktree_a
1480            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1481            .await
1482            .unwrap();
1483
1484        // Join that worktree as client B, and see that a guest has joined as client A.
1485        let worktree_b = Worktree::open_remote(
1486            client_b.clone(),
1487            worktree_id,
1488            lang_registry.clone(),
1489            &mut cx_b.to_async(),
1490        )
1491        .await
1492        .unwrap();
1493        worktree_a
1494            .condition(&cx_a, |tree, _| tree.peers().len() == 1)
1495            .await;
1496
1497        let buffer_b = cx_b
1498            .background()
1499            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1500        cx_b.update(|_| drop(worktree_b));
1501        drop(buffer_b);
1502        worktree_a
1503            .condition(&cx_a, |tree, _| tree.peers().len() == 0)
1504            .await;
1505    }
1506
1507    #[gpui::test]
1508    async fn test_peer_disconnection(mut cx_a: TestAppContext, cx_b: TestAppContext) {
1509        cx_a.foreground().forbid_parking();
1510        let lang_registry = Arc::new(LanguageRegistry::new());
1511
1512        // Connect to a server as 2 clients.
1513        let mut server = TestServer::start().await;
1514        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1515        let (client_b, _) = server.create_client(&mut cx_a, "user_b").await;
1516
1517        // Share a local worktree as client A
1518        let fs = Arc::new(FakeFs::new());
1519        fs.insert_tree(
1520            "/a",
1521            json!({
1522                ".zed.toml": r#"collaborators = ["user_b"]"#,
1523                "a.txt": "a-contents",
1524                "b.txt": "b-contents",
1525            }),
1526        )
1527        .await;
1528        let worktree_a = Worktree::open_local(
1529            client_a.clone(),
1530            "/a".as_ref(),
1531            fs,
1532            lang_registry.clone(),
1533            &mut cx_a.to_async(),
1534        )
1535        .await
1536        .unwrap();
1537        worktree_a
1538            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1539            .await;
1540        let worktree_id = worktree_a
1541            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1542            .await
1543            .unwrap();
1544
1545        // Join that worktree as client B, and see that a guest has joined as client A.
1546        let _worktree_b = Worktree::open_remote(
1547            client_b.clone(),
1548            worktree_id,
1549            lang_registry.clone(),
1550            &mut cx_b.to_async(),
1551        )
1552        .await
1553        .unwrap();
1554        worktree_a
1555            .condition(&cx_a, |tree, _| tree.peers().len() == 1)
1556            .await;
1557
1558        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1559        client_b.disconnect(&cx_b.to_async()).await.unwrap();
1560        worktree_a
1561            .condition(&cx_a, |tree, _| tree.peers().len() == 0)
1562            .await;
1563    }
1564
1565    #[gpui::test]
1566    async fn test_collaborating_with_diagnostics(
1567        mut cx_a: TestAppContext,
1568        mut cx_b: TestAppContext,
1569    ) {
1570        cx_a.foreground().forbid_parking();
1571        let (language_server_config, mut fake_language_server) =
1572            LanguageServerConfig::fake(cx_a.background()).await;
1573        let mut lang_registry = LanguageRegistry::new();
1574        lang_registry.add(Arc::new(Language::new(
1575            LanguageConfig {
1576                name: "Rust".to_string(),
1577                path_suffixes: vec!["rs".to_string()],
1578                language_server: Some(language_server_config),
1579                ..Default::default()
1580            },
1581            tree_sitter_rust::language(),
1582        )));
1583
1584        let lang_registry = Arc::new(lang_registry);
1585
1586        // Connect to a server as 2 clients.
1587        let mut server = TestServer::start().await;
1588        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1589        let (client_b, _) = server.create_client(&mut cx_a, "user_b").await;
1590
1591        // Share a local worktree as client A
1592        let fs = Arc::new(FakeFs::new());
1593        fs.insert_tree(
1594            "/a",
1595            json!({
1596                ".zed.toml": r#"collaborators = ["user_b"]"#,
1597                "a.rs": "let one = two",
1598                "other.rs": "",
1599            }),
1600        )
1601        .await;
1602        let worktree_a = Worktree::open_local(
1603            client_a.clone(),
1604            "/a".as_ref(),
1605            fs,
1606            lang_registry.clone(),
1607            &mut cx_a.to_async(),
1608        )
1609        .await
1610        .unwrap();
1611        worktree_a
1612            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1613            .await;
1614        let worktree_id = worktree_a
1615            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1616            .await
1617            .unwrap();
1618
1619        // Cause language server to start.
1620        let _ = cx_a
1621            .background()
1622            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1623                worktree.open_buffer("other.rs", cx)
1624            }))
1625            .await
1626            .unwrap();
1627
1628        // Simulate a language server reporting errors for a file.
1629        fake_language_server
1630            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1631                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1632                version: None,
1633                diagnostics: vec![
1634                    lsp::Diagnostic {
1635                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1636                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1637                        message: "message 1".to_string(),
1638                        ..Default::default()
1639                    },
1640                    lsp::Diagnostic {
1641                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1642                        range: lsp::Range::new(
1643                            lsp::Position::new(0, 10),
1644                            lsp::Position::new(0, 13),
1645                        ),
1646                        message: "message 2".to_string(),
1647                        ..Default::default()
1648                    },
1649                ],
1650            })
1651            .await;
1652
1653        // Join the worktree as client B.
1654        let worktree_b = Worktree::open_remote(
1655            client_b.clone(),
1656            worktree_id,
1657            lang_registry.clone(),
1658            &mut cx_b.to_async(),
1659        )
1660        .await
1661        .unwrap();
1662
1663        // Open the file with the errors.
1664        let buffer_b = cx_b
1665            .background()
1666            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1667            .await
1668            .unwrap();
1669
1670        buffer_b.read_with(&cx_b, |buffer, _| {
1671            assert_eq!(
1672                buffer
1673                    .diagnostics_in_range(0..buffer.len())
1674                    .collect::<Vec<_>>(),
1675                &[
1676                    (
1677                        Point::new(0, 4)..Point::new(0, 7),
1678                        &Diagnostic {
1679                            group_id: 0,
1680                            message: "message 1".to_string(),
1681                            severity: lsp::DiagnosticSeverity::ERROR,
1682                            is_primary: true
1683                        }
1684                    ),
1685                    (
1686                        Point::new(0, 10)..Point::new(0, 13),
1687                        &Diagnostic {
1688                            group_id: 1,
1689                            severity: lsp::DiagnosticSeverity::WARNING,
1690                            message: "message 2".to_string(),
1691                            is_primary: true
1692                        }
1693                    )
1694                ]
1695            );
1696        });
1697    }
1698
1699    #[gpui::test]
1700    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1701        cx_a.foreground().forbid_parking();
1702
1703        // Connect to a server as 2 clients.
1704        let mut server = TestServer::start().await;
1705        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
1706        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
1707
1708        // Create an org that includes these 2 users.
1709        let db = &server.app_state.db;
1710        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1711        db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
1712            .await
1713            .unwrap();
1714        db.add_org_member(org_id, current_user_id(&user_store_b, &cx_b), false)
1715            .await
1716            .unwrap();
1717
1718        // Create a channel that includes all the users.
1719        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1720        db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
1721            .await
1722            .unwrap();
1723        db.add_channel_member(channel_id, current_user_id(&user_store_b, &cx_b), false)
1724            .await
1725            .unwrap();
1726        db.create_channel_message(
1727            channel_id,
1728            current_user_id(&user_store_b, &cx_b),
1729            "hello A, it's B.",
1730            OffsetDateTime::now_utc(),
1731            1,
1732        )
1733        .await
1734        .unwrap();
1735
1736        let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
1737        channels_a
1738            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1739            .await;
1740        channels_a.read_with(&cx_a, |list, _| {
1741            assert_eq!(
1742                list.available_channels().unwrap(),
1743                &[ChannelDetails {
1744                    id: channel_id.to_proto(),
1745                    name: "test-channel".to_string()
1746                }]
1747            )
1748        });
1749        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1750            this.get_channel(channel_id.to_proto(), cx).unwrap()
1751        });
1752        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1753        channel_a
1754            .condition(&cx_a, |channel, _| {
1755                channel_messages(channel)
1756                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1757            })
1758            .await;
1759
1760        let channels_b = cx_b.add_model(|cx| ChannelList::new(user_store_b, client_b, cx));
1761        channels_b
1762            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1763            .await;
1764        channels_b.read_with(&cx_b, |list, _| {
1765            assert_eq!(
1766                list.available_channels().unwrap(),
1767                &[ChannelDetails {
1768                    id: channel_id.to_proto(),
1769                    name: "test-channel".to_string()
1770                }]
1771            )
1772        });
1773
1774        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1775            this.get_channel(channel_id.to_proto(), cx).unwrap()
1776        });
1777        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1778        channel_b
1779            .condition(&cx_b, |channel, _| {
1780                channel_messages(channel)
1781                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1782            })
1783            .await;
1784
1785        channel_a
1786            .update(&mut cx_a, |channel, cx| {
1787                channel
1788                    .send_message("oh, hi B.".to_string(), cx)
1789                    .unwrap()
1790                    .detach();
1791                let task = channel.send_message("sup".to_string(), cx).unwrap();
1792                assert_eq!(
1793                    channel_messages(channel),
1794                    &[
1795                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1796                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
1797                        ("user_a".to_string(), "sup".to_string(), true)
1798                    ]
1799                );
1800                task
1801            })
1802            .await
1803            .unwrap();
1804
1805        channel_b
1806            .condition(&cx_b, |channel, _| {
1807                channel_messages(channel)
1808                    == [
1809                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1810                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
1811                        ("user_a".to_string(), "sup".to_string(), false),
1812                    ]
1813            })
1814            .await;
1815
1816        assert_eq!(
1817            server
1818                .state()
1819                .await
1820                .channel(channel_id)
1821                .unwrap()
1822                .connection_ids
1823                .len(),
1824            2
1825        );
1826        cx_b.update(|_| drop(channel_b));
1827        server
1828            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1829            .await;
1830
1831        cx_a.update(|_| drop(channel_a));
1832        server
1833            .condition(|state| state.channel(channel_id).is_none())
1834            .await;
1835    }
1836
1837    #[gpui::test]
1838    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
1839        cx_a.foreground().forbid_parking();
1840
1841        let mut server = TestServer::start().await;
1842        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
1843
1844        let db = &server.app_state.db;
1845        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1846        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1847        db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
1848            .await
1849            .unwrap();
1850        db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
1851            .await
1852            .unwrap();
1853
1854        let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
1855        channels_a
1856            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1857            .await;
1858        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1859            this.get_channel(channel_id.to_proto(), cx).unwrap()
1860        });
1861
1862        // Messages aren't allowed to be too long.
1863        channel_a
1864            .update(&mut cx_a, |channel, cx| {
1865                let long_body = "this is long.\n".repeat(1024);
1866                channel.send_message(long_body, cx).unwrap()
1867            })
1868            .await
1869            .unwrap_err();
1870
1871        // Messages aren't allowed to be blank.
1872        channel_a.update(&mut cx_a, |channel, cx| {
1873            channel.send_message(String::new(), cx).unwrap_err()
1874        });
1875
1876        // Leading and trailing whitespace are trimmed.
1877        channel_a
1878            .update(&mut cx_a, |channel, cx| {
1879                channel
1880                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
1881                    .unwrap()
1882            })
1883            .await
1884            .unwrap();
1885        assert_eq!(
1886            db.get_channel_messages(channel_id, 10, None)
1887                .await
1888                .unwrap()
1889                .iter()
1890                .map(|m| &m.body)
1891                .collect::<Vec<_>>(),
1892            &["surrounded by whitespace"]
1893        );
1894    }
1895
1896    #[gpui::test]
1897    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1898        cx_a.foreground().forbid_parking();
1899
1900        // Connect to a server as 2 clients.
1901        let mut server = TestServer::start().await;
1902        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
1903        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
1904        let mut status_b = client_b.status();
1905
1906        // Create an org that includes these 2 users.
1907        let db = &server.app_state.db;
1908        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1909        db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
1910            .await
1911            .unwrap();
1912        db.add_org_member(org_id, current_user_id(&user_store_b, &cx_b), false)
1913            .await
1914            .unwrap();
1915
1916        // Create a channel that includes all the users.
1917        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1918        db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
1919            .await
1920            .unwrap();
1921        db.add_channel_member(channel_id, current_user_id(&user_store_b, &cx_b), false)
1922            .await
1923            .unwrap();
1924        db.create_channel_message(
1925            channel_id,
1926            current_user_id(&user_store_b, &cx_b),
1927            "hello A, it's B.",
1928            OffsetDateTime::now_utc(),
1929            2,
1930        )
1931        .await
1932        .unwrap();
1933
1934        let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
1935        channels_a
1936            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1937            .await;
1938
1939        channels_a.read_with(&cx_a, |list, _| {
1940            assert_eq!(
1941                list.available_channels().unwrap(),
1942                &[ChannelDetails {
1943                    id: channel_id.to_proto(),
1944                    name: "test-channel".to_string()
1945                }]
1946            )
1947        });
1948        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1949            this.get_channel(channel_id.to_proto(), cx).unwrap()
1950        });
1951        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1952        channel_a
1953            .condition(&cx_a, |channel, _| {
1954                channel_messages(channel)
1955                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1956            })
1957            .await;
1958
1959        let channels_b = cx_b.add_model(|cx| ChannelList::new(user_store_b.clone(), client_b, cx));
1960        channels_b
1961            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1962            .await;
1963        channels_b.read_with(&cx_b, |list, _| {
1964            assert_eq!(
1965                list.available_channels().unwrap(),
1966                &[ChannelDetails {
1967                    id: channel_id.to_proto(),
1968                    name: "test-channel".to_string()
1969                }]
1970            )
1971        });
1972
1973        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1974            this.get_channel(channel_id.to_proto(), cx).unwrap()
1975        });
1976        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1977        channel_b
1978            .condition(&cx_b, |channel, _| {
1979                channel_messages(channel)
1980                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1981            })
1982            .await;
1983
1984        // Disconnect client B, ensuring we can still access its cached channel data.
1985        server.forbid_connections();
1986        server.disconnect_client(current_user_id(&user_store_b, &cx_b));
1987        while !matches!(
1988            status_b.recv().await,
1989            Some(client::Status::ReconnectionError { .. })
1990        ) {}
1991
1992        channels_b.read_with(&cx_b, |channels, _| {
1993            assert_eq!(
1994                channels.available_channels().unwrap(),
1995                [ChannelDetails {
1996                    id: channel_id.to_proto(),
1997                    name: "test-channel".to_string()
1998                }]
1999            )
2000        });
2001        channel_b.read_with(&cx_b, |channel, _| {
2002            assert_eq!(
2003                channel_messages(channel),
2004                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2005            )
2006        });
2007
2008        // Send a message from client B while it is disconnected.
2009        channel_b
2010            .update(&mut cx_b, |channel, cx| {
2011                let task = channel
2012                    .send_message("can you see this?".to_string(), cx)
2013                    .unwrap();
2014                assert_eq!(
2015                    channel_messages(channel),
2016                    &[
2017                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2018                        ("user_b".to_string(), "can you see this?".to_string(), true)
2019                    ]
2020                );
2021                task
2022            })
2023            .await
2024            .unwrap_err();
2025
2026        // Send a message from client A while B is disconnected.
2027        channel_a
2028            .update(&mut cx_a, |channel, cx| {
2029                channel
2030                    .send_message("oh, hi B.".to_string(), cx)
2031                    .unwrap()
2032                    .detach();
2033                let task = channel.send_message("sup".to_string(), cx).unwrap();
2034                assert_eq!(
2035                    channel_messages(channel),
2036                    &[
2037                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2038                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2039                        ("user_a".to_string(), "sup".to_string(), true)
2040                    ]
2041                );
2042                task
2043            })
2044            .await
2045            .unwrap();
2046
2047        // Give client B a chance to reconnect.
2048        server.allow_connections();
2049        cx_b.foreground().advance_clock(Duration::from_secs(10));
2050
2051        // Verify that B sees the new messages upon reconnection, as well as the message client B
2052        // sent while offline.
2053        channel_b
2054            .condition(&cx_b, |channel, _| {
2055                channel_messages(channel)
2056                    == [
2057                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2058                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2059                        ("user_a".to_string(), "sup".to_string(), false),
2060                        ("user_b".to_string(), "can you see this?".to_string(), false),
2061                    ]
2062            })
2063            .await;
2064
2065        // Ensure client A and B can communicate normally after reconnection.
2066        channel_a
2067            .update(&mut cx_a, |channel, cx| {
2068                channel.send_message("you online?".to_string(), cx).unwrap()
2069            })
2070            .await
2071            .unwrap();
2072        channel_b
2073            .condition(&cx_b, |channel, _| {
2074                channel_messages(channel)
2075                    == [
2076                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2077                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2078                        ("user_a".to_string(), "sup".to_string(), false),
2079                        ("user_b".to_string(), "can you see this?".to_string(), false),
2080                        ("user_a".to_string(), "you online?".to_string(), false),
2081                    ]
2082            })
2083            .await;
2084
2085        channel_b
2086            .update(&mut cx_b, |channel, cx| {
2087                channel.send_message("yep".to_string(), cx).unwrap()
2088            })
2089            .await
2090            .unwrap();
2091        channel_a
2092            .condition(&cx_a, |channel, _| {
2093                channel_messages(channel)
2094                    == [
2095                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2096                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2097                        ("user_a".to_string(), "sup".to_string(), false),
2098                        ("user_b".to_string(), "can you see this?".to_string(), false),
2099                        ("user_a".to_string(), "you online?".to_string(), false),
2100                        ("user_b".to_string(), "yep".to_string(), false),
2101                    ]
2102            })
2103            .await;
2104    }
2105
2106    #[gpui::test]
2107    async fn test_collaborators(
2108        mut cx_a: TestAppContext,
2109        mut cx_b: TestAppContext,
2110        mut cx_c: TestAppContext,
2111    ) {
2112        cx_a.foreground().forbid_parking();
2113        let lang_registry = Arc::new(LanguageRegistry::new());
2114
2115        // Connect to a server as 3 clients.
2116        let mut server = TestServer::start().await;
2117        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
2118        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
2119        let (_client_c, user_store_c) = server.create_client(&mut cx_c, "user_c").await;
2120
2121        let fs = Arc::new(FakeFs::new());
2122
2123        // Share a worktree as client A.
2124        fs.insert_tree(
2125            "/a",
2126            json!({
2127                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2128            }),
2129        )
2130        .await;
2131
2132        let worktree_a = Worktree::open_local(
2133            client_a.clone(),
2134            "/a".as_ref(),
2135            fs.clone(),
2136            lang_registry.clone(),
2137            &mut cx_a.to_async(),
2138        )
2139        .await
2140        .unwrap();
2141
2142        user_store_a
2143            .condition(&cx_a, |user_store, _| {
2144                collaborators(user_store) == vec![("user_a", vec![("a", vec![])])]
2145            })
2146            .await;
2147        user_store_b
2148            .condition(&cx_b, |user_store, _| {
2149                collaborators(user_store) == vec![("user_a", vec![("a", vec![])])]
2150            })
2151            .await;
2152        user_store_c
2153            .condition(&cx_c, |user_store, _| {
2154                collaborators(user_store) == vec![("user_a", vec![("a", vec![])])]
2155            })
2156            .await;
2157
2158        let worktree_id = worktree_a
2159            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
2160            .await
2161            .unwrap();
2162
2163        let _worktree_b = Worktree::open_remote(
2164            client_b.clone(),
2165            worktree_id,
2166            lang_registry.clone(),
2167            &mut cx_b.to_async(),
2168        )
2169        .await
2170        .unwrap();
2171
2172        user_store_a
2173            .condition(&cx_a, |user_store, _| {
2174                collaborators(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2175            })
2176            .await;
2177        user_store_b
2178            .condition(&cx_b, |user_store, _| {
2179                collaborators(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2180            })
2181            .await;
2182        user_store_c
2183            .condition(&cx_c, |user_store, _| {
2184                collaborators(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2185            })
2186            .await;
2187
2188        cx_a.update(move |_| drop(worktree_a));
2189        user_store_a
2190            .condition(&cx_a, |user_store, _| collaborators(user_store) == vec![])
2191            .await;
2192        user_store_b
2193            .condition(&cx_b, |user_store, _| collaborators(user_store) == vec![])
2194            .await;
2195        user_store_c
2196            .condition(&cx_c, |user_store, _| collaborators(user_store) == vec![])
2197            .await;
2198
2199        fn collaborators(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2200            user_store
2201                .collaborators()
2202                .iter()
2203                .map(|collaborator| {
2204                    let worktrees = collaborator
2205                        .worktrees
2206                        .iter()
2207                        .map(|w| {
2208                            (
2209                                w.root_name.as_str(),
2210                                w.guests.iter().map(|p| p.github_login.as_str()).collect(),
2211                            )
2212                        })
2213                        .collect();
2214                    (collaborator.user.github_login.as_str(), worktrees)
2215                })
2216                .collect()
2217        }
2218    }
2219
2220    struct TestServer {
2221        peer: Arc<Peer>,
2222        app_state: Arc<AppState>,
2223        server: Arc<Server>,
2224        notifications: mpsc::Receiver<()>,
2225        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
2226        forbid_connections: Arc<AtomicBool>,
2227        _test_db: TestDb,
2228    }
2229
2230    impl TestServer {
2231        async fn start() -> Self {
2232            let test_db = TestDb::new();
2233            let app_state = Self::build_app_state(&test_db).await;
2234            let peer = Peer::new();
2235            let notifications = mpsc::channel(128);
2236            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2237            Self {
2238                peer,
2239                app_state,
2240                server,
2241                notifications: notifications.1,
2242                connection_killers: Default::default(),
2243                forbid_connections: Default::default(),
2244                _test_db: test_db,
2245            }
2246        }
2247
2248        async fn create_client(
2249            &mut self,
2250            cx: &mut TestAppContext,
2251            name: &str,
2252        ) -> (Arc<Client>, ModelHandle<UserStore>) {
2253            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2254            let client_name = name.to_string();
2255            let mut client = Client::new();
2256            let server = self.server.clone();
2257            let connection_killers = self.connection_killers.clone();
2258            let forbid_connections = self.forbid_connections.clone();
2259            Arc::get_mut(&mut client)
2260                .unwrap()
2261                .override_authenticate(move |cx| {
2262                    cx.spawn(|_| async move {
2263                        let access_token = "the-token".to_string();
2264                        Ok(Credentials {
2265                            user_id: user_id.0 as u64,
2266                            access_token,
2267                        })
2268                    })
2269                })
2270                .override_establish_connection(move |credentials, cx| {
2271                    assert_eq!(credentials.user_id, user_id.0 as u64);
2272                    assert_eq!(credentials.access_token, "the-token");
2273
2274                    let server = server.clone();
2275                    let connection_killers = connection_killers.clone();
2276                    let forbid_connections = forbid_connections.clone();
2277                    let client_name = client_name.clone();
2278                    cx.spawn(move |cx| async move {
2279                        if forbid_connections.load(SeqCst) {
2280                            Err(EstablishConnectionError::other(anyhow!(
2281                                "server is forbidding connections"
2282                            )))
2283                        } else {
2284                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2285                            connection_killers.lock().insert(user_id, kill_conn);
2286                            cx.background()
2287                                .spawn(server.handle_connection(server_conn, client_name, user_id))
2288                                .detach();
2289                            Ok(client_conn)
2290                        }
2291                    })
2292                });
2293
2294            let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2295            client
2296                .authenticate_and_connect(&cx.to_async())
2297                .await
2298                .unwrap();
2299
2300            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2301            let mut authed_user =
2302                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2303            while authed_user.recv().await.unwrap().is_none() {}
2304
2305            (client, user_store)
2306        }
2307
2308        fn disconnect_client(&self, user_id: UserId) {
2309            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2310                let _ = kill_conn.try_send(Some(()));
2311            }
2312        }
2313
2314        fn forbid_connections(&self) {
2315            self.forbid_connections.store(true, SeqCst);
2316        }
2317
2318        fn allow_connections(&self) {
2319            self.forbid_connections.store(false, SeqCst);
2320        }
2321
2322        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2323            let mut config = Config::default();
2324            config.session_secret = "a".repeat(32);
2325            config.database_url = test_db.url.clone();
2326            let github_client = github::AppClient::test();
2327            Arc::new(AppState {
2328                db: test_db.db().clone(),
2329                handlebars: Default::default(),
2330                auth_client: auth::build_client("", ""),
2331                repo_client: github::RepoClient::test(&github_client),
2332                github_client,
2333                config,
2334            })
2335        }
2336
2337        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2338            self.server.store.read()
2339        }
2340
2341        async fn condition<F>(&mut self, mut predicate: F)
2342        where
2343            F: FnMut(&Store) -> bool,
2344        {
2345            async_std::future::timeout(Duration::from_millis(500), async {
2346                while !(predicate)(&*self.server.store.read()) {
2347                    self.notifications.recv().await;
2348                }
2349            })
2350            .await
2351            .expect("condition timed out");
2352        }
2353    }
2354
2355    impl Drop for TestServer {
2356        fn drop(&mut self) {
2357            task::block_on(self.peer.reset());
2358        }
2359    }
2360
2361    fn current_user_id(user_store: &ModelHandle<UserStore>, cx: &TestAppContext) -> UserId {
2362        UserId::from_proto(
2363            user_store.read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2364        )
2365    }
2366
2367    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2368        channel
2369            .messages()
2370            .cursor::<()>()
2371            .map(|m| {
2372                (
2373                    m.sender.github_login.clone(),
2374                    m.body.clone(),
2375                    m.is_pending(),
2376                )
2377            })
2378            .collect()
2379    }
2380
2381    struct EmptyView;
2382
2383    impl gpui::Entity for EmptyView {
2384        type Event = ();
2385    }
2386
2387    impl gpui::View for EmptyView {
2388        fn ui_name() -> &'static str {
2389            "empty view"
2390        }
2391
2392        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2393            gpui::Element::boxed(gpui::elements::Empty)
2394        }
2395    }
2396}