rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use futures::{future::BoxFuture, FutureExt};
  12use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  13use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{
  20    any::TypeId,
  21    collections::{HashMap, HashSet},
  22    future::Future,
  23    mem,
  24    sync::Arc,
  25    time::Instant,
  26};
  27use store::{Store, Worktree};
  28use surf::StatusCode;
  29use tide::log;
  30use tide::{
  31    http::headers::{HeaderName, CONNECTION, UPGRADE},
  32    Request, Response,
  33};
  34use time::OffsetDateTime;
  35
  36type MessageHandler = Box<
  37    dyn Send
  38        + Sync
  39        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  40>;
  41
/// RPC server state; handlers receive it wrapped in an `Arc`.
  42pub struct Server {
    /// Peer used to send, forward and respond to RPC messages.
  43    peer: Arc<Peer>,
    /// Connection, worktree and channel bookkeeping behind a read-write lock.
  44    store: RwLock<Store>,
    /// Application state; the handlers in this file use it to reach the database.
  45    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type they accept.
  46    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent `()` after a handler has run for a message.
  47    notifications: Option<mpsc::Sender<()>>,
  48}
  49
/// Number of channel messages requested from the database per page.
  50const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Maximum length, in bytes, of a trimmed channel message body.
  51const MAX_MESSAGE_LEN: usize = 1024;
  52
  53impl Server {
  54    pub fn new(
  55        app_state: Arc<AppState>,
  56        peer: Arc<Peer>,
  57        notifications: Option<mpsc::Sender<()>>,
  58    ) -> Arc<Self> {
  59        let mut server = Self {
  60            peer,
  61            app_state,
  62            store: Default::default(),
  63            handlers: Default::default(),
  64            notifications,
  65        };
  66
  67        server
  68            .add_handler(Server::ping)
  69            .add_handler(Server::open_worktree)
  70            .add_handler(Server::close_worktree)
  71            .add_handler(Server::share_worktree)
  72            .add_handler(Server::unshare_worktree)
  73            .add_handler(Server::join_worktree)
  74            .add_handler(Server::leave_worktree)
  75            .add_handler(Server::update_worktree)
  76            .add_handler(Server::open_buffer)
  77            .add_handler(Server::close_buffer)
  78            .add_handler(Server::update_buffer)
  79            .add_handler(Server::buffer_saved)
  80            .add_handler(Server::save_buffer)
  81            .add_handler(Server::get_channels)
  82            .add_handler(Server::get_users)
  83            .add_handler(Server::join_channel)
  84            .add_handler(Server::leave_channel)
  85            .add_handler(Server::send_channel_message)
  86            .add_handler(Server::get_channel_messages);
  87
  88        Arc::new(server)
  89    }
  90
  91    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  92    where
  93        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  94        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  95        M: EnvelopedMessage,
  96    {
  97        let prev_handler = self.handlers.insert(
  98            TypeId::of::<M>(),
  99            Box::new(move |server, envelope| {
 100                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 101                (handler)(server, *envelope).boxed()
 102            }),
 103        );
 104        if prev_handler.is_some() {
 105            panic!("registered a handler for the same message twice");
 106        }
 107        self
 108    }
 109
 110    pub fn handle_connection(
 111        self: &Arc<Self>,
 112        connection: Connection,
 113        addr: String,
 114        user_id: UserId,
 115    ) -> impl Future<Output = ()> {
 116        let mut this = self.clone();
 117        async move {
 118            let (connection_id, handle_io, mut incoming_rx) =
 119                this.peer.add_connection(connection).await;
 120            this.state_mut().add_connection(connection_id, user_id);
 121            if let Err(err) = this.update_collaborators_for_users(&[user_id]).await {
 122                log::error!("error updating collaborators for {:?}: {}", user_id, err);
 123            }
 124
 125            let handle_io = handle_io.fuse();
 126            futures::pin_mut!(handle_io);
 127            loop {
 128                let next_message = incoming_rx.recv().fuse();
 129                futures::pin_mut!(next_message);
 130                futures::select_biased! {
 131                    message = next_message => {
 132                        if let Some(message) = message {
 133                            let start_time = Instant::now();
 134                            log::info!("RPC message received: {}", message.payload_type_name());
 135                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 136                                if let Err(err) = (handler)(this.clone(), message).await {
 137                                    log::error!("error handling message: {:?}", err);
 138                                } else {
 139                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 140                                }
 141
 142                                if let Some(mut notifications) = this.notifications.clone() {
 143                                    let _ = notifications.send(()).await;
 144                                }
 145                            } else {
 146                                log::warn!("unhandled message: {}", message.payload_type_name());
 147                            }
 148                        } else {
 149                            log::info!("rpc connection closed {:?}", addr);
 150                            break;
 151                        }
 152                    }
 153                    handle_io = handle_io => {
 154                        if let Err(err) = handle_io {
 155                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 156                        }
 157                        break;
 158                    }
 159                }
 160            }
 161
 162            if let Err(err) = this.sign_out(connection_id).await {
 163                log::error!("error signing out connection {:?} - {:?}", addr, err);
 164            }
 165        }
 166    }
 167
 168    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 169        self.peer.disconnect(connection_id).await;
 170        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 171
 172        for (worktree_id, worktree) in removed_connection.hosted_worktrees {
 173            if let Some(share) = worktree.share {
 174                broadcast(
 175                    connection_id,
 176                    share.guest_connection_ids.keys().copied().collect(),
 177                    |conn_id| {
 178                        self.peer
 179                            .send(conn_id, proto::UnshareWorktree { worktree_id })
 180                    },
 181                )
 182                .await?;
 183            }
 184        }
 185
 186        for (worktree_id, peer_ids) in removed_connection.guest_worktree_ids {
 187            broadcast(connection_id, peer_ids, |conn_id| {
 188                self.peer.send(
 189                    conn_id,
 190                    proto::RemovePeer {
 191                        worktree_id,
 192                        peer_id: connection_id.0,
 193                    },
 194                )
 195            })
 196            .await?;
 197        }
 198
 199        self.update_collaborators_for_users(removed_connection.collaborator_ids.iter())
 200            .await?;
 201
 202        Ok(())
 203    }
 204
 205    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 206        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 207        Ok(())
 208    }
 209
 210    async fn open_worktree(
 211        mut self: Arc<Server>,
 212        request: TypedEnvelope<proto::OpenWorktree>,
 213    ) -> tide::Result<()> {
 214        let receipt = request.receipt();
 215        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 216
 217        let mut collaborator_user_ids = HashSet::new();
 218        collaborator_user_ids.insert(host_user_id);
 219        for github_login in request.payload.collaborator_logins {
 220            match self.app_state.db.create_user(&github_login, false).await {
 221                Ok(collaborator_user_id) => {
 222                    collaborator_user_ids.insert(collaborator_user_id);
 223                }
 224                Err(err) => {
 225                    let message = err.to_string();
 226                    self.peer
 227                        .respond_with_error(receipt, proto::Error { message })
 228                        .await?;
 229                    return Ok(());
 230                }
 231            }
 232        }
 233
 234        let collaborator_user_ids = collaborator_user_ids.into_iter().collect::<Vec<_>>();
 235        let worktree_id = self.state_mut().add_worktree(Worktree {
 236            host_connection_id: request.sender_id,
 237            collaborator_user_ids: collaborator_user_ids.clone(),
 238            root_name: request.payload.root_name,
 239            share: None,
 240        });
 241
 242        self.peer
 243            .respond(receipt, proto::OpenWorktreeResponse { worktree_id })
 244            .await?;
 245        self.update_collaborators_for_users(&collaborator_user_ids)
 246            .await?;
 247
 248        Ok(())
 249    }
 250
 251    async fn close_worktree(
 252        mut self: Arc<Server>,
 253        request: TypedEnvelope<proto::CloseWorktree>,
 254    ) -> tide::Result<()> {
 255        let worktree_id = request.payload.worktree_id;
 256        let worktree = self
 257            .state_mut()
 258            .remove_worktree(worktree_id, request.sender_id)?;
 259
 260        if let Some(share) = worktree.share {
 261            broadcast(
 262                request.sender_id,
 263                share.guest_connection_ids.keys().copied().collect(),
 264                |conn_id| {
 265                    self.peer
 266                        .send(conn_id, proto::UnshareWorktree { worktree_id })
 267                },
 268            )
 269            .await?;
 270        }
 271        self.update_collaborators_for_users(&worktree.collaborator_user_ids)
 272            .await?;
 273        Ok(())
 274    }
 275
 276    async fn share_worktree(
 277        mut self: Arc<Server>,
 278        mut request: TypedEnvelope<proto::ShareWorktree>,
 279    ) -> tide::Result<()> {
 280        let worktree = request
 281            .payload
 282            .worktree
 283            .as_mut()
 284            .ok_or_else(|| anyhow!("missing worktree"))?;
 285        let entries = mem::take(&mut worktree.entries)
 286            .into_iter()
 287            .map(|entry| (entry.id, entry))
 288            .collect();
 289
 290        let collaborator_user_ids =
 291            self.state_mut()
 292                .share_worktree(worktree.id, request.sender_id, entries);
 293        if let Some(collaborator_user_ids) = collaborator_user_ids {
 294            self.peer
 295                .respond(request.receipt(), proto::ShareWorktreeResponse {})
 296                .await?;
 297            self.update_collaborators_for_users(&collaborator_user_ids)
 298                .await?;
 299        } else {
 300            self.peer
 301                .respond_with_error(
 302                    request.receipt(),
 303                    proto::Error {
 304                        message: "no such worktree".to_string(),
 305                    },
 306                )
 307                .await?;
 308        }
 309        Ok(())
 310    }
 311
 312    async fn unshare_worktree(
 313        mut self: Arc<Server>,
 314        request: TypedEnvelope<proto::UnshareWorktree>,
 315    ) -> tide::Result<()> {
 316        let worktree_id = request.payload.worktree_id;
 317        let worktree = self
 318            .state_mut()
 319            .unshare_worktree(worktree_id, request.sender_id)?;
 320
 321        broadcast(request.sender_id, worktree.connection_ids, |conn_id| {
 322            self.peer
 323                .send(conn_id, proto::UnshareWorktree { worktree_id })
 324        })
 325        .await?;
 326        self.update_collaborators_for_users(&worktree.collaborator_ids)
 327            .await?;
 328
 329        Ok(())
 330    }
 331
 332    async fn join_worktree(
 333        mut self: Arc<Server>,
 334        request: TypedEnvelope<proto::JoinWorktree>,
 335    ) -> tide::Result<()> {
 336        let worktree_id = request.payload.worktree_id;
 337
 338        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 339        let response_data = self
 340            .state_mut()
 341            .join_worktree(request.sender_id, user_id, worktree_id)
 342            .and_then(|joined| {
 343                let share = joined.worktree.share()?;
 344                let peer_count = share.guest_connection_ids.len();
 345                let mut peers = Vec::with_capacity(peer_count);
 346                peers.push(proto::Peer {
 347                    peer_id: joined.worktree.host_connection_id.0,
 348                    replica_id: 0,
 349                });
 350                for (peer_conn_id, peer_replica_id) in &share.guest_connection_ids {
 351                    if *peer_conn_id != request.sender_id {
 352                        peers.push(proto::Peer {
 353                            peer_id: peer_conn_id.0,
 354                            replica_id: *peer_replica_id as u32,
 355                        });
 356                    }
 357                }
 358                let response = proto::JoinWorktreeResponse {
 359                    worktree: Some(proto::Worktree {
 360                        id: worktree_id,
 361                        root_name: joined.worktree.root_name.clone(),
 362                        entries: share.entries.values().cloned().collect(),
 363                    }),
 364                    replica_id: joined.replica_id as u32,
 365                    peers,
 366                };
 367                let connection_ids = joined.worktree.connection_ids();
 368                let collaborator_user_ids = joined.worktree.collaborator_user_ids.clone();
 369                Ok((response, connection_ids, collaborator_user_ids))
 370            });
 371
 372        match response_data {
 373            Ok((response, connection_ids, collaborator_user_ids)) => {
 374                broadcast(request.sender_id, connection_ids, |conn_id| {
 375                    self.peer.send(
 376                        conn_id,
 377                        proto::AddPeer {
 378                            worktree_id,
 379                            peer: Some(proto::Peer {
 380                                peer_id: request.sender_id.0,
 381                                replica_id: response.replica_id,
 382                            }),
 383                        },
 384                    )
 385                })
 386                .await?;
 387                self.peer.respond(request.receipt(), response).await?;
 388                self.update_collaborators_for_users(&collaborator_user_ids)
 389                    .await?;
 390            }
 391            Err(error) => {
 392                self.peer
 393                    .respond_with_error(
 394                        request.receipt(),
 395                        proto::Error {
 396                            message: error.to_string(),
 397                        },
 398                    )
 399                    .await?;
 400            }
 401        }
 402
 403        Ok(())
 404    }
 405
 406    async fn leave_worktree(
 407        mut self: Arc<Server>,
 408        request: TypedEnvelope<proto::LeaveWorktree>,
 409    ) -> tide::Result<()> {
 410        let sender_id = request.sender_id;
 411        let worktree_id = request.payload.worktree_id;
 412        let worktree = self.state_mut().leave_worktree(sender_id, worktree_id);
 413        if let Some(worktree) = worktree {
 414            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 415                self.peer.send(
 416                    conn_id,
 417                    proto::RemovePeer {
 418                        worktree_id,
 419                        peer_id: sender_id.0,
 420                    },
 421                )
 422            })
 423            .await?;
 424            self.update_collaborators_for_users(&worktree.collaborator_ids)
 425                .await?;
 426        }
 427        Ok(())
 428    }
 429
 430    async fn update_worktree(
 431        mut self: Arc<Server>,
 432        request: TypedEnvelope<proto::UpdateWorktree>,
 433    ) -> tide::Result<()> {
 434        let connection_ids = self.state_mut().update_worktree(
 435            request.sender_id,
 436            request.payload.worktree_id,
 437            &request.payload.removed_entries,
 438            &request.payload.updated_entries,
 439        )?;
 440
 441        broadcast(request.sender_id, connection_ids, |connection_id| {
 442            self.peer
 443                .forward_send(request.sender_id, connection_id, request.payload.clone())
 444        })
 445        .await?;
 446
 447        Ok(())
 448    }
 449
 450    async fn open_buffer(
 451        self: Arc<Server>,
 452        request: TypedEnvelope<proto::OpenBuffer>,
 453    ) -> tide::Result<()> {
 454        let receipt = request.receipt();
 455        let host_connection_id = self
 456            .state()
 457            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 458        let response = self
 459            .peer
 460            .forward_request(request.sender_id, host_connection_id, request.payload)
 461            .await?;
 462        self.peer.respond(receipt, response).await?;
 463        Ok(())
 464    }
 465
 466    async fn close_buffer(
 467        self: Arc<Server>,
 468        request: TypedEnvelope<proto::CloseBuffer>,
 469    ) -> tide::Result<()> {
 470        let host_connection_id = self
 471            .state()
 472            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 473        self.peer
 474            .forward_send(request.sender_id, host_connection_id, request.payload)
 475            .await?;
 476        Ok(())
 477    }
 478
 479    async fn save_buffer(
 480        self: Arc<Server>,
 481        request: TypedEnvelope<proto::SaveBuffer>,
 482    ) -> tide::Result<()> {
 483        let host;
 484        let guests;
 485        {
 486            let state = self.state();
 487            host = state
 488                .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 489            guests = state
 490                .worktree_guest_connection_ids(request.sender_id, request.payload.worktree_id)?;
 491        }
 492
 493        let sender = request.sender_id;
 494        let receipt = request.receipt();
 495        let response = self
 496            .peer
 497            .forward_request(sender, host, request.payload.clone())
 498            .await?;
 499
 500        broadcast(host, guests, |conn_id| {
 501            let response = response.clone();
 502            let peer = &self.peer;
 503            async move {
 504                if conn_id == sender {
 505                    peer.respond(receipt, response).await
 506                } else {
 507                    peer.forward_send(host, conn_id, response).await
 508                }
 509            }
 510        })
 511        .await?;
 512
 513        Ok(())
 514    }
 515
 516    async fn update_buffer(
 517        self: Arc<Server>,
 518        request: TypedEnvelope<proto::UpdateBuffer>,
 519    ) -> tide::Result<()> {
 520        let receiver_ids = self
 521            .state()
 522            .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
 523        broadcast(request.sender_id, receiver_ids, |connection_id| {
 524            self.peer
 525                .forward_send(request.sender_id, connection_id, request.payload.clone())
 526        })
 527        .await?;
 528        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 529        Ok(())
 530    }
 531
 532    async fn buffer_saved(
 533        self: Arc<Server>,
 534        request: TypedEnvelope<proto::BufferSaved>,
 535    ) -> tide::Result<()> {
 536        let receiver_ids = self
 537            .state()
 538            .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
 539        broadcast(request.sender_id, receiver_ids, |connection_id| {
 540            self.peer
 541                .forward_send(request.sender_id, connection_id, request.payload.clone())
 542        })
 543        .await?;
 544        Ok(())
 545    }
 546
 547    async fn get_channels(
 548        self: Arc<Server>,
 549        request: TypedEnvelope<proto::GetChannels>,
 550    ) -> tide::Result<()> {
 551        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 552        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 553        self.peer
 554            .respond(
 555                request.receipt(),
 556                proto::GetChannelsResponse {
 557                    channels: channels
 558                        .into_iter()
 559                        .map(|chan| proto::Channel {
 560                            id: chan.id.to_proto(),
 561                            name: chan.name,
 562                        })
 563                        .collect(),
 564                },
 565            )
 566            .await?;
 567        Ok(())
 568    }
 569
 570    async fn get_users(
 571        self: Arc<Server>,
 572        request: TypedEnvelope<proto::GetUsers>,
 573    ) -> tide::Result<()> {
 574        let receipt = request.receipt();
 575        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 576        let users = self
 577            .app_state
 578            .db
 579            .get_users_by_ids(user_ids)
 580            .await?
 581            .into_iter()
 582            .map(|user| proto::User {
 583                id: user.id.to_proto(),
 584                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 585                github_login: user.github_login,
 586            })
 587            .collect();
 588        self.peer
 589            .respond(receipt, proto::GetUsersResponse { users })
 590            .await?;
 591        Ok(())
 592    }
 593
 594    async fn update_collaborators_for_users<'a>(
 595        self: &Arc<Server>,
 596        user_ids: impl IntoIterator<Item = &'a UserId>,
 597    ) -> tide::Result<()> {
 598        let mut send_futures = Vec::new();
 599
 600        {
 601            let state = self.state();
 602            for user_id in user_ids {
 603                let collaborators = state.collaborators_for_user(*user_id);
 604                for connection_id in state.connection_ids_for_user(*user_id) {
 605                    send_futures.push(self.peer.send(
 606                        connection_id,
 607                        proto::UpdateCollaborators {
 608                            collaborators: collaborators.clone(),
 609                        },
 610                    ));
 611                }
 612            }
 613        }
 614        futures::future::try_join_all(send_futures).await?;
 615
 616        Ok(())
 617    }
 618
 619    async fn join_channel(
 620        mut self: Arc<Self>,
 621        request: TypedEnvelope<proto::JoinChannel>,
 622    ) -> tide::Result<()> {
 623        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 624        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 625        if !self
 626            .app_state
 627            .db
 628            .can_user_access_channel(user_id, channel_id)
 629            .await?
 630        {
 631            Err(anyhow!("access denied"))?;
 632        }
 633
 634        self.state_mut().join_channel(request.sender_id, channel_id);
 635        let messages = self
 636            .app_state
 637            .db
 638            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 639            .await?
 640            .into_iter()
 641            .map(|msg| proto::ChannelMessage {
 642                id: msg.id.to_proto(),
 643                body: msg.body,
 644                timestamp: msg.sent_at.unix_timestamp() as u64,
 645                sender_id: msg.sender_id.to_proto(),
 646                nonce: Some(msg.nonce.as_u128().into()),
 647            })
 648            .collect::<Vec<_>>();
 649        self.peer
 650            .respond(
 651                request.receipt(),
 652                proto::JoinChannelResponse {
 653                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 654                    messages,
 655                },
 656            )
 657            .await?;
 658        Ok(())
 659    }
 660
 661    async fn leave_channel(
 662        mut self: Arc<Self>,
 663        request: TypedEnvelope<proto::LeaveChannel>,
 664    ) -> tide::Result<()> {
 665        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 666        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 667        if !self
 668            .app_state
 669            .db
 670            .can_user_access_channel(user_id, channel_id)
 671            .await?
 672        {
 673            Err(anyhow!("access denied"))?;
 674        }
 675
 676        self.state_mut()
 677            .leave_channel(request.sender_id, channel_id);
 678
 679        Ok(())
 680    }
 681
 682    async fn send_channel_message(
 683        self: Arc<Self>,
 684        request: TypedEnvelope<proto::SendChannelMessage>,
 685    ) -> tide::Result<()> {
 686        let receipt = request.receipt();
 687        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 688        let user_id;
 689        let connection_ids;
 690        {
 691            let state = self.state();
 692            user_id = state.user_id_for_connection(request.sender_id)?;
 693            if let Some(ids) = state.channel_connection_ids(channel_id) {
 694                connection_ids = ids;
 695            } else {
 696                return Ok(());
 697            }
 698        }
 699
 700        // Validate the message body.
 701        let body = request.payload.body.trim().to_string();
 702        if body.len() > MAX_MESSAGE_LEN {
 703            self.peer
 704                .respond_with_error(
 705                    receipt,
 706                    proto::Error {
 707                        message: "message is too long".to_string(),
 708                    },
 709                )
 710                .await?;
 711            return Ok(());
 712        }
 713        if body.is_empty() {
 714            self.peer
 715                .respond_with_error(
 716                    receipt,
 717                    proto::Error {
 718                        message: "message can't be blank".to_string(),
 719                    },
 720                )
 721                .await?;
 722            return Ok(());
 723        }
 724
 725        let timestamp = OffsetDateTime::now_utc();
 726        let nonce = if let Some(nonce) = request.payload.nonce {
 727            nonce
 728        } else {
 729            self.peer
 730                .respond_with_error(
 731                    receipt,
 732                    proto::Error {
 733                        message: "nonce can't be blank".to_string(),
 734                    },
 735                )
 736                .await?;
 737            return Ok(());
 738        };
 739
 740        let message_id = self
 741            .app_state
 742            .db
 743            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 744            .await?
 745            .to_proto();
 746        let message = proto::ChannelMessage {
 747            sender_id: user_id.to_proto(),
 748            id: message_id,
 749            body,
 750            timestamp: timestamp.unix_timestamp() as u64,
 751            nonce: Some(nonce),
 752        };
 753        broadcast(request.sender_id, connection_ids, |conn_id| {
 754            self.peer.send(
 755                conn_id,
 756                proto::ChannelMessageSent {
 757                    channel_id: channel_id.to_proto(),
 758                    message: Some(message.clone()),
 759                },
 760            )
 761        })
 762        .await?;
 763        self.peer
 764            .respond(
 765                receipt,
 766                proto::SendChannelMessageResponse {
 767                    message: Some(message),
 768                },
 769            )
 770            .await?;
 771        Ok(())
 772    }
 773
 774    async fn get_channel_messages(
 775        self: Arc<Self>,
 776        request: TypedEnvelope<proto::GetChannelMessages>,
 777    ) -> tide::Result<()> {
 778        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 779        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 780        if !self
 781            .app_state
 782            .db
 783            .can_user_access_channel(user_id, channel_id)
 784            .await?
 785        {
 786            Err(anyhow!("access denied"))?;
 787        }
 788
 789        let messages = self
 790            .app_state
 791            .db
 792            .get_channel_messages(
 793                channel_id,
 794                MESSAGE_COUNT_PER_PAGE,
 795                Some(MessageId::from_proto(request.payload.before_message_id)),
 796            )
 797            .await?
 798            .into_iter()
 799            .map(|msg| proto::ChannelMessage {
 800                id: msg.id.to_proto(),
 801                body: msg.body,
 802                timestamp: msg.sent_at.unix_timestamp() as u64,
 803                sender_id: msg.sender_id.to_proto(),
 804                nonce: Some(msg.nonce.as_u128().into()),
 805            })
 806            .collect::<Vec<_>>();
 807        self.peer
 808            .respond(
 809                request.receipt(),
 810                proto::GetChannelMessagesResponse {
 811                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 812                    messages,
 813                },
 814            )
 815            .await?;
 816        Ok(())
 817    }
 818
 819    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 820        self.store.read()
 821    }
 822
 823    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 824        self.store.write()
 825    }
 826}
 827
 828pub async fn broadcast<F, T>(
 829    sender_id: ConnectionId,
 830    receiver_ids: Vec<ConnectionId>,
 831    mut f: F,
 832) -> anyhow::Result<()>
 833where
 834    F: FnMut(ConnectionId) -> T,
 835    T: Future<Output = anyhow::Result<()>>,
 836{
 837    let futures = receiver_ids
 838        .into_iter()
 839        .filter(|id| *id != sender_id)
 840        .map(|id| f(id));
 841    futures::future::try_join_all(futures).await?;
 842    Ok(())
 843}
 844
 845pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 846    let server = Server::new(app.state().clone(), rpc.clone(), None);
 847    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 848        let server = server.clone();
 849        async move {
 850            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 851
 852            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 853            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 854            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 855            let client_protocol_version: Option<u32> = request
 856                .header("X-Zed-Protocol-Version")
 857                .and_then(|v| v.as_str().parse().ok());
 858
 859            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 860                return Ok(Response::new(StatusCode::UpgradeRequired));
 861            }
 862
 863            let header = match request.header("Sec-Websocket-Key") {
 864                Some(h) => h.as_str(),
 865                None => return Err(anyhow!("expected sec-websocket-key"))?,
 866            };
 867
 868            let user_id = process_auth_header(&request).await?;
 869
 870            let mut response = Response::new(StatusCode::SwitchingProtocols);
 871            response.insert_header(UPGRADE, "websocket");
 872            response.insert_header(CONNECTION, "Upgrade");
 873            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 874            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 875            response.insert_header("Sec-Websocket-Version", "13");
 876
 877            let http_res: &mut tide::http::Response = response.as_mut();
 878            let upgrade_receiver = http_res.recv_upgrade().await;
 879            let addr = request.remote().unwrap_or("unknown").to_string();
 880            task::spawn(async move {
 881                if let Some(stream) = upgrade_receiver.await {
 882                    server
 883                        .handle_connection(
 884                            Connection::new(
 885                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 886                            ),
 887                            addr,
 888                            user_id,
 889                        )
 890                        .await;
 891                }
 892            });
 893
 894            Ok(response)
 895        }
 896    });
 897}
 898
 899fn header_contains_ignore_case<T>(
 900    request: &tide::Request<T>,
 901    header_name: HeaderName,
 902    value: &str,
 903) -> bool {
 904    request
 905        .header(header_name)
 906        .map(|h| {
 907            h.as_str()
 908                .split(',')
 909                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 910        })
 911        .unwrap_or(false)
 912}
 913
 914#[cfg(test)]
 915mod tests {
 916    use super::*;
 917    use crate::{
 918        auth,
 919        db::{tests::TestDb, UserId},
 920        github, AppState, Config,
 921    };
 922    use ::rpc::Peer;
 923    use async_std::task;
 924    use gpui::{ModelHandle, TestAppContext};
 925    use parking_lot::Mutex;
 926    use postage::{mpsc, watch};
 927    use serde_json::json;
 928    use sqlx::types::time::OffsetDateTime;
 929    use std::{
 930        path::Path,
 931        sync::{
 932            atomic::{AtomicBool, Ordering::SeqCst},
 933            Arc,
 934        },
 935        time::Duration,
 936    };
 937    use zed::{
 938        client::{
 939            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
 940            EstablishConnectionError, UserStore,
 941        },
 942        editor::{Editor, EditorSettings, Input},
 943        fs::{FakeFs, Fs as _},
 944        language::{
 945            tree_sitter_rust, Diagnostic, Language, LanguageConfig, LanguageRegistry,
 946            LanguageServerConfig, Point,
 947        },
 948        lsp,
 949        people_panel::JoinWorktree,
 950        project::{ProjectPath, Worktree},
 951        workspace::{Workspace, WorkspaceParams},
 952    };
 953
        // Exercises the basic sharing flow between two clients: client A shares a local
        // worktree, client B opens it remotely, and the test then checks from A's side that
        // B joining, B's open buffer, B's selection set, B's edit, and B leaving are observed.
 954    #[gpui::test]
 955    async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
 956        let (window_b, _) = cx_b.add_window(|_| EmptyView);
 957        let lang_registry = Arc::new(LanguageRegistry::new());
 958
 959        // Connect to a server as 2 clients.
 960        let mut server = TestServer::start().await;
 961        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
 962        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
 963
 964        cx_a.foreground().forbid_parking();
 965
 966        // Share a local worktree as client A
 967        let fs = Arc::new(FakeFs::new());
 968        fs.insert_tree(
 969            "/a",
 970            json!({
 971                ".zed.toml": r#"collaborators = ["user_b"]"#,
 972                "a.txt": "a-contents",
 973                "b.txt": "b-contents",
 974            }),
 975        )
 976        .await;
 977        let worktree_a = Worktree::open_local(
 978            client_a.clone(),
 979            "/a".as_ref(),
 980            fs,
 981            lang_registry.clone(),
 982            &mut cx_a.to_async(),
 983        )
 984        .await
 985        .unwrap();
            // Await the local worktree's `scan_complete()` before sharing it.
 986        worktree_a
 987            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
 988            .await;
 989        let worktree_id = worktree_a
 990            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
 991            .await
 992            .unwrap();
 993
 994        // Join that worktree as client B, and see that a guest has joined as client A.
 995        let worktree_b = Worktree::open_remote(
 996            client_b.clone(),
 997            worktree_id,
 998            lang_registry.clone(),
 999            &mut cx_b.to_async(),
1000        )
1001        .await
1002        .unwrap();
1003        let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| tree.replica_id());
1004        worktree_a
1005            .condition(&cx_a, |tree, _| {
1006                tree.peers()
1007                    .values()
1008                    .any(|replica_id| *replica_id == replica_id_b)
1009            })
1010            .await;
1011
1012        // Open the same file as client B and client A.
1013        let buffer_b = worktree_b
1014            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1015            .await
1016            .unwrap();
1017        buffer_b.read_with(&cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
            // A's worktree already reports the buffer as open before A opens it itself.
1018        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1019        let buffer_a = worktree_a
1020            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1021            .await
1022            .unwrap();
1023
1024        // Create a selection set as client B and see that selection set as client A.
1025        let editor_b = cx_b.add_view(window_b, |cx| {
1026            Editor::for_buffer(buffer_b, |cx| EditorSettings::test(cx), cx)
1027        });
1028        buffer_a
1029            .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1030            .await;
1031
1032        // Edit the buffer as client B and see that edit as client A.
1033        editor_b.update(&mut cx_b, |editor, cx| {
1034            editor.handle_input(&Input("ok, ".into()), cx)
1035        });
1036        buffer_a
1037            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1038            .await;
1039
1040        // Remove the selection set as client B, see those selections disappear as client A.
1041        cx_b.update(move |_| drop(editor_b));
1042        buffer_a
1043            .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1044            .await;
1045
1046        // Close the buffer as client A, see that the buffer is closed.
1047        cx_a.update(move |_| drop(buffer_a));
1048        worktree_a
1049            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1050            .await;
1051
1052        // Dropping the worktree removes client B from client A's peers.
1053        cx_b.update(move |_| drop(worktree_b));
1054        worktree_a
1055            .condition(&cx_a, |tree, _| tree.peers().is_empty())
1056            .await;
1057    }
1058
        // Checks that when host A unshares a worktree that guest B has joined through a
        // workspace, B's workspace ends up with no worktrees and no active item in its
        // active pane.
