rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
/// RPC server state shared by every connection.
pub struct Server {
    /// Peer used to send, forward and respond to messages on connections.
    peer: Arc<Peer>,
    /// In-memory store of connections, worktrees and channels, guarded by a
    /// read/write lock (see `state` / `state_mut`).
    store: RwLock<Store>,
    /// Application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives a `()` after each message that had a
    /// registered handler has been processed.
    notifications: Option<mpsc::Sender<()>>,
}
  43
/// Number of channel messages requested from the database per page; a page
/// shorter than this is reported to the client as the last one (`done`).
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Maximum accepted length of a trimmed channel message body, measured with
/// `str::len` (i.e. in bytes).
const MAX_MESSAGE_LEN: usize = 1024;
  46
  47impl Server {
  48    pub fn new(
  49        app_state: Arc<AppState>,
  50        peer: Arc<Peer>,
  51        notifications: Option<mpsc::Sender<()>>,
  52    ) -> Arc<Self> {
  53        let mut server = Self {
  54            peer,
  55            app_state,
  56            store: Default::default(),
  57            handlers: Default::default(),
  58            notifications,
  59        };
  60
  61        server
  62            .add_handler(Server::ping)
  63            .add_handler(Server::open_worktree)
  64            .add_handler(Server::close_worktree)
  65            .add_handler(Server::share_worktree)
  66            .add_handler(Server::unshare_worktree)
  67            .add_handler(Server::join_worktree)
  68            .add_handler(Server::leave_worktree)
  69            .add_handler(Server::update_worktree)
  70            .add_handler(Server::open_buffer)
  71            .add_handler(Server::close_buffer)
  72            .add_handler(Server::update_buffer)
  73            .add_handler(Server::buffer_saved)
  74            .add_handler(Server::save_buffer)
  75            .add_handler(Server::get_channels)
  76            .add_handler(Server::get_users)
  77            .add_handler(Server::join_channel)
  78            .add_handler(Server::leave_channel)
  79            .add_handler(Server::send_channel_message)
  80            .add_handler(Server::get_channel_messages);
  81
  82        Arc::new(server)
  83    }
  84
  85    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  86    where
  87        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  88        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  89        M: EnvelopedMessage,
  90    {
  91        let prev_handler = self.handlers.insert(
  92            TypeId::of::<M>(),
  93            Box::new(move |server, envelope| {
  94                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
  95                (handler)(server, *envelope).boxed()
  96            }),
  97        );
  98        if prev_handler.is_some() {
  99            panic!("registered a handler for the same message twice");
 100        }
 101        self
 102    }
 103
 104    pub fn handle_connection(
 105        self: &Arc<Self>,
 106        connection: Connection,
 107        addr: String,
 108        user_id: UserId,
 109        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 110    ) -> impl Future<Output = ()> {
 111        let mut this = self.clone();
 112        async move {
 113            let (connection_id, handle_io, mut incoming_rx) =
 114                this.peer.add_connection(connection).await;
 115
 116            if let Some(send_connection_id) = send_connection_id.as_mut() {
 117                let _ = send_connection_id.send(connection_id).await;
 118            }
 119
 120            this.state_mut().add_connection(connection_id, user_id);
 121            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 122                log::error!("error updating contacts for {:?}: {}", user_id, err);
 123            }
 124
 125            let handle_io = handle_io.fuse();
 126            futures::pin_mut!(handle_io);
 127            loop {
 128                let next_message = incoming_rx.recv().fuse();
 129                futures::pin_mut!(next_message);
 130                futures::select_biased! {
 131                    message = next_message => {
 132                        if let Some(message) = message {
 133                            let start_time = Instant::now();
 134                            log::info!("RPC message received: {}", message.payload_type_name());
 135                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 136                                if let Err(err) = (handler)(this.clone(), message).await {
 137                                    log::error!("error handling message: {:?}", err);
 138                                } else {
 139                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 140                                }
 141
 142                                if let Some(mut notifications) = this.notifications.clone() {
 143                                    let _ = notifications.send(()).await;
 144                                }
 145                            } else {
 146                                log::warn!("unhandled message: {}", message.payload_type_name());
 147                            }
 148                        } else {
 149                            log::info!("rpc connection closed {:?}", addr);
 150                            break;
 151                        }
 152                    }
 153                    handle_io = handle_io => {
 154                        if let Err(err) = handle_io {
 155                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 156                        }
 157                        break;
 158                    }
 159                }
 160            }
 161
 162            if let Err(err) = this.sign_out(connection_id).await {
 163                log::error!("error signing out connection {:?} - {:?}", addr, err);
 164            }
 165        }
 166    }
 167
 168    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 169        self.peer.disconnect(connection_id).await;
 170        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 171
 172        for (worktree_id, worktree) in removed_connection.hosted_worktrees {
 173            if let Some(share) = worktree.share {
 174                broadcast(
 175                    connection_id,
 176                    share.guests.keys().copied().collect(),
 177                    |conn_id| {
 178                        self.peer
 179                            .send(conn_id, proto::UnshareWorktree { worktree_id })
 180                    },
 181                )
 182                .await?;
 183            }
 184        }
 185
 186        for (worktree_id, peer_ids) in removed_connection.guest_worktree_ids {
 187            broadcast(connection_id, peer_ids, |conn_id| {
 188                self.peer.send(
 189                    conn_id,
 190                    proto::RemoveCollaborator {
 191                        worktree_id,
 192                        peer_id: connection_id.0,
 193                    },
 194                )
 195            })
 196            .await?;
 197        }
 198
 199        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 200            .await?;
 201
 202        Ok(())
 203    }
 204
 205    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 206        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 207        Ok(())
 208    }
 209
 210    async fn open_worktree(
 211        mut self: Arc<Server>,
 212        request: TypedEnvelope<proto::OpenWorktree>,
 213    ) -> tide::Result<()> {
 214        let receipt = request.receipt();
 215        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 216
 217        let mut contact_user_ids = HashSet::default();
 218        contact_user_ids.insert(host_user_id);
 219        for github_login in request.payload.authorized_logins {
 220            match self.app_state.db.create_user(&github_login, false).await {
 221                Ok(contact_user_id) => {
 222                    contact_user_ids.insert(contact_user_id);
 223                }
 224                Err(err) => {
 225                    let message = err.to_string();
 226                    self.peer
 227                        .respond_with_error(receipt, proto::Error { message })
 228                        .await?;
 229                    return Ok(());
 230                }
 231            }
 232        }
 233
 234        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 235        let worktree_id = self.state_mut().add_worktree(Worktree {
 236            host_connection_id: request.sender_id,
 237            host_user_id,
 238            authorized_user_ids: contact_user_ids.clone(),
 239            root_name: request.payload.root_name,
 240            share: None,
 241        });
 242
 243        self.peer
 244            .respond(receipt, proto::OpenWorktreeResponse { worktree_id })
 245            .await?;
 246        self.update_contacts_for_users(&contact_user_ids).await?;
 247
 248        Ok(())
 249    }
 250
 251    async fn close_worktree(
 252        mut self: Arc<Server>,
 253        request: TypedEnvelope<proto::CloseWorktree>,
 254    ) -> tide::Result<()> {
 255        let worktree_id = request.payload.worktree_id;
 256        let worktree = self
 257            .state_mut()
 258            .remove_worktree(worktree_id, request.sender_id)?;
 259
 260        if let Some(share) = worktree.share {
 261            broadcast(
 262                request.sender_id,
 263                share.guests.keys().copied().collect(),
 264                |conn_id| {
 265                    self.peer
 266                        .send(conn_id, proto::UnshareWorktree { worktree_id })
 267                },
 268            )
 269            .await?;
 270        }
 271        self.update_contacts_for_users(&worktree.authorized_user_ids)
 272            .await?;
 273        Ok(())
 274    }
 275
 276    async fn share_worktree(
 277        mut self: Arc<Server>,
 278        mut request: TypedEnvelope<proto::ShareWorktree>,
 279    ) -> tide::Result<()> {
 280        let worktree = request
 281            .payload
 282            .worktree
 283            .as_mut()
 284            .ok_or_else(|| anyhow!("missing worktree"))?;
 285        let entries = mem::take(&mut worktree.entries)
 286            .into_iter()
 287            .map(|entry| (entry.id, entry))
 288            .collect();
 289
 290        let contact_user_ids =
 291            self.state_mut()
 292                .share_worktree(worktree.id, request.sender_id, entries);
 293        if let Some(contact_user_ids) = contact_user_ids {
 294            self.peer
 295                .respond(request.receipt(), proto::ShareWorktreeResponse {})
 296                .await?;
 297            self.update_contacts_for_users(&contact_user_ids).await?;
 298        } else {
 299            self.peer
 300                .respond_with_error(
 301                    request.receipt(),
 302                    proto::Error {
 303                        message: "no such worktree".to_string(),
 304                    },
 305                )
 306                .await?;
 307        }
 308        Ok(())
 309    }
 310
 311    async fn unshare_worktree(
 312        mut self: Arc<Server>,
 313        request: TypedEnvelope<proto::UnshareWorktree>,
 314    ) -> tide::Result<()> {
 315        let worktree_id = request.payload.worktree_id;
 316        let worktree = self
 317            .state_mut()
 318            .unshare_worktree(worktree_id, request.sender_id)?;
 319
 320        broadcast(request.sender_id, worktree.connection_ids, |conn_id| {
 321            self.peer
 322                .send(conn_id, proto::UnshareWorktree { worktree_id })
 323        })
 324        .await?;
 325        self.update_contacts_for_users(&worktree.authorized_user_ids)
 326            .await?;
 327
 328        Ok(())
 329    }
 330
 331    async fn join_worktree(
 332        mut self: Arc<Server>,
 333        request: TypedEnvelope<proto::JoinWorktree>,
 334    ) -> tide::Result<()> {
 335        let worktree_id = request.payload.worktree_id;
 336
 337        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 338        let response_data = self
 339            .state_mut()
 340            .join_worktree(request.sender_id, user_id, worktree_id)
 341            .and_then(|joined| {
 342                let share = joined.worktree.share()?;
 343                let peer_count = share.guests.len();
 344                let mut collaborators = Vec::with_capacity(peer_count);
 345                collaborators.push(proto::Collaborator {
 346                    peer_id: joined.worktree.host_connection_id.0,
 347                    replica_id: 0,
 348                    user_id: joined.worktree.host_user_id.to_proto(),
 349                });
 350                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 351                    if *peer_conn_id != request.sender_id {
 352                        collaborators.push(proto::Collaborator {
 353                            peer_id: peer_conn_id.0,
 354                            replica_id: *peer_replica_id as u32,
 355                            user_id: peer_user_id.to_proto(),
 356                        });
 357                    }
 358                }
 359                let response = proto::JoinWorktreeResponse {
 360                    worktree: Some(proto::Worktree {
 361                        id: worktree_id,
 362                        root_name: joined.worktree.root_name.clone(),
 363                        entries: share.entries.values().cloned().collect(),
 364                    }),
 365                    replica_id: joined.replica_id as u32,
 366                    collaborators,
 367                };
 368                let connection_ids = joined.worktree.connection_ids();
 369                let contact_user_ids = joined.worktree.authorized_user_ids.clone();
 370                Ok((response, connection_ids, contact_user_ids))
 371            });
 372
 373        match response_data {
 374            Ok((response, connection_ids, contact_user_ids)) => {
 375                broadcast(request.sender_id, connection_ids, |conn_id| {
 376                    self.peer.send(
 377                        conn_id,
 378                        proto::AddCollaborator {
 379                            worktree_id,
 380                            collaborator: Some(proto::Collaborator {
 381                                peer_id: request.sender_id.0,
 382                                replica_id: response.replica_id,
 383                                user_id: user_id.to_proto(),
 384                            }),
 385                        },
 386                    )
 387                })
 388                .await?;
 389                self.peer.respond(request.receipt(), response).await?;
 390                self.update_contacts_for_users(&contact_user_ids).await?;
 391            }
 392            Err(error) => {
 393                self.peer
 394                    .respond_with_error(
 395                        request.receipt(),
 396                        proto::Error {
 397                            message: error.to_string(),
 398                        },
 399                    )
 400                    .await?;
 401            }
 402        }
 403
 404        Ok(())
 405    }
 406
 407    async fn leave_worktree(
 408        mut self: Arc<Server>,
 409        request: TypedEnvelope<proto::LeaveWorktree>,
 410    ) -> tide::Result<()> {
 411        let sender_id = request.sender_id;
 412        let worktree_id = request.payload.worktree_id;
 413        let worktree = self.state_mut().leave_worktree(sender_id, worktree_id);
 414        if let Some(worktree) = worktree {
 415            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 416                self.peer.send(
 417                    conn_id,
 418                    proto::RemoveCollaborator {
 419                        worktree_id,
 420                        peer_id: sender_id.0,
 421                    },
 422                )
 423            })
 424            .await?;
 425            self.update_contacts_for_users(&worktree.authorized_user_ids)
 426                .await?;
 427        }
 428        Ok(())
 429    }
 430
 431    async fn update_worktree(
 432        mut self: Arc<Server>,
 433        request: TypedEnvelope<proto::UpdateWorktree>,
 434    ) -> tide::Result<()> {
 435        let connection_ids = self.state_mut().update_worktree(
 436            request.sender_id,
 437            request.payload.worktree_id,
 438            &request.payload.removed_entries,
 439            &request.payload.updated_entries,
 440        )?;
 441
 442        broadcast(request.sender_id, connection_ids, |connection_id| {
 443            self.peer
 444                .forward_send(request.sender_id, connection_id, request.payload.clone())
 445        })
 446        .await?;
 447
 448        Ok(())
 449    }
 450
 451    async fn open_buffer(
 452        self: Arc<Server>,
 453        request: TypedEnvelope<proto::OpenBuffer>,
 454    ) -> tide::Result<()> {
 455        let receipt = request.receipt();
 456        let host_connection_id = self
 457            .state()
 458            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 459        let response = self
 460            .peer
 461            .forward_request(request.sender_id, host_connection_id, request.payload)
 462            .await?;
 463        self.peer.respond(receipt, response).await?;
 464        Ok(())
 465    }
 466
 467    async fn close_buffer(
 468        self: Arc<Server>,
 469        request: TypedEnvelope<proto::CloseBuffer>,
 470    ) -> tide::Result<()> {
 471        let host_connection_id = self
 472            .state()
 473            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 474        self.peer
 475            .forward_send(request.sender_id, host_connection_id, request.payload)
 476            .await?;
 477        Ok(())
 478    }
 479
 480    async fn save_buffer(
 481        self: Arc<Server>,
 482        request: TypedEnvelope<proto::SaveBuffer>,
 483    ) -> tide::Result<()> {
 484        let host;
 485        let guests;
 486        {
 487            let state = self.state();
 488            host = state
 489                .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 490            guests = state
 491                .worktree_guest_connection_ids(request.sender_id, request.payload.worktree_id)?;
 492        }
 493
 494        let sender = request.sender_id;
 495        let receipt = request.receipt();
 496        let response = self
 497            .peer
 498            .forward_request(sender, host, request.payload.clone())
 499            .await?;
 500
 501        broadcast(host, guests, |conn_id| {
 502            let response = response.clone();
 503            let peer = &self.peer;
 504            async move {
 505                if conn_id == sender {
 506                    peer.respond(receipt, response).await
 507                } else {
 508                    peer.forward_send(host, conn_id, response).await
 509                }
 510            }
 511        })
 512        .await?;
 513
 514        Ok(())
 515    }
 516
 517    async fn update_buffer(
 518        self: Arc<Server>,
 519        request: TypedEnvelope<proto::UpdateBuffer>,
 520    ) -> tide::Result<()> {
 521        let receiver_ids = self
 522            .state()
 523            .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
 524        broadcast(request.sender_id, receiver_ids, |connection_id| {
 525            self.peer
 526                .forward_send(request.sender_id, connection_id, request.payload.clone())
 527        })
 528        .await?;
 529        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 530        Ok(())
 531    }
 532
 533    async fn buffer_saved(
 534        self: Arc<Server>,
 535        request: TypedEnvelope<proto::BufferSaved>,
 536    ) -> tide::Result<()> {
 537        let receiver_ids = self
 538            .state()
 539            .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
 540        broadcast(request.sender_id, receiver_ids, |connection_id| {
 541            self.peer
 542                .forward_send(request.sender_id, connection_id, request.payload.clone())
 543        })
 544        .await?;
 545        Ok(())
 546    }
 547
 548    async fn get_channels(
 549        self: Arc<Server>,
 550        request: TypedEnvelope<proto::GetChannels>,
 551    ) -> tide::Result<()> {
 552        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 553        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 554        self.peer
 555            .respond(
 556                request.receipt(),
 557                proto::GetChannelsResponse {
 558                    channels: channels
 559                        .into_iter()
 560                        .map(|chan| proto::Channel {
 561                            id: chan.id.to_proto(),
 562                            name: chan.name,
 563                        })
 564                        .collect(),
 565                },
 566            )
 567            .await?;
 568        Ok(())
 569    }
 570
 571    async fn get_users(
 572        self: Arc<Server>,
 573        request: TypedEnvelope<proto::GetUsers>,
 574    ) -> tide::Result<()> {
 575        let receipt = request.receipt();
 576        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 577        let users = self
 578            .app_state
 579            .db
 580            .get_users_by_ids(user_ids)
 581            .await?
 582            .into_iter()
 583            .map(|user| proto::User {
 584                id: user.id.to_proto(),
 585                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 586                github_login: user.github_login,
 587            })
 588            .collect();
 589        self.peer
 590            .respond(receipt, proto::GetUsersResponse { users })
 591            .await?;
 592        Ok(())
 593    }
 594
 595    async fn update_contacts_for_users<'a>(
 596        self: &Arc<Server>,
 597        user_ids: impl IntoIterator<Item = &'a UserId>,
 598    ) -> tide::Result<()> {
 599        let mut send_futures = Vec::new();
 600
 601        {
 602            let state = self.state();
 603            for user_id in user_ids {
 604                let contacts = state.contacts_for_user(*user_id);
 605                for connection_id in state.connection_ids_for_user(*user_id) {
 606                    send_futures.push(self.peer.send(
 607                        connection_id,
 608                        proto::UpdateContacts {
 609                            contacts: contacts.clone(),
 610                        },
 611                    ));
 612                }
 613            }
 614        }
 615        futures::future::try_join_all(send_futures).await?;
 616
 617        Ok(())
 618    }
 619
 620    async fn join_channel(
 621        mut self: Arc<Self>,
 622        request: TypedEnvelope<proto::JoinChannel>,
 623    ) -> tide::Result<()> {
 624        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 625        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 626        if !self
 627            .app_state
 628            .db
 629            .can_user_access_channel(user_id, channel_id)
 630            .await?
 631        {
 632            Err(anyhow!("access denied"))?;
 633        }
 634
 635        self.state_mut().join_channel(request.sender_id, channel_id);
 636        let messages = self
 637            .app_state
 638            .db
 639            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 640            .await?
 641            .into_iter()
 642            .map(|msg| proto::ChannelMessage {
 643                id: msg.id.to_proto(),
 644                body: msg.body,
 645                timestamp: msg.sent_at.unix_timestamp() as u64,
 646                sender_id: msg.sender_id.to_proto(),
 647                nonce: Some(msg.nonce.as_u128().into()),
 648            })
 649            .collect::<Vec<_>>();
 650        self.peer
 651            .respond(
 652                request.receipt(),
 653                proto::JoinChannelResponse {
 654                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 655                    messages,
 656                },
 657            )
 658            .await?;
 659        Ok(())
 660    }
 661
 662    async fn leave_channel(
 663        mut self: Arc<Self>,
 664        request: TypedEnvelope<proto::LeaveChannel>,
 665    ) -> tide::Result<()> {
 666        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 667        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 668        if !self
 669            .app_state
 670            .db
 671            .can_user_access_channel(user_id, channel_id)
 672            .await?
 673        {
 674            Err(anyhow!("access denied"))?;
 675        }
 676
 677        self.state_mut()
 678            .leave_channel(request.sender_id, channel_id);
 679
 680        Ok(())
 681    }
 682
 683    async fn send_channel_message(
 684        self: Arc<Self>,
 685        request: TypedEnvelope<proto::SendChannelMessage>,
 686    ) -> tide::Result<()> {
 687        let receipt = request.receipt();
 688        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 689        let user_id;
 690        let connection_ids;
 691        {
 692            let state = self.state();
 693            user_id = state.user_id_for_connection(request.sender_id)?;
 694            if let Some(ids) = state.channel_connection_ids(channel_id) {
 695                connection_ids = ids;
 696            } else {
 697                return Ok(());
 698            }
 699        }
 700
 701        // Validate the message body.
 702        let body = request.payload.body.trim().to_string();
 703        if body.len() > MAX_MESSAGE_LEN {
 704            self.peer
 705                .respond_with_error(
 706                    receipt,
 707                    proto::Error {
 708                        message: "message is too long".to_string(),
 709                    },
 710                )
 711                .await?;
 712            return Ok(());
 713        }
 714        if body.is_empty() {
 715            self.peer
 716                .respond_with_error(
 717                    receipt,
 718                    proto::Error {
 719                        message: "message can't be blank".to_string(),
 720                    },
 721                )
 722                .await?;
 723            return Ok(());
 724        }
 725
 726        let timestamp = OffsetDateTime::now_utc();
 727        let nonce = if let Some(nonce) = request.payload.nonce {
 728            nonce
 729        } else {
 730            self.peer
 731                .respond_with_error(
 732                    receipt,
 733                    proto::Error {
 734                        message: "nonce can't be blank".to_string(),
 735                    },
 736                )
 737                .await?;
 738            return Ok(());
 739        };
 740
 741        let message_id = self
 742            .app_state
 743            .db
 744            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 745            .await?
 746            .to_proto();
 747        let message = proto::ChannelMessage {
 748            sender_id: user_id.to_proto(),
 749            id: message_id,
 750            body,
 751            timestamp: timestamp.unix_timestamp() as u64,
 752            nonce: Some(nonce),
 753        };
 754        broadcast(request.sender_id, connection_ids, |conn_id| {
 755            self.peer.send(
 756                conn_id,
 757                proto::ChannelMessageSent {
 758                    channel_id: channel_id.to_proto(),
 759                    message: Some(message.clone()),
 760                },
 761            )
 762        })
 763        .await?;
 764        self.peer
 765            .respond(
 766                receipt,
 767                proto::SendChannelMessageResponse {
 768                    message: Some(message),
 769                },
 770            )
 771            .await?;
 772        Ok(())
 773    }
 774
 775    async fn get_channel_messages(
 776        self: Arc<Self>,
 777        request: TypedEnvelope<proto::GetChannelMessages>,
 778    ) -> tide::Result<()> {
 779        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 780        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 781        if !self
 782            .app_state
 783            .db
 784            .can_user_access_channel(user_id, channel_id)
 785            .await?
 786        {
 787            Err(anyhow!("access denied"))?;
 788        }
 789
 790        let messages = self
 791            .app_state
 792            .db
 793            .get_channel_messages(
 794                channel_id,
 795                MESSAGE_COUNT_PER_PAGE,
 796                Some(MessageId::from_proto(request.payload.before_message_id)),
 797            )
 798            .await?
 799            .into_iter()
 800            .map(|msg| proto::ChannelMessage {
 801                id: msg.id.to_proto(),
 802                body: msg.body,
 803                timestamp: msg.sent_at.unix_timestamp() as u64,
 804                sender_id: msg.sender_id.to_proto(),
 805                nonce: Some(msg.nonce.as_u128().into()),
 806            })
 807            .collect::<Vec<_>>();
 808        self.peer
 809            .respond(
 810                request.receipt(),
 811                proto::GetChannelMessagesResponse {
 812                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 813                    messages,
 814                },
 815            )
 816            .await?;
 817        Ok(())
 818    }
 819
    /// Acquires a read lock on the store and returns the guard.
    ///
    /// The guard should be dropped before awaiting; the handlers in this file
    /// confine it to a single statement or a short block.
    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
        self.store.read()
    }
 823
    /// Acquires a write lock on the store and returns the guard.
    ///
    /// Takes `&mut Arc<Self>` even though the lock itself only needs shared
    /// access, so callers must hold the `Arc` mutably to mutate the store.
    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
        self.store.write()
    }
 827}
 828
 829pub async fn broadcast<F, T>(
 830    sender_id: ConnectionId,
 831    receiver_ids: Vec<ConnectionId>,
 832    mut f: F,
 833) -> anyhow::Result<()>
 834where
 835    F: FnMut(ConnectionId) -> T,
 836    T: Future<Output = anyhow::Result<()>>,
 837{
 838    let futures = receiver_ids
 839        .into_iter()
 840        .filter(|id| *id != sender_id)
 841        .map(|id| f(id));
 842    futures::future::try_join_all(futures).await?;
 843    Ok(())
 844}
 845
 846pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 847    let server = Server::new(app.state().clone(), rpc.clone(), None);
 848    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 849        let server = server.clone();
 850        async move {
 851            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 852
 853            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 854            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 855            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 856            let client_protocol_version: Option<u32> = request
 857                .header("X-Zed-Protocol-Version")
 858                .and_then(|v| v.as_str().parse().ok());
 859
 860            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 861                return Ok(Response::new(StatusCode::UpgradeRequired));
 862            }
 863
 864            let header = match request.header("Sec-Websocket-Key") {
 865                Some(h) => h.as_str(),
 866                None => return Err(anyhow!("expected sec-websocket-key"))?,
 867            };
 868
 869            let user_id = process_auth_header(&request).await?;
 870
 871            let mut response = Response::new(StatusCode::SwitchingProtocols);
 872            response.insert_header(UPGRADE, "websocket");
 873            response.insert_header(CONNECTION, "Upgrade");
 874            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 875            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 876            response.insert_header("Sec-Websocket-Version", "13");
 877
 878            let http_res: &mut tide::http::Response = response.as_mut();
 879            let upgrade_receiver = http_res.recv_upgrade().await;
 880            let addr = request.remote().unwrap_or("unknown").to_string();
 881            task::spawn(async move {
 882                if let Some(stream) = upgrade_receiver.await {
 883                    server
 884                        .handle_connection(
 885                            Connection::new(
 886                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 887                            ),
 888                            addr,
 889                            user_id,
 890                            None,
 891                        )
 892                        .await;
 893                }
 894            });
 895
 896            Ok(response)
 897        }
 898    });
 899}
 900
 901fn header_contains_ignore_case<T>(
 902    request: &tide::Request<T>,
 903    header_name: HeaderName,
 904    value: &str,
 905) -> bool {
 906    request
 907        .header(header_name)
 908        .map(|h| {
 909            h.as_str()
 910                .split(',')
 911                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 912        })
 913        .unwrap_or(false)
 914}
 915
 916#[cfg(test)]
 917mod tests {
 918    use super::*;
 919    use crate::{
 920        auth,
 921        db::{tests::TestDb, UserId},
 922        github, AppState, Config,
 923    };
 924    use ::rpc::Peer;
 925    use async_std::task;
 926    use gpui::{ModelHandle, TestAppContext};
 927    use parking_lot::Mutex;
 928    use postage::{mpsc, watch};
 929    use rpc::PeerId;
 930    use serde_json::json;
 931    use sqlx::types::time::OffsetDateTime;
 932    use std::{
 933        ops::Deref,
 934        path::Path,
 935        rc::Rc,
 936        sync::{
 937            atomic::{AtomicBool, Ordering::SeqCst},
 938            Arc,
 939        },
 940        time::Duration,
 941    };
 942    use zed::{
 943        client::{
 944            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
 945            EstablishConnectionError, UserStore,
 946        },
 947        contacts_panel::JoinWorktree,
 948        editor::{Editor, EditorSettings, Input, MultiBuffer},
 949        fs::{FakeFs, Fs as _},
 950        language::{
 951            tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
 952            LanguageRegistry, LanguageServerConfig, Point,
 953        },
 954        lsp,
 955        project::{ProjectPath, Worktree},
 956        test::test_app_state,
 957        workspace::Workspace,
 958    };
 959
 960    #[gpui::test]
 961    async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
 962        let (window_b, _) = cx_b.add_window(|_| EmptyView);
 963        let lang_registry = Arc::new(LanguageRegistry::new());
 964
 965        // Connect to a server as 2 clients.
 966        let mut server = TestServer::start().await;
 967        let client_a = server.create_client(&mut cx_a, "user_a").await;
 968        let client_b = server.create_client(&mut cx_b, "user_b").await;
 969
 970        cx_a.foreground().forbid_parking();
 971
 972        // Share a local worktree as client A
 973        let fs = Arc::new(FakeFs::new());
 974        fs.insert_tree(
 975            "/a",
 976            json!({
 977                ".zed.toml": r#"collaborators = ["user_b"]"#,
 978                "a.txt": "a-contents",
 979                "b.txt": "b-contents",
 980            }),
 981        )
 982        .await;
 983        let worktree_a = Worktree::open_local(
 984            client_a.clone(),
 985            client_a.user_store.clone(),
 986            "/a".as_ref(),
 987            fs,
 988            lang_registry.clone(),
 989            &mut cx_a.to_async(),
 990        )
 991        .await
 992        .unwrap();
 993        worktree_a
 994            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
 995            .await;
 996        let worktree_id = worktree_a
 997            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
 998            .await
 999            .unwrap();
1000
1001        // Join that worktree as client B, and see that a guest has joined as client A.
1002        let worktree_b = Worktree::open_remote(
1003            client_b.clone(),
1004            worktree_id,
1005            lang_registry.clone(),
1006            client_b.user_store.clone(),
1007            &mut cx_b.to_async(),
1008        )
1009        .await
1010        .unwrap();
1011
1012        let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| {
1013            assert_eq!(
1014                tree.collaborators()
1015                    .get(&client_a.peer_id)
1016                    .unwrap()
1017                    .user
1018                    .github_login,
1019                "user_a"
1020            );
1021            tree.replica_id()
1022        });
1023        worktree_a
1024            .condition(&cx_a, |tree, _| {
1025                tree.collaborators()
1026                    .get(&client_b.peer_id)
1027                    .map_or(false, |collaborator| {
1028                        collaborator.replica_id == replica_id_b
1029                            && collaborator.user.github_login == "user_b"
1030                    })
1031            })
1032            .await;
1033
1034        // Open the same file as client B and client A.
1035        let buffer_b = worktree_b
1036            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1037            .await
1038            .unwrap();
1039        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1040        buffer_b.read_with(&cx_b, |buf, cx| {
1041            assert_eq!(buf.read(cx).text(), "b-contents")
1042        });
1043        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1044        let buffer_a = worktree_a
1045            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1046            .await
1047            .unwrap();
1048
1049        let editor_b = cx_b.add_view(window_b, |cx| {
1050            Editor::for_buffer(buffer_b, Rc::new(|cx| EditorSettings::test(cx)), cx)
1051        });
1052        // TODO
1053        // // Create a selection set as client B and see that selection set as client A.
1054        // buffer_a
1055        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1056        //     .await;
1057
1058        // Edit the buffer as client B and see that edit as client A.
1059        editor_b.update(&mut cx_b, |editor, cx| {
1060            editor.handle_input(&Input("ok, ".into()), cx)
1061        });
1062        buffer_a
1063            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1064            .await;
1065
1066        // TODO
1067        // // Remove the selection set as client B, see those selections disappear as client A.
1068        cx_b.update(move |_| drop(editor_b));
1069        // buffer_a
1070        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1071        //     .await;
1072
1073        // Close the buffer as client A, see that the buffer is closed.
1074        cx_a.update(move |_| drop(buffer_a));
1075        worktree_a
1076            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1077            .await;
1078
1079        // Dropping the worktree removes client B from client A's collaborators.
1080        cx_b.update(move |_| drop(worktree_b));
1081        worktree_a
1082            .condition(&cx_a, |tree, _| tree.collaborators().is_empty())
1083            .await;
1084    }
1085
1086    #[gpui::test]
1087    async fn test_unshare_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1088        cx_b.update(zed::contacts_panel::init);
1089        let mut app_state_a = cx_a.update(test_app_state);
1090        let mut app_state_b = cx_b.update(test_app_state);
1091
1092        // Connect to a server as 2 clients.
1093        let mut server = TestServer::start().await;
1094        let client_a = server.create_client(&mut cx_a, "user_a").await;
1095        let client_b = server.create_client(&mut cx_b, "user_b").await;
1096        Arc::get_mut(&mut app_state_a).unwrap().client = client_a.clone();
1097        Arc::get_mut(&mut app_state_a).unwrap().user_store = client_a.user_store.clone();
1098        Arc::get_mut(&mut app_state_b).unwrap().client = client_b.clone();
1099        Arc::get_mut(&mut app_state_b).unwrap().user_store = client_b.user_store.clone();
1100
1101        cx_a.foreground().forbid_parking();
1102
1103        // Share a local worktree as client A
1104        let fs = Arc::new(FakeFs::new());
1105        fs.insert_tree(
1106            "/a",
1107            json!({
1108                ".zed.toml": r#"collaborators = ["user_b"]"#,
1109                "a.txt": "a-contents",
1110                "b.txt": "b-contents",
1111            }),
1112        )
1113        .await;
1114        let worktree_a = Worktree::open_local(
1115            app_state_a.client.clone(),
1116            app_state_a.user_store.clone(),
1117            "/a".as_ref(),
1118            fs,
1119            app_state_a.languages.clone(),
1120            &mut cx_a.to_async(),
1121        )
1122        .await
1123        .unwrap();
1124        worktree_a
1125            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1126            .await;
1127
1128        let remote_worktree_id = worktree_a
1129            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1130            .await
1131            .unwrap();
1132
1133        let (window_b, workspace_b) =
1134            cx_b.add_window(|cx| Workspace::new(&app_state_b.as_ref().into(), cx));
1135        cx_b.update(|cx| {
1136            cx.dispatch_action(
1137                window_b,
1138                vec![workspace_b.id()],
1139                &JoinWorktree(remote_worktree_id),
1140            );
1141        });
1142        workspace_b
1143            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 1)
1144            .await;
1145
1146        let local_worktree_id_b = workspace_b.read_with(&cx_b, |workspace, cx| {
1147            let active_pane = workspace.active_pane().read(cx);
1148            assert!(active_pane.active_item().is_none());
1149            workspace.worktrees(cx).first().unwrap().id()
1150        });
1151        workspace_b
1152            .update(&mut cx_b, |workspace, cx| {
1153                workspace.open_entry(
1154                    ProjectPath {
1155                        worktree_id: local_worktree_id_b,
1156                        path: Path::new("a.txt").into(),
1157                    },
1158                    cx,
1159                )
1160            })
1161            .unwrap()
1162            .await;
1163        workspace_b.read_with(&cx_b, |workspace, cx| {
1164            let active_pane = workspace.active_pane().read(cx);
1165            assert!(active_pane.active_item().is_some());
1166        });
1167
1168        worktree_a.update(&mut cx_a, |tree, cx| {
1169            tree.as_local_mut().unwrap().unshare(cx);
1170        });
1171        workspace_b
1172            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 0)
1173            .await;
1174        workspace_b.read_with(&cx_b, |workspace, cx| {
1175            let active_pane = workspace.active_pane().read(cx);
1176            assert!(active_pane.active_item().is_none());
1177        });
1178    }
1179
1180    #[gpui::test]
1181    async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
1182        mut cx_a: TestAppContext,
1183        mut cx_b: TestAppContext,
1184        mut cx_c: TestAppContext,
1185    ) {
1186        cx_a.foreground().forbid_parking();
1187        let lang_registry = Arc::new(LanguageRegistry::new());
1188
1189        // Connect to a server as 3 clients.
1190        let mut server = TestServer::start().await;
1191        let client_a = server.create_client(&mut cx_a, "user_a").await;
1192        let client_b = server.create_client(&mut cx_b, "user_b").await;
1193        let client_c = server.create_client(&mut cx_c, "user_c").await;
1194
1195        let fs = Arc::new(FakeFs::new());
1196
1197        // Share a worktree as client A.
1198        fs.insert_tree(
1199            "/a",
1200            json!({
1201                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1202                "file1": "",
1203                "file2": ""
1204            }),
1205        )
1206        .await;
1207
1208        let worktree_a = Worktree::open_local(
1209            client_a.clone(),
1210            client_a.user_store.clone(),
1211            "/a".as_ref(),
1212            fs.clone(),
1213            lang_registry.clone(),
1214            &mut cx_a.to_async(),
1215        )
1216        .await
1217        .unwrap();
1218        worktree_a
1219            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1220            .await;
1221        let worktree_id = worktree_a
1222            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1223            .await
1224            .unwrap();
1225
1226        // Join that worktree as clients B and C.
1227        let worktree_b = Worktree::open_remote(
1228            client_b.clone(),
1229            worktree_id,
1230            lang_registry.clone(),
1231            client_b.user_store.clone(),
1232            &mut cx_b.to_async(),
1233        )
1234        .await
1235        .unwrap();
1236        let worktree_c = Worktree::open_remote(
1237            client_c.clone(),
1238            worktree_id,
1239            lang_registry.clone(),
1240            client_c.user_store.clone(),
1241            &mut cx_c.to_async(),
1242        )
1243        .await
1244        .unwrap();
1245
1246        // Open and edit a buffer as both guests B and C.
1247        let buffer_b = worktree_b
1248            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1249            .await
1250            .unwrap();
1251        let buffer_c = worktree_c
1252            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1253            .await
1254            .unwrap();
1255        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1256        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1257
1258        // Open and edit that buffer as the host.
1259        let buffer_a = worktree_a
1260            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1261            .await
1262            .unwrap();
1263
1264        buffer_a
1265            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1266            .await;
1267        buffer_a.update(&mut cx_a, |buf, cx| {
1268            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1269        });
1270
1271        // Wait for edits to propagate
1272        buffer_a
1273            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1274            .await;
1275        buffer_b
1276            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1277            .await;
1278        buffer_c
1279            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1280            .await;
1281
1282        // Edit the buffer as the host and concurrently save as guest B.
1283        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1284        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1285        save_b.await.unwrap();
1286        assert_eq!(
1287            fs.load("/a/file1".as_ref()).await.unwrap(),
1288            "hi-a, i-am-c, i-am-b, i-am-a"
1289        );
1290        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1291        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1292        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1293
1294        // Make changes on host's file system, see those changes on the guests.
1295        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1296            .await
1297            .unwrap();
1298        fs.insert_file(Path::new("/a/file4"), "4".into())
1299            .await
1300            .unwrap();
1301
1302        worktree_b
1303            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1304            .await;
1305        worktree_c
1306            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1307            .await;
1308        worktree_b.read_with(&cx_b, |tree, _| {
1309            assert_eq!(
1310                tree.paths()
1311                    .map(|p| p.to_string_lossy())
1312                    .collect::<Vec<_>>(),
1313                &[".zed.toml", "file1", "file3", "file4"]
1314            )
1315        });
1316        worktree_c.read_with(&cx_c, |tree, _| {
1317            assert_eq!(
1318                tree.paths()
1319                    .map(|p| p.to_string_lossy())
1320                    .collect::<Vec<_>>(),
1321                &[".zed.toml", "file1", "file3", "file4"]
1322            )
1323        });
1324    }
1325
1326    #[gpui::test]
1327    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1328        cx_a.foreground().forbid_parking();
1329        let lang_registry = Arc::new(LanguageRegistry::new());
1330
1331        // Connect to a server as 2 clients.
1332        let mut server = TestServer::start().await;
1333        let client_a = server.create_client(&mut cx_a, "user_a").await;
1334        let client_b = server.create_client(&mut cx_b, "user_b").await;
1335
1336        // Share a local worktree as client A
1337        let fs = Arc::new(FakeFs::new());
1338        fs.insert_tree(
1339            "/dir",
1340            json!({
1341                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1342                "a.txt": "a-contents",
1343            }),
1344        )
1345        .await;
1346
1347        let worktree_a = Worktree::open_local(
1348            client_a.clone(),
1349            client_a.user_store.clone(),
1350            "/dir".as_ref(),
1351            fs,
1352            lang_registry.clone(),
1353            &mut cx_a.to_async(),
1354        )
1355        .await
1356        .unwrap();
1357        worktree_a
1358            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1359            .await;
1360        let worktree_id = worktree_a
1361            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1362            .await
1363            .unwrap();
1364
1365        // Join that worktree as client B, and see that a guest has joined as client A.
1366        let worktree_b = Worktree::open_remote(
1367            client_b.clone(),
1368            worktree_id,
1369            lang_registry.clone(),
1370            client_b.user_store.clone(),
1371            &mut cx_b.to_async(),
1372        )
1373        .await
1374        .unwrap();
1375
1376        let buffer_b = worktree_b
1377            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1378            .await
1379            .unwrap();
1380        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1381
1382        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1383        buffer_b.read_with(&cx_b, |buf, _| {
1384            assert!(buf.is_dirty());
1385            assert!(!buf.has_conflict());
1386        });
1387
1388        buffer_b
1389            .update(&mut cx_b, |buf, cx| buf.save(cx))
1390            .unwrap()
1391            .await
1392            .unwrap();
1393        worktree_b
1394            .condition(&cx_b, |_, cx| {
1395                buffer_b.read(cx).file().unwrap().mtime() != mtime
1396            })
1397            .await;
1398        buffer_b.read_with(&cx_b, |buf, _| {
1399            assert!(!buf.is_dirty());
1400            assert!(!buf.has_conflict());
1401        });
1402
1403        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1404        buffer_b.read_with(&cx_b, |buf, _| {
1405            assert!(buf.is_dirty());
1406            assert!(!buf.has_conflict());
1407        });
1408    }
1409
1410    #[gpui::test]
1411    async fn test_editing_while_guest_opens_buffer(
1412        mut cx_a: TestAppContext,
1413        mut cx_b: TestAppContext,
1414    ) {
1415        cx_a.foreground().forbid_parking();
1416        let lang_registry = Arc::new(LanguageRegistry::new());
1417
1418        // Connect to a server as 2 clients.
1419        let mut server = TestServer::start().await;
1420        let client_a = server.create_client(&mut cx_a, "user_a").await;
1421        let client_b = server.create_client(&mut cx_b, "user_b").await;
1422
1423        // Share a local worktree as client A
1424        let fs = Arc::new(FakeFs::new());
1425        fs.insert_tree(
1426            "/dir",
1427            json!({
1428                ".zed.toml": r#"collaborators = ["user_b"]"#,
1429                "a.txt": "a-contents",
1430            }),
1431        )
1432        .await;
1433        let worktree_a = Worktree::open_local(
1434            client_a.clone(),
1435            client_a.user_store.clone(),
1436            "/dir".as_ref(),
1437            fs,
1438            lang_registry.clone(),
1439            &mut cx_a.to_async(),
1440        )
1441        .await
1442        .unwrap();
1443        worktree_a
1444            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1445            .await;
1446        let worktree_id = worktree_a
1447            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1448            .await
1449            .unwrap();
1450
1451        // Join that worktree as client B, and see that a guest has joined as client A.
1452        let worktree_b = Worktree::open_remote(
1453            client_b.clone(),
1454            worktree_id,
1455            lang_registry.clone(),
1456            client_b.user_store.clone(),
1457            &mut cx_b.to_async(),
1458        )
1459        .await
1460        .unwrap();
1461
1462        let buffer_a = worktree_a
1463            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1464            .await
1465            .unwrap();
1466        let buffer_b = cx_b
1467            .background()
1468            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1469
1470        task::yield_now().await;
1471        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1472
1473        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1474        let buffer_b = buffer_b.await.unwrap();
1475        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1476    }
1477
1478    #[gpui::test]
1479    async fn test_leaving_worktree_while_opening_buffer(
1480        mut cx_a: TestAppContext,
1481        mut cx_b: TestAppContext,
1482    ) {
1483        cx_a.foreground().forbid_parking();
1484        let lang_registry = Arc::new(LanguageRegistry::new());
1485
1486        // Connect to a server as 2 clients.
1487        let mut server = TestServer::start().await;
1488        let client_a = server.create_client(&mut cx_a, "user_a").await;
1489        let client_b = server.create_client(&mut cx_b, "user_b").await;
1490
1491        // Share a local worktree as client A
1492        let fs = Arc::new(FakeFs::new());
1493        fs.insert_tree(
1494            "/dir",
1495            json!({
1496                ".zed.toml": r#"collaborators = ["user_b"]"#,
1497                "a.txt": "a-contents",
1498            }),
1499        )
1500        .await;
1501        let worktree_a = Worktree::open_local(
1502            client_a.clone(),
1503            client_a.user_store.clone(),
1504            "/dir".as_ref(),
1505            fs,
1506            lang_registry.clone(),
1507            &mut cx_a.to_async(),
1508        )
1509        .await
1510        .unwrap();
1511        worktree_a
1512            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1513            .await;
1514        let worktree_id = worktree_a
1515            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1516            .await
1517            .unwrap();
1518
1519        // Join that worktree as client B, and see that a guest has joined as client A.
1520        let worktree_b = Worktree::open_remote(
1521            client_b.clone(),
1522            worktree_id,
1523            lang_registry.clone(),
1524            client_b.user_store.clone(),
1525            &mut cx_b.to_async(),
1526        )
1527        .await
1528        .unwrap();
1529        worktree_a
1530            .condition(&cx_a, |tree, _| tree.collaborators().len() == 1)
1531            .await;
1532
1533        let buffer_b = cx_b
1534            .background()
1535            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1536        cx_b.update(|_| drop(worktree_b));
1537        drop(buffer_b);
1538        worktree_a
1539            .condition(&cx_a, |tree, _| tree.collaborators().len() == 0)
1540            .await;
1541    }
1542
1543    #[gpui::test]
1544    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1545        cx_a.foreground().forbid_parking();
1546        let lang_registry = Arc::new(LanguageRegistry::new());
1547
1548        // Connect to a server as 2 clients.
1549        let mut server = TestServer::start().await;
1550        let client_a = server.create_client(&mut cx_a, "user_a").await;
1551        let client_b = server.create_client(&mut cx_b, "user_b").await;
1552
1553        // Share a local worktree as client A
1554        let fs = Arc::new(FakeFs::new());
1555        fs.insert_tree(
1556            "/a",
1557            json!({
1558                ".zed.toml": r#"collaborators = ["user_b"]"#,
1559                "a.txt": "a-contents",
1560                "b.txt": "b-contents",
1561            }),
1562        )
1563        .await;
1564        let worktree_a = Worktree::open_local(
1565            client_a.clone(),
1566            client_a.user_store.clone(),
1567            "/a".as_ref(),
1568            fs,
1569            lang_registry.clone(),
1570            &mut cx_a.to_async(),
1571        )
1572        .await
1573        .unwrap();
1574        worktree_a
1575            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1576            .await;
1577        let worktree_id = worktree_a
1578            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1579            .await
1580            .unwrap();
1581
1582        // Join that worktree as client B, and see that a guest has joined as client A.
1583        let _worktree_b = Worktree::open_remote(
1584            client_b.clone(),
1585            worktree_id,
1586            lang_registry.clone(),
1587            client_b.user_store.clone(),
1588            &mut cx_b.to_async(),
1589        )
1590        .await
1591        .unwrap();
1592        worktree_a
1593            .condition(&cx_a, |tree, _| tree.collaborators().len() == 1)
1594            .await;
1595
1596        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1597        client_b.disconnect(&cx_b.to_async()).await.unwrap();
1598        worktree_a
1599            .condition(&cx_a, |tree, _| tree.collaborators().len() == 0)
1600            .await;
1601    }
1602
1603    #[gpui::test]
1604    async fn test_collaborating_with_diagnostics(
1605        mut cx_a: TestAppContext,
1606        mut cx_b: TestAppContext,
1607    ) {
1608        cx_a.foreground().forbid_parking();
1609        let (language_server_config, mut fake_language_server) =
1610            LanguageServerConfig::fake(cx_a.background()).await;
1611        let mut lang_registry = LanguageRegistry::new();
1612        lang_registry.add(Arc::new(Language::new(
1613            LanguageConfig {
1614                name: "Rust".to_string(),
1615                path_suffixes: vec!["rs".to_string()],
1616                language_server: Some(language_server_config),
1617                ..Default::default()
1618            },
1619            Some(tree_sitter_rust::language()),
1620        )));
1621
1622        let lang_registry = Arc::new(lang_registry);
1623
1624        // Connect to a server as 2 clients.
1625        let mut server = TestServer::start().await;
1626        let client_a = server.create_client(&mut cx_a, "user_a").await;
1627        let client_b = server.create_client(&mut cx_b, "user_b").await;
1628
1629        // Share a local worktree as client A
1630        let fs = Arc::new(FakeFs::new());
1631        fs.insert_tree(
1632            "/a",
1633            json!({
1634                ".zed.toml": r#"collaborators = ["user_b"]"#,
1635                "a.rs": "let one = two",
1636                "other.rs": "",
1637            }),
1638        )
1639        .await;
1640        let worktree_a = Worktree::open_local(
1641            client_a.clone(),
1642            client_a.user_store.clone(),
1643            "/a".as_ref(),
1644            fs,
1645            lang_registry.clone(),
1646            &mut cx_a.to_async(),
1647        )
1648        .await
1649        .unwrap();
1650        worktree_a
1651            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1652            .await;
1653        let worktree_id = worktree_a
1654            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1655            .await
1656            .unwrap();
1657
1658        // Cause language server to start.
1659        let _ = cx_a
1660            .background()
1661            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1662                worktree.open_buffer("other.rs", cx)
1663            }))
1664            .await
1665            .unwrap();
1666
1667        // Simulate a language server reporting errors for a file.
1668        fake_language_server
1669            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1670                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1671                version: None,
1672                diagnostics: vec![
1673                    lsp::Diagnostic {
1674                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1675                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1676                        message: "message 1".to_string(),
1677                        ..Default::default()
1678                    },
1679                    lsp::Diagnostic {
1680                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1681                        range: lsp::Range::new(
1682                            lsp::Position::new(0, 10),
1683                            lsp::Position::new(0, 13),
1684                        ),
1685                        message: "message 2".to_string(),
1686                        ..Default::default()
1687                    },
1688                ],
1689            })
1690            .await;
1691
1692        // Join the worktree as client B.
1693        let worktree_b = Worktree::open_remote(
1694            client_b.clone(),
1695            worktree_id,
1696            lang_registry.clone(),
1697            client_b.user_store.clone(),
1698            &mut cx_b.to_async(),
1699        )
1700        .await
1701        .unwrap();
1702
1703        // Open the file with the errors.
1704        let buffer_b = cx_b
1705            .background()
1706            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1707            .await
1708            .unwrap();
1709
1710        buffer_b.read_with(&cx_b, |buffer, _| {
1711            assert_eq!(
1712                buffer
1713                    .snapshot()
1714                    .diagnostics_in_range::<_, Point>(0..buffer.len())
1715                    .collect::<Vec<_>>(),
1716                &[
1717                    DiagnosticEntry {
1718                        range: Point::new(0, 4)..Point::new(0, 7),
1719                        diagnostic: Diagnostic {
1720                            group_id: 0,
1721                            message: "message 1".to_string(),
1722                            severity: lsp::DiagnosticSeverity::ERROR,
1723                            is_primary: true,
1724                            ..Default::default()
1725                        }
1726                    },
1727                    DiagnosticEntry {
1728                        range: Point::new(0, 10)..Point::new(0, 13),
1729                        diagnostic: Diagnostic {
1730                            group_id: 1,
1731                            severity: lsp::DiagnosticSeverity::WARNING,
1732                            message: "message 2".to_string(),
1733                            is_primary: true,
1734                            ..Default::default()
1735                        }
1736                    }
1737                ]
1738            );
1739        });
1740    }
1741
1742    #[gpui::test]
1743    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1744        cx_a.foreground().forbid_parking();
1745
1746        // Connect to a server as 2 clients.
1747        let mut server = TestServer::start().await;
1748        let client_a = server.create_client(&mut cx_a, "user_a").await;
1749        let client_b = server.create_client(&mut cx_b, "user_b").await;
1750
1751        // Create an org that includes these 2 users.
1752        let db = &server.app_state.db;
1753        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1754        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1755            .await
1756            .unwrap();
1757        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1758            .await
1759            .unwrap();
1760
1761        // Create a channel that includes all the users.
1762        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1763        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1764            .await
1765            .unwrap();
1766        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1767            .await
1768            .unwrap();
1769        db.create_channel_message(
1770            channel_id,
1771            client_b.current_user_id(&cx_b),
1772            "hello A, it's B.",
1773            OffsetDateTime::now_utc(),
1774            1,
1775        )
1776        .await
1777        .unwrap();
1778
1779        let channels_a = cx_a
1780            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1781        channels_a
1782            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1783            .await;
1784        channels_a.read_with(&cx_a, |list, _| {
1785            assert_eq!(
1786                list.available_channels().unwrap(),
1787                &[ChannelDetails {
1788                    id: channel_id.to_proto(),
1789                    name: "test-channel".to_string()
1790                }]
1791            )
1792        });
1793        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1794            this.get_channel(channel_id.to_proto(), cx).unwrap()
1795        });
1796        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1797        channel_a
1798            .condition(&cx_a, |channel, _| {
1799                channel_messages(channel)
1800                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1801            })
1802            .await;
1803
1804        let channels_b = cx_b
1805            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1806        channels_b
1807            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1808            .await;
1809        channels_b.read_with(&cx_b, |list, _| {
1810            assert_eq!(
1811                list.available_channels().unwrap(),
1812                &[ChannelDetails {
1813                    id: channel_id.to_proto(),
1814                    name: "test-channel".to_string()
1815                }]
1816            )
1817        });
1818
1819        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1820            this.get_channel(channel_id.to_proto(), cx).unwrap()
1821        });
1822        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1823        channel_b
1824            .condition(&cx_b, |channel, _| {
1825                channel_messages(channel)
1826                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1827            })
1828            .await;
1829
1830        channel_a
1831            .update(&mut cx_a, |channel, cx| {
1832                channel
1833                    .send_message("oh, hi B.".to_string(), cx)
1834                    .unwrap()
1835                    .detach();
1836                let task = channel.send_message("sup".to_string(), cx).unwrap();
1837                assert_eq!(
1838                    channel_messages(channel),
1839                    &[
1840                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1841                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
1842                        ("user_a".to_string(), "sup".to_string(), true)
1843                    ]
1844                );
1845                task
1846            })
1847            .await
1848            .unwrap();
1849
1850        channel_b
1851            .condition(&cx_b, |channel, _| {
1852                channel_messages(channel)
1853                    == [
1854                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1855                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
1856                        ("user_a".to_string(), "sup".to_string(), false),
1857                    ]
1858            })
1859            .await;
1860
1861        assert_eq!(
1862            server
1863                .state()
1864                .await
1865                .channel(channel_id)
1866                .unwrap()
1867                .connection_ids
1868                .len(),
1869            2
1870        );
1871        cx_b.update(|_| drop(channel_b));
1872        server
1873            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1874            .await;
1875
1876        cx_a.update(|_| drop(channel_a));
1877        server
1878            .condition(|state| state.channel(channel_id).is_none())
1879            .await;
1880    }
1881
1882    #[gpui::test]
1883    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
1884        cx_a.foreground().forbid_parking();
1885
1886        let mut server = TestServer::start().await;
1887        let client_a = server.create_client(&mut cx_a, "user_a").await;
1888
1889        let db = &server.app_state.db;
1890        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1891        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1892        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1893            .await
1894            .unwrap();
1895        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1896            .await
1897            .unwrap();
1898
1899        let channels_a = cx_a
1900            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1901        channels_a
1902            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1903            .await;
1904        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1905            this.get_channel(channel_id.to_proto(), cx).unwrap()
1906        });
1907
1908        // Messages aren't allowed to be too long.
1909        channel_a
1910            .update(&mut cx_a, |channel, cx| {
1911                let long_body = "this is long.\n".repeat(1024);
1912                channel.send_message(long_body, cx).unwrap()
1913            })
1914            .await
1915            .unwrap_err();
1916
1917        // Messages aren't allowed to be blank.
1918        channel_a.update(&mut cx_a, |channel, cx| {
1919            channel.send_message(String::new(), cx).unwrap_err()
1920        });
1921
1922        // Leading and trailing whitespace are trimmed.
1923        channel_a
1924            .update(&mut cx_a, |channel, cx| {
1925                channel
1926                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
1927                    .unwrap()
1928            })
1929            .await
1930            .unwrap();
1931        assert_eq!(
1932            db.get_channel_messages(channel_id, 10, None)
1933                .await
1934                .unwrap()
1935                .iter()
1936                .map(|m| &m.body)
1937                .collect::<Vec<_>>(),
1938            &["surrounded by whitespace"]
1939        );
1940    }
1941
1942    #[gpui::test]
1943    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1944        cx_a.foreground().forbid_parking();
1945
1946        // Connect to a server as 2 clients.
1947        let mut server = TestServer::start().await;
1948        let client_a = server.create_client(&mut cx_a, "user_a").await;
1949        let client_b = server.create_client(&mut cx_b, "user_b").await;
1950        let mut status_b = client_b.status();
1951
1952        // Create an org that includes these 2 users.
1953        let db = &server.app_state.db;
1954        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1955        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1956            .await
1957            .unwrap();
1958        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1959            .await
1960            .unwrap();
1961
1962        // Create a channel that includes all the users.
1963        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1964        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1965            .await
1966            .unwrap();
1967        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1968            .await
1969            .unwrap();
1970        db.create_channel_message(
1971            channel_id,
1972            client_b.current_user_id(&cx_b),
1973            "hello A, it's B.",
1974            OffsetDateTime::now_utc(),
1975            2,
1976        )
1977        .await
1978        .unwrap();
1979
1980        let channels_a = cx_a
1981            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1982        channels_a
1983            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1984            .await;
1985
1986        channels_a.read_with(&cx_a, |list, _| {
1987            assert_eq!(
1988                list.available_channels().unwrap(),
1989                &[ChannelDetails {
1990                    id: channel_id.to_proto(),
1991                    name: "test-channel".to_string()
1992                }]
1993            )
1994        });
1995        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1996            this.get_channel(channel_id.to_proto(), cx).unwrap()
1997        });
1998        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1999        channel_a
2000            .condition(&cx_a, |channel, _| {
2001                channel_messages(channel)
2002                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2003            })
2004            .await;
2005
2006        let channels_b = cx_b
2007            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2008        channels_b
2009            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2010            .await;
2011        channels_b.read_with(&cx_b, |list, _| {
2012            assert_eq!(
2013                list.available_channels().unwrap(),
2014                &[ChannelDetails {
2015                    id: channel_id.to_proto(),
2016                    name: "test-channel".to_string()
2017                }]
2018            )
2019        });
2020
2021        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2022            this.get_channel(channel_id.to_proto(), cx).unwrap()
2023        });
2024        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2025        channel_b
2026            .condition(&cx_b, |channel, _| {
2027                channel_messages(channel)
2028                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2029            })
2030            .await;
2031
2032        // Disconnect client B, ensuring we can still access its cached channel data.
2033        server.forbid_connections();
2034        server.disconnect_client(client_b.current_user_id(&cx_b));
2035        while !matches!(
2036            status_b.recv().await,
2037            Some(client::Status::ReconnectionError { .. })
2038        ) {}
2039
2040        channels_b.read_with(&cx_b, |channels, _| {
2041            assert_eq!(
2042                channels.available_channels().unwrap(),
2043                [ChannelDetails {
2044                    id: channel_id.to_proto(),
2045                    name: "test-channel".to_string()
2046                }]
2047            )
2048        });
2049        channel_b.read_with(&cx_b, |channel, _| {
2050            assert_eq!(
2051                channel_messages(channel),
2052                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2053            )
2054        });
2055
2056        // Send a message from client B while it is disconnected.
2057        channel_b
2058            .update(&mut cx_b, |channel, cx| {
2059                let task = channel
2060                    .send_message("can you see this?".to_string(), cx)
2061                    .unwrap();
2062                assert_eq!(
2063                    channel_messages(channel),
2064                    &[
2065                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2066                        ("user_b".to_string(), "can you see this?".to_string(), true)
2067                    ]
2068                );
2069                task
2070            })
2071            .await
2072            .unwrap_err();
2073
2074        // Send a message from client A while B is disconnected.
2075        channel_a
2076            .update(&mut cx_a, |channel, cx| {
2077                channel
2078                    .send_message("oh, hi B.".to_string(), cx)
2079                    .unwrap()
2080                    .detach();
2081                let task = channel.send_message("sup".to_string(), cx).unwrap();
2082                assert_eq!(
2083                    channel_messages(channel),
2084                    &[
2085                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2086                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2087                        ("user_a".to_string(), "sup".to_string(), true)
2088                    ]
2089                );
2090                task
2091            })
2092            .await
2093            .unwrap();
2094
2095        // Give client B a chance to reconnect.
2096        server.allow_connections();
2097        cx_b.foreground().advance_clock(Duration::from_secs(10));
2098
2099        // Verify that B sees the new messages upon reconnection, as well as the message client B
2100        // sent while offline.
2101        channel_b
2102            .condition(&cx_b, |channel, _| {
2103                channel_messages(channel)
2104                    == [
2105                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2106                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2107                        ("user_a".to_string(), "sup".to_string(), false),
2108                        ("user_b".to_string(), "can you see this?".to_string(), false),
2109                    ]
2110            })
2111            .await;
2112
2113        // Ensure client A and B can communicate normally after reconnection.
2114        channel_a
2115            .update(&mut cx_a, |channel, cx| {
2116                channel.send_message("you online?".to_string(), cx).unwrap()
2117            })
2118            .await
2119            .unwrap();
2120        channel_b
2121            .condition(&cx_b, |channel, _| {
2122                channel_messages(channel)
2123                    == [
2124                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2125                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2126                        ("user_a".to_string(), "sup".to_string(), false),
2127                        ("user_b".to_string(), "can you see this?".to_string(), false),
2128                        ("user_a".to_string(), "you online?".to_string(), false),
2129                    ]
2130            })
2131            .await;
2132
2133        channel_b
2134            .update(&mut cx_b, |channel, cx| {
2135                channel.send_message("yep".to_string(), cx).unwrap()
2136            })
2137            .await
2138            .unwrap();
2139        channel_a
2140            .condition(&cx_a, |channel, _| {
2141                channel_messages(channel)
2142                    == [
2143                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2144                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2145                        ("user_a".to_string(), "sup".to_string(), false),
2146                        ("user_b".to_string(), "can you see this?".to_string(), false),
2147                        ("user_a".to_string(), "you online?".to_string(), false),
2148                        ("user_b".to_string(), "yep".to_string(), false),
2149                    ]
2150            })
2151            .await;
2152    }
2153
2154    #[gpui::test]
2155    async fn test_contacts(
2156        mut cx_a: TestAppContext,
2157        mut cx_b: TestAppContext,
2158        mut cx_c: TestAppContext,
2159    ) {
2160        cx_a.foreground().forbid_parking();
2161        let lang_registry = Arc::new(LanguageRegistry::new());
2162
2163        // Connect to a server as 3 clients.
2164        let mut server = TestServer::start().await;
2165        let client_a = server.create_client(&mut cx_a, "user_a").await;
2166        let client_b = server.create_client(&mut cx_b, "user_b").await;
2167        let client_c = server.create_client(&mut cx_c, "user_c").await;
2168
2169        let fs = Arc::new(FakeFs::new());
2170
2171        // Share a worktree as client A.
2172        fs.insert_tree(
2173            "/a",
2174            json!({
2175                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2176            }),
2177        )
2178        .await;
2179
2180        let worktree_a = Worktree::open_local(
2181            client_a.clone(),
2182            client_a.user_store.clone(),
2183            "/a".as_ref(),
2184            fs.clone(),
2185            lang_registry.clone(),
2186            &mut cx_a.to_async(),
2187        )
2188        .await
2189        .unwrap();
2190
2191        client_a
2192            .user_store
2193            .condition(&cx_a, |user_store, _| {
2194                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2195            })
2196            .await;
2197        client_b
2198            .user_store
2199            .condition(&cx_b, |user_store, _| {
2200                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2201            })
2202            .await;
2203        client_c
2204            .user_store
2205            .condition(&cx_c, |user_store, _| {
2206                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2207            })
2208            .await;
2209
2210        let worktree_id = worktree_a
2211            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
2212            .await
2213            .unwrap();
2214
2215        let _worktree_b = Worktree::open_remote(
2216            client_b.clone(),
2217            worktree_id,
2218            lang_registry.clone(),
2219            client_b.user_store.clone(),
2220            &mut cx_b.to_async(),
2221        )
2222        .await
2223        .unwrap();
2224
2225        client_a
2226            .user_store
2227            .condition(&cx_a, |user_store, _| {
2228                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2229            })
2230            .await;
2231        client_b
2232            .user_store
2233            .condition(&cx_b, |user_store, _| {
2234                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2235            })
2236            .await;
2237        client_c
2238            .user_store
2239            .condition(&cx_c, |user_store, _| {
2240                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2241            })
2242            .await;
2243
2244        worktree_a
2245            .condition(&cx_a, |worktree, _| {
2246                worktree.collaborators().contains_key(&client_b.peer_id)
2247            })
2248            .await;
2249
2250        cx_a.update(move |_| drop(worktree_a));
2251        client_a
2252            .user_store
2253            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2254            .await;
2255        client_b
2256            .user_store
2257            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2258            .await;
2259        client_c
2260            .user_store
2261            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2262            .await;
2263
2264        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2265            user_store
2266                .contacts()
2267                .iter()
2268                .map(|contact| {
2269                    let worktrees = contact
2270                        .worktrees
2271                        .iter()
2272                        .map(|w| {
2273                            (
2274                                w.root_name.as_str(),
2275                                w.guests.iter().map(|p| p.github_login.as_str()).collect(),
2276                            )
2277                        })
2278                        .collect();
2279                    (contact.user.github_login.as_str(), worktrees)
2280                })
2281                .collect()
2282        }
2283    }
2284
    /// Test harness that owns a `Server` together with the handles the tests use to drive and
    /// observe it.
    struct TestServer {
        /// The peer handed to the server in `start`; it is reset when the harness is dropped.
        peer: Arc<Peer>,
        /// Application state built from the test database by `build_app_state`.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Receiving half of the channel whose sender is passed to `Server::new`; `condition`
        /// waits on it between checks of its predicate.
        notifications: mpsc::Receiver<()>,
        /// Kill handles for the in-memory connections created by `create_client`, keyed by user.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// While set, connection attempts made by clients from `create_client` fail.
        forbid_connections: Arc<AtomicBool>,
        /// The test database; presumably held only to keep it alive for as long as the server —
        /// TODO confirm against `TestDb`.
        _test_db: TestDb,
    }