1059    #[gpui::test]
1060    async fn test_unshare_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1061        cx_b.update(zed::people_panel::init);
1062        let lang_registry = Arc::new(LanguageRegistry::new());
1063
1064        // Connect to a server as 2 clients.
1065        let mut server = TestServer::start().await;
1066        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1067        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
            // B's workspace is built from test params that use B's client and user store.
1068        let mut workspace_b_params = cx_b.update(WorkspaceParams::test);
1069        workspace_b_params.client = client_b;
1070        workspace_b_params.user_store = user_store_b;
1071
1072        cx_a.foreground().forbid_parking();
1073
1074        // Share a local worktree as client A
1075        let fs = Arc::new(FakeFs::new());
1076        fs.insert_tree(
1077            "/a",
1078            json!({
1079                ".zed.toml": r#"collaborators = ["user_b"]"#,
1080                "a.txt": "a-contents",
1081                "b.txt": "b-contents",
1082            }),
1083        )
1084        .await;
1085        let worktree_a = Worktree::open_local(
1086            client_a.clone(),
1087            "/a".as_ref(),
1088            fs,
1089            lang_registry.clone(),
1090            &mut cx_a.to_async(),
1091        )
1092        .await
1093        .unwrap();
1094        worktree_a
1095            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1096            .await;
1097
1098        let remote_worktree_id = worktree_a
1099            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1100            .await
1101            .unwrap();
1102
            // Open a workspace window as client B and join A's worktree by dispatching the
            // `JoinWorktree` action, then wait until the workspace has one worktree.
1103        let (window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&workspace_b_params, cx));
1104        cx_b.update(|cx| {
1105            cx.dispatch_action(
1106                window_b,
1107                vec![workspace_b.id()],
1108                &JoinWorktree(remote_worktree_id),
1109            );
1110        });
1111        workspace_b
1112            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 1)
1113            .await;
1114
            // Nothing is active in B's pane yet; open `a.txt` from the joined worktree so that
            // an item becomes active.
1115        let local_worktree_id_b = workspace_b.read_with(&cx_b, |workspace, cx| {
1116            let active_pane = workspace.active_pane().read(cx);
1117            assert!(active_pane.active_item().is_none());
1118            workspace.worktrees(cx).first().unwrap().id()
1119        });
1120        workspace_b
1121            .update(&mut cx_b, |workspace, cx| {
1122                workspace.open_entry(
1123                    ProjectPath {
1124                        worktree_id: local_worktree_id_b,
1125                        path: Path::new("a.txt").into(),
1126                    },
1127                    cx,
1128                )
1129            })
1130            .unwrap()
1131            .await;
1132        workspace_b.read_with(&cx_b, |workspace, cx| {
1133            let active_pane = workspace.active_pane().read(cx);
1134            assert!(active_pane.active_item().is_some());
1135        });
1136
            // Unshare as client A; B's workspace should drop the worktree and its active item.