2294
2295    impl TestServer {
2296        async fn start() -> Self {
2297            let test_db = TestDb::new();
2298            let app_state = Self::build_app_state(&test_db).await;
2299            let peer = Peer::new();
2300            let notifications = mpsc::channel(128);
2301            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2302            Self {
2303                peer,
2304                app_state,
2305                server,
2306                notifications: notifications.1,
2307                connection_killers: Default::default(),
2308                forbid_connections: Default::default(),
2309                _test_db: test_db,
2310            }
2311        }
2312
2313        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2314            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2315            let client_name = name.to_string();
2316            let mut client = Client::new();
2317            let server = self.server.clone();
2318            let connection_killers = self.connection_killers.clone();
2319            let forbid_connections = self.forbid_connections.clone();
2320            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2321
2322            Arc::get_mut(&mut client)
2323                .unwrap()
2324                .override_authenticate(move |cx| {
2325                    cx.spawn(|_| async move {
2326                        let access_token = "the-token".to_string();
2327                        Ok(Credentials {
2328                            user_id: user_id.0 as u64,
2329                            access_token,
2330                        })
2331                    })
2332                })
2333                .override_establish_connection(move |credentials, cx| {
2334                    assert_eq!(credentials.user_id, user_id.0 as u64);
2335                    assert_eq!(credentials.access_token, "the-token");
2336
2337                    let server = server.clone();
2338                    let connection_killers = connection_killers.clone();
2339                    let forbid_connections = forbid_connections.clone();
2340                    let client_name = client_name.clone();
2341                    let connection_id_tx = connection_id_tx.clone();
2342                    cx.spawn(move |cx| async move {
2343                        if forbid_connections.load(SeqCst) {
2344                            Err(EstablishConnectionError::other(anyhow!(
2345                                "server is forbidding connections"
2346                            )))
2347                        } else {
2348                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2349                            connection_killers.lock().insert(user_id, kill_conn);
2350                            cx.background()
2351                                .spawn(server.handle_connection(
2352                                    server_conn,
2353                                    client_name,
2354                                    user_id,
2355                                    Some(connection_id_tx),
2356                                ))
2357                                .detach();
2358                            Ok(client_conn)
2359                        }
2360                    })
2361                });
2362
2363            let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2364            client
2365                .authenticate_and_connect(&cx.to_async())
2366                .await
2367                .unwrap();
2368
2369            let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2370            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2371            let mut authed_user =
2372                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2373            while authed_user.recv().await.unwrap().is_none() {}
2374
2375            TestClient {
2376                client,
2377                peer_id,
2378                user_store,
2379            }
2380        }
2381
2382        fn disconnect_client(&self, user_id: UserId) {
2383            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2384                let _ = kill_conn.try_send(Some(()));
2385            }
2386        }
2387
2388        fn forbid_connections(&self) {
2389            self.forbid_connections.store(true, SeqCst);
2390        }
2391
2392        fn allow_connections(&self) {
2393            self.forbid_connections.store(false, SeqCst);
2394        }
2395
2396        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2397            let mut config = Config::default();
2398            config.session_secret = "a".repeat(32);
2399            config.database_url = test_db.url.clone();
2400            let github_client = github::AppClient::test();
2401            Arc::new(AppState {
2402                db: test_db.db().clone(),
2403                handlebars: Default::default(),
2404                auth_client: auth::build_client("", ""),
2405                repo_client: github::RepoClient::test(&github_client),
2406                github_client,
2407                config,
2408            })
2409        }
2410
2411        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2412            self.server.store.read()
2413        }
2414
2415        async fn condition<F>(&mut self, mut predicate: F)
2416        where
2417            F: FnMut(&Store) -> bool,
2418        {
2419            async_std::future::timeout(Duration::from_millis(500), async {
2420                while !(predicate)(&*self.server.store.read()) {
2421                    self.notifications.recv().await;
2422                }
2423            })
2424            .await
2425            .expect("condition timed out");
2426        }
2427    }
2428
2429    impl Drop for TestServer {
2430        fn drop(&mut self) {
2431            task::block_on(self.peer.reset());
2432        }
2433    }
2434
    /// A client connected to a `TestServer`, as returned by `TestServer::create_client`.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        /// User store created on top of `client`.
        pub user_store: ModelHandle<UserStore>,
    }
2440
2441    impl Deref for TestClient {
2442        type Target = Arc<Client>;
2443
2444        fn deref(&self) -> &Self::Target {
2445            &self.client
2446        }
2447    }
2448
2449    impl TestClient {
2450        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2451            UserId::from_proto(
2452                self.user_store
2453                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2454            )
2455        }
2456    }
2457
2458    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2459        channel
2460            .messages()
2461            .cursor::<()>()
2462            .map(|m| {
2463                (
2464                    m.sender.github_login.clone(),
2465                    m.body.clone(),
2466                    m.is_pending(),
2467                )
2468            })
2469            .collect()
2470    }
2471
    /// A stateless view that renders an empty element.
    struct EmptyView;
2473
    // `EmptyView` uses the unit type as its event type.
    impl gpui::Entity for EmptyView {
        type Event = ();
    }
2477
2478    impl gpui::View for EmptyView {
2479        fn ui_name() -> &'static str {
2480            "empty view"
2481        }
2482
2483        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2484            gpui::Element::boxed(gpui::elements::Empty)
2485        }
2486    }
2487}