1137        worktree_a.update(&mut cx_a, |tree, cx| {
1138            tree.as_local_mut().unwrap().unshare(cx);
1139        });
1140        workspace_b
1141            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 0)
1142            .await;
1143        workspace_b.read_with(&cx_b, |workspace, cx| {
1144            let active_pane = workspace.active_pane().read(cx);
1145            assert!(active_pane.active_item().is_none());
1146        });
1147    }
1148
        // With one host (A) and two guests (B and C), checks that edits from all three converge
        // to the same text, that a save started by guest B writes the file on the host's file
        // system and leaves every replica clean, and that renames/creations on the host's file
        // system show up in both guests' worktrees.
1149    #[gpui::test]
1150    async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
1151        mut cx_a: TestAppContext,
1152        mut cx_b: TestAppContext,
1153        mut cx_c: TestAppContext,
1154    ) {
1155        cx_a.foreground().forbid_parking();
1156        let lang_registry = Arc::new(LanguageRegistry::new());
1157
1158        // Connect to a server as 3 clients.
1159        let mut server = TestServer::start().await;
1160        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1161        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1162        let (client_c, _) = server.create_client(&mut cx_c, "user_c").await;
1163
1164        let fs = Arc::new(FakeFs::new());
1165
1166        // Share a worktree as client A.
1167        fs.insert_tree(
1168            "/a",
1169            json!({
1170                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1171                "file1": "",
1172                "file2": ""
1173            }),
1174        )
1175        .await;
1176
1177        let worktree_a = Worktree::open_local(
1178            client_a.clone(),
1179            "/a".as_ref(),
1180            fs.clone(),
1181            lang_registry.clone(),
1182            &mut cx_a.to_async(),
1183        )
1184        .await
1185        .unwrap();
1186        worktree_a
1187            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1188            .await;
1189        let worktree_id = worktree_a
1190            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1191            .await
1192            .unwrap();
1193
1194        // Join that worktree as clients B and C.
1195        let worktree_b = Worktree::open_remote(
1196            client_b.clone(),
1197            worktree_id,
1198            lang_registry.clone(),
1199            &mut cx_b.to_async(),
1200        )
1201        .await
1202        .unwrap();
1203        let worktree_c = Worktree::open_remote(
1204            client_c.clone(),
1205            worktree_id,
1206            lang_registry.clone(),
1207            &mut cx_c.to_async(),
1208        )
1209        .await
1210        .unwrap();
1211
1212        // Open and edit a buffer as both guests B and C.
1213        let buffer_b = worktree_b
1214            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1215            .await
1216            .unwrap();
1217        let buffer_c = worktree_c
1218            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1219            .await
1220            .unwrap();
1221        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1222        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1223
1224        // Open and edit that buffer as the host.
1225        let buffer_a = worktree_a
1226            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1227            .await
1228            .unwrap();
1229
            // Wait until the host has both guests' insertions before appending its own edit.
1230        buffer_a
1231            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1232            .await;
1233        buffer_a.update(&mut cx_a, |buf, cx| {
1234            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1235        });
1236
1237        // Wait for edits to propagate
1238        buffer_a
1239            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1240            .await;
1241        buffer_b
1242            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1243            .await;
1244        buffer_c
1245            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1246            .await;
1247
1248        // Edit the buffer as the host and concurrently save as guest B.
1249        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1250        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1251        save_b.await.unwrap();
            // The file written by B's save is expected to include the host's "hi-a, " edit.
1252        assert_eq!(
1253            fs.load("/a/file1".as_ref()).await.unwrap(),
1254            "hi-a, i-am-c, i-am-b, i-am-a"
1255        );
1256        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1257        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1258        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1259
1260        // Make changes on host's file system, see those changes on the guests.
1261        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1262            .await
1263            .unwrap();
1264        fs.insert_file(Path::new("/a/file4"), "4".into())
1265            .await
1266            .unwrap();
1267
            // Four files are expected afterwards: .zed.toml, file1, file3 and file4.
1268        worktree_b
1269            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1270            .await;
1271        worktree_c
1272            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1273            .await;
1274        worktree_b.read_with(&cx_b, |tree, _| {
1275            assert_eq!(
1276                tree.paths()
1277                    .map(|p| p.to_string_lossy())
1278                    .collect::<Vec<_>>(),
1279                &[".zed.toml", "file1", "file3", "file4"]
1280            )
1281        });
1282        worktree_c.read_with(&cx_c, |tree, _| {
1283            assert_eq!(
1284                tree.paths()
1285                    .map(|p| p.to_string_lossy())
1286                    .collect::<Vec<_>>(),
1287                &[".zed.toml", "file1", "file3", "file4"]
1288            )
1289        });
1290    }
1291
        // Checks the dirty/conflict flags of a guest's buffer around a save: dirty without
        // conflict after an edit, clean without conflict once the save has completed and the
        // file's mtime has changed, and dirty without conflict again after a further edit.
1292    #[gpui::test]
1293    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1294        cx_a.foreground().forbid_parking();
1295        let lang_registry = Arc::new(LanguageRegistry::new());
1296
1297        // Connect to a server as 2 clients.
1298        let mut server = TestServer::start().await;
1299        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1300        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1301
1302        // Share a local worktree as client A
            // NOTE(review): the collaborators list below names "user_c", but this test only
            // creates user_a and user_b — confirm whether that entry is intentional.
1303        let fs = Arc::new(FakeFs::new());
1304        fs.insert_tree(
1305            "/dir",
1306            json!({
1307                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1308                "a.txt": "a-contents",
1309            }),
1310        )
1311        .await;
1312
1313        let worktree_a = Worktree::open_local(
1314            client_a.clone(),
1315            "/dir".as_ref(),
1316            fs,
1317            lang_registry.clone(),
1318            &mut cx_a.to_async(),
1319        )
1320        .await
1321        .unwrap();
1322        worktree_a
1323            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1324            .await;
1325        let worktree_id = worktree_a
1326            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1327            .await
1328            .unwrap();
1329
1330        // Join that worktree as client B.
1331        let worktree_b = Worktree::open_remote(
1332            client_b.clone(),
1333            worktree_id,
1334            lang_registry.clone(),
1335            &mut cx_b.to_async(),
1336        )
1337        .await
1338        .unwrap();
1339
1340        let buffer_b = worktree_b
1341            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1342            .await
1343            .unwrap();
            // Remember the file's mtime so the test can wait for it to change after the save.
1344        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1345
1346        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1347        buffer_b.read_with(&cx_b, |buf, _| {
1348            assert!(buf.is_dirty());
1349            assert!(!buf.has_conflict());
1350        });
1351
            // Save as client B, then wait until the buffer's file reports a different mtime.
1352        buffer_b
1353            .update(&mut cx_b, |buf, cx| buf.save(cx))
1354            .unwrap()
1355            .await
1356            .unwrap();
1357        worktree_b
1358            .condition(&cx_b, |_, cx| {
1359                buffer_b.read(cx).file().unwrap().mtime() != mtime
1360            })
1361            .await;
1362        buffer_b.read_with(&cx_b, |buf, _| {
1363            assert!(!buf.is_dirty());
1364            assert!(!buf.has_conflict());
1365        });
1366
            // A further edit after the save makes the buffer dirty again, still without conflict.
1367        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1368        buffer_b.read_with(&cx_b, |buf, _| {
1369            assert!(buf.is_dirty());
1370            assert!(!buf.has_conflict());
1371        });
1372    }
1373
        // Checks that an edit made by the host after a guest has started (but not finished
        // awaiting) an `open_buffer` call still ends up in the guest's copy of the buffer.
1374    #[gpui::test]
1375    async fn test_editing_while_guest_opens_buffer(
1376        mut cx_a: TestAppContext,
1377        mut cx_b: TestAppContext,
1378    ) {
1379        cx_a.foreground().forbid_parking();
1380        let lang_registry = Arc::new(LanguageRegistry::new());
1381
1382        // Connect to a server as 2 clients.
1383        let mut server = TestServer::start().await;
1384        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1385        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1386
1387        // Share a local worktree as client A
1388        let fs = Arc::new(FakeFs::new());
1389        fs.insert_tree(
1390            "/dir",
1391            json!({
1392                ".zed.toml": r#"collaborators = ["user_b"]"#,
1393                "a.txt": "a-contents",
1394            }),
1395        )
1396        .await;
1397        let worktree_a = Worktree::open_local(
1398            client_a.clone(),
1399            "/dir".as_ref(),
1400            fs,
1401            lang_registry.clone(),
1402            &mut cx_a.to_async(),
1403        )
1404        .await
1405        .unwrap();
1406        worktree_a
1407            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1408            .await;
1409        let worktree_id = worktree_a
1410            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1411            .await
1412            .unwrap();
1413
1414        // Join that worktree as client B.
1415        let worktree_b = Worktree::open_remote(
1416            client_b.clone(),
1417            worktree_id,
1418            lang_registry.clone(),
1419            &mut cx_b.to_async(),
1420        )
1421        .await
1422        .unwrap();
1423
1424        let buffer_a = worktree_a
1425            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1426            .await
1427            .unwrap();
            // Start opening the same buffer as client B on the background executor, without
            // awaiting the result yet.
1428        let buffer_b = cx_b
1429            .background()
1430            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1431
            // Yield once, then edit as the host while B's open has not been awaited.
1432        task::yield_now().await;
1433        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1434
            // Once B's buffer is available, it must converge to the host's text.
1435        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1436        let buffer_b = buffer_b.await.unwrap();
1437        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1438    }
1439
        // Checks that host A sees its worktree's peer list become empty when guest B drops the
        // remote worktree, and the pending `open_buffer` task, right after starting to open a
        // buffer.
1440    #[gpui::test]
1441    async fn test_leaving_worktree_while_opening_buffer(
1442        mut cx_a: TestAppContext,
1443        mut cx_b: TestAppContext,
1444    ) {
1445        cx_a.foreground().forbid_parking();
1446        let lang_registry = Arc::new(LanguageRegistry::new());
1447
1448        // Connect to a server as 2 clients.
1449        let mut server = TestServer::start().await;
1450        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1451        let (client_b, _) = server.create_client(&mut cx_b, "user_b").await;
1452
1453        // Share a local worktree as client A
1454        let fs = Arc::new(FakeFs::new());
1455        fs.insert_tree(
1456            "/dir",
1457            json!({
1458                ".zed.toml": r#"collaborators = ["user_b"]"#,
1459                "a.txt": "a-contents",
1460            }),
1461        )
1462        .await;
1463        let worktree_a = Worktree::open_local(
1464            client_a.clone(),
1465            "/dir".as_ref(),
1466            fs,
1467            lang_registry.clone(),
1468            &mut cx_a.to_async(),
1469        )
1470        .await
1471        .unwrap();
1472        worktree_a
1473            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1474            .await;
1475        let worktree_id = worktree_a
1476            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1477            .await
1478            .unwrap();
1479
1480        // Join that worktree as client B, and see that a guest has joined as client A.
1481        let worktree_b = Worktree::open_remote(
1482            client_b.clone(),
1483            worktree_id,
1484            lang_registry.clone(),
1485            &mut cx_b.to_async(),
1486        )
1487        .await
1488        .unwrap();
1489        worktree_a
1490            .condition(&cx_a, |tree, _| tree.peers().len() == 1)
1491            .await;
1492
            // Start opening a buffer as client B, then immediately drop both the worktree and
            // the spawned task without awaiting it.
1493        let buffer_b = cx_b
1494            .background()
1495            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1496        cx_b.update(|_| drop(worktree_b));
1497        drop(buffer_b);
            // Client A should still observe client B leaving.
1498        worktree_a
1499            .condition(&cx_a, |tree, _| tree.peers().len() == 0)
1500            .await;
1501    }
1502
1503    #[gpui::test]
1504    async fn test_peer_disconnection(mut cx_a: TestAppContext, cx_b: TestAppContext) {
1505        cx_a.foreground().forbid_parking();
1506        let lang_registry = Arc::new(LanguageRegistry::new());
1507
1508        // Connect to a server as 2 clients.
1509        let mut server = TestServer::start().await;
1510        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1511        let (client_b, _) = server.create_client(&mut cx_a, "user_b").await;
1512
1513        // Share a local worktree as client A
1514        let fs = Arc::new(FakeFs::new());
1515        fs.insert_tree(
1516            "/a",
1517            json!({
1518                ".zed.toml": r#"collaborators = ["user_b"]"#,
1519                "a.txt": "a-contents",
1520                "b.txt": "b-contents",
1521            }),
1522        )
1523        .await;
1524        let worktree_a = Worktree::open_local(
1525            client_a.clone(),
1526            "/a".as_ref(),
1527            fs,
1528            lang_registry.clone(),
1529            &mut cx_a.to_async(),
1530        )
1531        .await
1532        .unwrap();
1533        worktree_a
1534            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1535            .await;
1536        let worktree_id = worktree_a
1537            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1538            .await
1539            .unwrap();
1540
1541        // Join that worktree as client B, and see that a guest has joined as client A.
1542        let _worktree_b = Worktree::open_remote(
1543            client_b.clone(),
1544            worktree_id,
1545            lang_registry.clone(),
1546            &mut cx_b.to_async(),
1547        )
1548        .await
1549        .unwrap();
1550        worktree_a
1551            .condition(&cx_a, |tree, _| tree.peers().len() == 1)
1552            .await;
1553
1554        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1555        client_b.disconnect(&cx_b.to_async()).await.unwrap();
1556        worktree_a
1557            .condition(&cx_a, |tree, _| tree.peers().len() == 0)
1558            .await;
1559    }
1560
1561    #[gpui::test]
1562    async fn test_collaborating_with_diagnostics(
1563        mut cx_a: TestAppContext,
1564        mut cx_b: TestAppContext,
1565    ) {
1566        cx_a.foreground().forbid_parking();
1567        let (language_server_config, mut fake_language_server) =
1568            LanguageServerConfig::fake(cx_a.background()).await;
1569        let mut lang_registry = LanguageRegistry::new();
1570        lang_registry.add(Arc::new(Language::new(
1571            LanguageConfig {
1572                name: "Rust".to_string(),
1573                path_suffixes: vec!["rs".to_string()],
1574                language_server: Some(language_server_config),
1575                ..Default::default()
1576            },
1577            tree_sitter_rust::language(),
1578        )));
1579
1580        let lang_registry = Arc::new(lang_registry);
1581
1582        // Connect to a server as 2 clients.
1583        let mut server = TestServer::start().await;
1584        let (client_a, _) = server.create_client(&mut cx_a, "user_a").await;
1585        let (client_b, _) = server.create_client(&mut cx_a, "user_b").await;
1586
1587        // Share a local worktree as client A
1588        let fs = Arc::new(FakeFs::new());
1589        fs.insert_tree(
1590            "/a",
1591            json!({
1592                ".zed.toml": r#"collaborators = ["user_b"]"#,
1593                "a.rs": "let one = two",
1594                "other.rs": "",
1595            }),
1596        )
1597        .await;
1598        let worktree_a = Worktree::open_local(
1599            client_a.clone(),
1600            "/a".as_ref(),
1601            fs,
1602            lang_registry.clone(),
1603            &mut cx_a.to_async(),
1604        )
1605        .await
1606        .unwrap();
1607        worktree_a
1608            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1609            .await;
1610        let worktree_id = worktree_a
1611            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1612            .await
1613            .unwrap();
1614
1615        // Cause language server to start.
1616        let _ = cx_a
1617            .background()
1618            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1619                worktree.open_buffer("other.rs", cx)
1620            }))
1621            .await
1622            .unwrap();
1623
1624        // Simulate a language server reporting errors for a file.
1625        fake_language_server
1626            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1627                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1628                version: None,
1629                diagnostics: vec![
1630                    lsp::Diagnostic {
1631                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1632                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1633                        message: "message 1".to_string(),
1634                        ..Default::default()
1635                    },
1636                    lsp::Diagnostic {
1637                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1638                        range: lsp::Range::new(
1639                            lsp::Position::new(0, 10),
1640                            lsp::Position::new(0, 13),
1641                        ),
1642                        message: "message 2".to_string(),
1643                        ..Default::default()
1644                    },
1645                ],
1646            })
1647            .await;
1648
1649        // Join the worktree as client B.
1650        let worktree_b = Worktree::open_remote(
1651            client_b.clone(),
1652            worktree_id,
1653            lang_registry.clone(),
1654            &mut cx_b.to_async(),
1655        )
1656        .await
1657        .unwrap();
1658
1659        // Open the file with the errors.
1660        let buffer_b = cx_b
1661            .background()
1662            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1663            .await
1664            .unwrap();
1665
1666        buffer_b.read_with(&cx_b, |buffer, _| {
1667            assert_eq!(
1668                buffer
1669                    .diagnostics_in_range(0..buffer.len())
1670                    .collect::<Vec<_>>(),
1671                &[
1672                    (
1673                        Point::new(0, 4)..Point::new(0, 7),
1674                        &Diagnostic {
1675                            group_id: 0,
1676                            message: "message 1".to_string(),
1677                            severity: lsp::DiagnosticSeverity::ERROR,
1678                            is_primary: true
1679                        }
1680                    ),
1681                    (
1682                        Point::new(0, 10)..Point::new(0, 13),
1683                        &Diagnostic {
1684                            group_id: 1,
1685                            severity: lsp::DiagnosticSeverity::WARNING,
1686                            message: "message 2".to_string(),
1687                            is_primary: true
1688                        }
1689                    )
1690                ]
1691            );
1692        });
1693    }
1694
1695    #[gpui::test]
1696    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1697        cx_a.foreground().forbid_parking();
1698
1699        // Connect to a server as 2 clients.
1700        let mut server = TestServer::start().await;
1701        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
1702        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
1703
1704        // Create an org that includes these 2 users.
1705        let db = &server.app_state.db;
1706        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1707        db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
1708            .await
1709            .unwrap();
1710        db.add_org_member(org_id, current_user_id(&user_store_b, &cx_b), false)
1711            .await
1712            .unwrap();
1713
1714        // Create a channel that includes all the users.
1715        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1716        db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
1717            .await
1718            .unwrap();
1719        db.add_channel_member(channel_id, current_user_id(&user_store_b, &cx_b), false)
1720            .await
1721            .unwrap();
1722        db.create_channel_message(
1723            channel_id,
1724            current_user_id(&user_store_b, &cx_b),
1725            "hello A, it's B.",
1726            OffsetDateTime::now_utc(),
1727            1,
1728        )
1729        .await
1730        .unwrap();
1731
1732        let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
1733        channels_a
1734            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1735            .await;
1736        channels_a.read_with(&cx_a, |list, _| {
1737            assert_eq!(
1738                list.available_channels().unwrap(),
1739                &[ChannelDetails {
1740                    id: channel_id.to_proto(),
1741                    name: "test-channel".to_string()
1742                }]
1743            )
1744        });
1745        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1746            this.get_channel(channel_id.to_proto(), cx).unwrap()
1747        });
1748        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1749        channel_a
1750            .condition(&cx_a, |channel, _| {
1751                channel_messages(channel)
1752                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1753            })
1754            .await;
1755
1756        let channels_b = cx_b.add_model(|cx| ChannelList::new(user_store_b, client_b, cx));
1757        channels_b
1758            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1759            .await;
1760        channels_b.read_with(&cx_b, |list, _| {
1761            assert_eq!(
1762                list.available_channels().unwrap(),
1763                &[ChannelDetails {
1764                    id: channel_id.to_proto(),
1765                    name: "test-channel".to_string()
1766                }]
1767            )
1768        });
1769
1770        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1771            this.get_channel(channel_id.to_proto(), cx).unwrap()
1772        });
1773        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1774        channel_b
1775            .condition(&cx_b, |channel, _| {
1776                channel_messages(channel)
1777                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1778            })
1779            .await;
1780
1781        channel_a
1782            .update(&mut cx_a, |channel, cx| {
1783                channel
1784                    .send_message("oh, hi B.".to_string(), cx)
1785                    .unwrap()
1786                    .detach();
1787                let task = channel.send_message("sup".to_string(), cx).unwrap();
1788                assert_eq!(
1789                    channel_messages(channel),
1790                    &[
1791                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1792                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
1793                        ("user_a".to_string(), "sup".to_string(), true)
1794                    ]
1795                );
1796                task
1797            })
1798            .await
1799            .unwrap();
1800
1801        channel_b
1802            .condition(&cx_b, |channel, _| {
1803                channel_messages(channel)
1804                    == [
1805                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1806                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
1807                        ("user_a".to_string(), "sup".to_string(), false),
1808                    ]
1809            })
1810            .await;
1811
1812        assert_eq!(
1813            server
1814                .state()
1815                .await
1816                .channel(channel_id)
1817                .unwrap()
1818                .connection_ids
1819                .len(),
1820            2
1821        );
1822        cx_b.update(|_| drop(channel_b));
1823        server
1824            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1825            .await;
1826
1827        cx_a.update(|_| drop(channel_a));
1828        server
1829            .condition(|state| state.channel(channel_id).is_none())
1830            .await;
1831    }
1832
1833    #[gpui::test]
1834    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
1835        cx_a.foreground().forbid_parking();
1836
1837        let mut server = TestServer::start().await;
1838        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
1839
1840        let db = &server.app_state.db;
1841        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1842        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1843        db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
1844            .await
1845            .unwrap();
1846        db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
1847            .await
1848            .unwrap();
1849
1850        let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
1851        channels_a
1852            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1853            .await;
1854        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1855            this.get_channel(channel_id.to_proto(), cx).unwrap()
1856        });
1857
1858        // Messages aren't allowed to be too long.
1859        channel_a
1860            .update(&mut cx_a, |channel, cx| {
1861                let long_body = "this is long.\n".repeat(1024);
1862                channel.send_message(long_body, cx).unwrap()
1863            })
1864            .await
1865            .unwrap_err();
1866
1867        // Messages aren't allowed to be blank.
1868        channel_a.update(&mut cx_a, |channel, cx| {
1869            channel.send_message(String::new(), cx).unwrap_err()
1870        });
1871
1872        // Leading and trailing whitespace are trimmed.
1873        channel_a
1874            .update(&mut cx_a, |channel, cx| {
1875                channel
1876                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
1877                    .unwrap()
1878            })
1879            .await
1880            .unwrap();
1881        assert_eq!(
1882            db.get_channel_messages(channel_id, 10, None)
1883                .await
1884                .unwrap()
1885                .iter()
1886                .map(|m| &m.body)
1887                .collect::<Vec<_>>(),
1888            &["surrounded by whitespace"]
1889        );
1890    }
1891
1892    #[gpui::test]
1893    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1894        cx_a.foreground().forbid_parking();
1895
1896        // Connect to a server as 2 clients.
1897        let mut server = TestServer::start().await;
1898        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
1899        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
1900        let mut status_b = client_b.status();
1901
1902        // Create an org that includes these 2 users.
1903        let db = &server.app_state.db;
1904        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1905        db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
1906            .await
1907            .unwrap();
1908        db.add_org_member(org_id, current_user_id(&user_store_b, &cx_b), false)
1909            .await
1910            .unwrap();
1911
1912        // Create a channel that includes all the users.
1913        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1914        db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
1915            .await
1916            .unwrap();
1917        db.add_channel_member(channel_id, current_user_id(&user_store_b, &cx_b), false)
1918            .await
1919            .unwrap();
1920        db.create_channel_message(
1921            channel_id,
1922            current_user_id(&user_store_b, &cx_b),
1923            "hello A, it's B.",
1924            OffsetDateTime::now_utc(),
1925            2,
1926        )
1927        .await
1928        .unwrap();
1929
1930        let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
1931        channels_a
1932            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1933            .await;
1934
1935        channels_a.read_with(&cx_a, |list, _| {
1936            assert_eq!(
1937                list.available_channels().unwrap(),
1938                &[ChannelDetails {
1939                    id: channel_id.to_proto(),
1940                    name: "test-channel".to_string()
1941                }]
1942            )
1943        });
1944        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1945            this.get_channel(channel_id.to_proto(), cx).unwrap()
1946        });
1947        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1948        channel_a
1949            .condition(&cx_a, |channel, _| {
1950                channel_messages(channel)
1951                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1952            })
1953            .await;
1954
1955        let channels_b = cx_b.add_model(|cx| ChannelList::new(user_store_b.clone(), client_b, cx));
1956        channels_b
1957            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1958            .await;
1959        channels_b.read_with(&cx_b, |list, _| {
1960            assert_eq!(
1961                list.available_channels().unwrap(),
1962                &[ChannelDetails {
1963                    id: channel_id.to_proto(),
1964                    name: "test-channel".to_string()
1965                }]
1966            )
1967        });
1968
1969        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1970            this.get_channel(channel_id.to_proto(), cx).unwrap()
1971        });
1972        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1973        channel_b
1974            .condition(&cx_b, |channel, _| {
1975                channel_messages(channel)
1976                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1977            })
1978            .await;
1979
1980        // Disconnect client B, ensuring we can still access its cached channel data.
1981        server.forbid_connections();
1982        server.disconnect_client(current_user_id(&user_store_b, &cx_b));
1983        while !matches!(
1984            status_b.recv().await,
1985            Some(client::Status::ReconnectionError { .. })
1986        ) {}
1987
1988        channels_b.read_with(&cx_b, |channels, _| {
1989            assert_eq!(
1990                channels.available_channels().unwrap(),
1991                [ChannelDetails {
1992                    id: channel_id.to_proto(),
1993                    name: "test-channel".to_string()
1994                }]
1995            )
1996        });
1997        channel_b.read_with(&cx_b, |channel, _| {
1998            assert_eq!(
1999                channel_messages(channel),
2000                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2001            )
2002        });
2003
2004        // Send a message from client B while it is disconnected.
2005        channel_b
2006            .update(&mut cx_b, |channel, cx| {
2007                let task = channel
2008                    .send_message("can you see this?".to_string(), cx)
2009                    .unwrap();
2010                assert_eq!(
2011                    channel_messages(channel),
2012                    &[
2013                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2014                        ("user_b".to_string(), "can you see this?".to_string(), true)
2015                    ]
2016                );
2017                task
2018            })
2019            .await
2020            .unwrap_err();
2021
2022        // Send a message from client A while B is disconnected.
2023        channel_a
2024            .update(&mut cx_a, |channel, cx| {
2025                channel
2026                    .send_message("oh, hi B.".to_string(), cx)
2027                    .unwrap()
2028                    .detach();
2029                let task = channel.send_message("sup".to_string(), cx).unwrap();
2030                assert_eq!(
2031                    channel_messages(channel),
2032                    &[
2033                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2034                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2035                        ("user_a".to_string(), "sup".to_string(), true)
2036                    ]
2037                );
2038                task
2039            })
2040            .await
2041            .unwrap();
2042
2043        // Give client B a chance to reconnect.
2044        server.allow_connections();
2045        cx_b.foreground().advance_clock(Duration::from_secs(10));
2046
2047        // Verify that B sees the new messages upon reconnection, as well as the message client B
2048        // sent while offline.
2049        channel_b
2050            .condition(&cx_b, |channel, _| {
2051                channel_messages(channel)
2052                    == [
2053                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2054                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2055                        ("user_a".to_string(), "sup".to_string(), false),
2056                        ("user_b".to_string(), "can you see this?".to_string(), false),
2057                    ]
2058            })
2059            .await;
2060
2061        // Ensure client A and B can communicate normally after reconnection.
2062        channel_a
2063            .update(&mut cx_a, |channel, cx| {
2064                channel.send_message("you online?".to_string(), cx).unwrap()
2065            })
2066            .await
2067            .unwrap();
2068        channel_b
2069            .condition(&cx_b, |channel, _| {
2070                channel_messages(channel)
2071                    == [
2072                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2073                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2074                        ("user_a".to_string(), "sup".to_string(), false),
2075                        ("user_b".to_string(), "can you see this?".to_string(), false),
2076                        ("user_a".to_string(), "you online?".to_string(), false),
2077                    ]
2078            })
2079            .await;
2080
2081        channel_b
2082            .update(&mut cx_b, |channel, cx| {
2083                channel.send_message("yep".to_string(), cx).unwrap()
2084            })
2085            .await
2086            .unwrap();
2087        channel_a
2088            .condition(&cx_a, |channel, _| {
2089                channel_messages(channel)
2090                    == [
2091                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2092                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2093                        ("user_a".to_string(), "sup".to_string(), false),
2094                        ("user_b".to_string(), "can you see this?".to_string(), false),
2095                        ("user_a".to_string(), "you online?".to_string(), false),
2096                        ("user_b".to_string(), "yep".to_string(), false),
2097                    ]
2098            })
2099            .await;
2100    }
2101
2102    #[gpui::test]
2103    async fn test_collaborators(
2104        mut cx_a: TestAppContext,
2105        mut cx_b: TestAppContext,
2106        mut cx_c: TestAppContext,
2107    ) {
2108        cx_a.foreground().forbid_parking();
2109        let lang_registry = Arc::new(LanguageRegistry::new());
2110
2111        // Connect to a server as 3 clients.
2112        let mut server = TestServer::start().await;
2113        let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
2114        let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
2115        let (_client_c, user_store_c) = server.create_client(&mut cx_c, "user_c").await;
2116
2117        let fs = Arc::new(FakeFs::new());
2118
2119        // Share a worktree as client A.
2120        fs.insert_tree(
2121            "/a",
2122            json!({
2123                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2124            }),
2125        )
2126        .await;
2127
2128        let worktree_a = Worktree::open_local(
2129            client_a.clone(),
2130            "/a".as_ref(),
2131            fs.clone(),
2132            lang_registry.clone(),
2133            &mut cx_a.to_async(),
2134        )
2135        .await
2136        .unwrap();
2137
2138        user_store_a
2139            .condition(&cx_a, |user_store, _| {
2140                collaborators(user_store) == vec![("user_a", vec![("a", vec![])])]
2141            })
2142            .await;
2143        user_store_b
2144            .condition(&cx_b, |user_store, _| {
2145                collaborators(user_store) == vec![("user_a", vec![("a", vec![])])]
2146            })
2147            .await;
2148        user_store_c
2149            .condition(&cx_c, |user_store, _| {
2150                collaborators(user_store) == vec![("user_a", vec![("a", vec![])])]
2151            })
2152            .await;
2153
2154        let worktree_id = worktree_a
2155            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
2156            .await
2157            .unwrap();
2158
2159        let _worktree_b = Worktree::open_remote(
2160            client_b.clone(),
2161            worktree_id,
2162            lang_registry.clone(),
2163            &mut cx_b.to_async(),
2164        )
2165        .await
2166        .unwrap();
2167
2168        user_store_a
2169            .condition(&cx_a, |user_store, _| {
2170                collaborators(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2171            })
2172            .await;
2173        user_store_b
2174            .condition(&cx_b, |user_store, _| {
2175                collaborators(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2176            })
2177            .await;
2178        user_store_c
2179            .condition(&cx_c, |user_store, _| {
2180                collaborators(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2181            })
2182            .await;
2183
2184        cx_a.update(move |_| drop(worktree_a));
2185        user_store_a
2186            .condition(&cx_a, |user_store, _| collaborators(user_store) == vec![])
2187            .await;
2188        user_store_b
2189            .condition(&cx_b, |user_store, _| collaborators(user_store) == vec![])
2190            .await;
2191        user_store_c
2192            .condition(&cx_c, |user_store, _| collaborators(user_store) == vec![])
2193            .await;
2194
2195        fn collaborators(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2196            user_store
2197                .collaborators()
2198                .iter()
2199                .map(|collaborator| {
2200                    let worktrees = collaborator
2201                        .worktrees
2202                        .iter()
2203                        .map(|w| {
2204                            (
2205                                w.root_name.as_str(),
2206                                w.guests.iter().map(|p| p.github_login.as_str()).collect(),
2207                            )
2208                        })
2209                        .collect();
2210                    (collaborator.user.github_login.as_str(), worktrees)
2211                })
2212                .collect()
2213        }
2214    }
2215
2216    struct TestServer {
2217        peer: Arc<Peer>,
2218        app_state: Arc<AppState>,
2219        server: Arc<Server>,
2220        notifications: mpsc::Receiver<()>,
2221        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
2222        forbid_connections: Arc<AtomicBool>,
2223        _test_db: TestDb,
2224    }
2225
2226    impl TestServer {
2227        async fn start() -> Self {
2228            let test_db = TestDb::new();
2229            let app_state = Self::build_app_state(&test_db).await;
2230            let peer = Peer::new();
2231            let notifications = mpsc::channel(128);
2232            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2233            Self {
2234                peer,
2235                app_state,
2236                server,
2237                notifications: notifications.1,
2238                connection_killers: Default::default(),
2239                forbid_connections: Default::default(),
2240                _test_db: test_db,
2241            }
2242        }
2243
2244        async fn create_client(
2245            &mut self,
2246            cx: &mut TestAppContext,
2247            name: &str,
2248        ) -> (Arc<Client>, ModelHandle<UserStore>) {
2249            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2250            let client_name = name.to_string();
2251            let mut client = Client::new();
2252            let server = self.server.clone();
2253            let connection_killers = self.connection_killers.clone();
2254            let forbid_connections = self.forbid_connections.clone();
2255            Arc::get_mut(&mut client)
2256                .unwrap()
2257                .override_authenticate(move |cx| {
2258                    cx.spawn(|_| async move {
2259                        let access_token = "the-token".to_string();
2260                        Ok(Credentials {
2261                            user_id: user_id.0 as u64,
2262                            access_token,
2263                        })
2264                    })
2265                })
2266                .override_establish_connection(move |credentials, cx| {
2267                    assert_eq!(credentials.user_id, user_id.0 as u64);
2268                    assert_eq!(credentials.access_token, "the-token");
2269
2270                    let server = server.clone();
2271                    let connection_killers = connection_killers.clone();
2272                    let forbid_connections = forbid_connections.clone();
2273                    let client_name = client_name.clone();
2274                    cx.spawn(move |cx| async move {
2275                        if forbid_connections.load(SeqCst) {
2276                            Err(EstablishConnectionError::other(anyhow!(
2277                                "server is forbidding connections"
2278                            )))
2279                        } else {
2280                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2281                            connection_killers.lock().insert(user_id, kill_conn);
2282                            cx.background()
2283                                .spawn(server.handle_connection(server_conn, client_name, user_id))
2284                                .detach();
2285                            Ok(client_conn)
2286                        }
2287                    })
2288                });
2289
2290            let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2291            client
2292                .authenticate_and_connect(&cx.to_async())
2293                .await
2294                .unwrap();
2295
2296            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2297            let mut authed_user =
2298                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2299            while authed_user.recv().await.unwrap().is_none() {}
2300
2301            (client, user_store)
2302        }
2303
2304        fn disconnect_client(&self, user_id: UserId) {
2305            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2306                let _ = kill_conn.try_send(Some(()));
2307            }
2308        }
2309
2310        fn forbid_connections(&self) {
2311            self.forbid_connections.store(true, SeqCst);
2312        }
2313
2314        fn allow_connections(&self) {
2315            self.forbid_connections.store(false, SeqCst);
2316        }
2317
2318        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2319            let mut config = Config::default();
2320            config.session_secret = "a".repeat(32);
2321            config.database_url = test_db.url.clone();
2322            let github_client = github::AppClient::test();
2323            Arc::new(AppState {
2324                db: test_db.db().clone(),
2325                handlebars: Default::default(),
2326                auth_client: auth::build_client("", ""),
2327                repo_client: github::RepoClient::test(&github_client),
2328                github_client,
2329                config,
2330            })
2331        }
2332
2333        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2334            self.server.store.read()
2335        }
2336
2337        async fn condition<F>(&mut self, mut predicate: F)
2338        where
2339            F: FnMut(&Store) -> bool,
2340        {
2341            async_std::future::timeout(Duration::from_millis(500), async {
2342                while !(predicate)(&*self.server.store.read()) {
2343                    self.notifications.recv().await;
2344                }
2345            })
2346            .await
2347            .expect("condition timed out");
2348        }
2349    }
2350
2351    impl Drop for TestServer {
2352        fn drop(&mut self) {
2353            task::block_on(self.peer.reset());
2354        }
2355    }
2356
2357    fn current_user_id(user_store: &ModelHandle<UserStore>, cx: &TestAppContext) -> UserId {
2358        UserId::from_proto(
2359            user_store.read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2360        )
2361    }
2362
2363    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2364        channel
2365            .messages()
2366            .cursor::<()>()
2367            .map(|m| {
2368                (
2369                    m.sender.github_login.clone(),
2370                    m.body.clone(),
2371                    m.is_pending(),
2372                )
2373            })
2374            .collect()
2375    }
2376
2377    struct EmptyView;
2378
2379    impl gpui::Entity for EmptyView {
2380        type Event = ();
2381    }
2382
2383    impl gpui::View for EmptyView {
2384        fn ui_name() -> &'static str {
2385            "empty view"
2386        }
2387
2388        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2389            gpui::Element::boxed(gpui::elements::Empty)
2390        }
2391    }
2392}