rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46
  47impl Server {
  48    pub fn new(
  49        app_state: Arc<AppState>,
  50        peer: Arc<Peer>,
  51        notifications: Option<mpsc::Sender<()>>,
  52    ) -> Arc<Self> {
  53        let mut server = Self {
  54            peer,
  55            app_state,
  56            store: Default::default(),
  57            handlers: Default::default(),
  58            notifications,
  59        };
  60
  61        server
  62            .add_handler(Server::ping)
  63            .add_handler(Server::register_worktree)
  64            .add_handler(Server::unregister_worktree)
  65            .add_handler(Server::share_worktree)
  66            .add_handler(Server::unshare_worktree)
  67            .add_handler(Server::join_worktree)
  68            .add_handler(Server::leave_worktree)
  69            .add_handler(Server::update_worktree)
  70            .add_handler(Server::open_buffer)
  71            .add_handler(Server::close_buffer)
  72            .add_handler(Server::update_buffer)
  73            .add_handler(Server::buffer_saved)
  74            .add_handler(Server::save_buffer)
  75            .add_handler(Server::get_channels)
  76            .add_handler(Server::get_users)
  77            .add_handler(Server::join_channel)
  78            .add_handler(Server::leave_channel)
  79            .add_handler(Server::send_channel_message)
  80            .add_handler(Server::get_channel_messages);
  81
  82        Arc::new(server)
  83    }
  84
  85    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  86    where
  87        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  88        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  89        M: EnvelopedMessage,
  90    {
  91        let prev_handler = self.handlers.insert(
  92            TypeId::of::<M>(),
  93            Box::new(move |server, envelope| {
  94                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
  95                (handler)(server, *envelope).boxed()
  96            }),
  97        );
  98        if prev_handler.is_some() {
  99            panic!("registered a handler for the same message twice");
 100        }
 101        self
 102    }
 103
 104    pub fn handle_connection(
 105        self: &Arc<Self>,
 106        connection: Connection,
 107        addr: String,
 108        user_id: UserId,
 109        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 110    ) -> impl Future<Output = ()> {
 111        let mut this = self.clone();
 112        async move {
 113            let (connection_id, handle_io, mut incoming_rx) =
 114                this.peer.add_connection(connection).await;
 115
 116            if let Some(send_connection_id) = send_connection_id.as_mut() {
 117                let _ = send_connection_id.send(connection_id).await;
 118            }
 119
 120            this.state_mut().add_connection(connection_id, user_id);
 121            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 122                log::error!("error updating contacts for {:?}: {}", user_id, err);
 123            }
 124
 125            let handle_io = handle_io.fuse();
 126            futures::pin_mut!(handle_io);
 127            loop {
 128                let next_message = incoming_rx.recv().fuse();
 129                futures::pin_mut!(next_message);
 130                futures::select_biased! {
 131                    message = next_message => {
 132                        if let Some(message) = message {
 133                            let start_time = Instant::now();
 134                            log::info!("RPC message received: {}", message.payload_type_name());
 135                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 136                                if let Err(err) = (handler)(this.clone(), message).await {
 137                                    log::error!("error handling message: {:?}", err);
 138                                } else {
 139                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 140                                }
 141
 142                                if let Some(mut notifications) = this.notifications.clone() {
 143                                    let _ = notifications.send(()).await;
 144                                }
 145                            } else {
 146                                log::warn!("unhandled message: {}", message.payload_type_name());
 147                            }
 148                        } else {
 149                            log::info!("rpc connection closed {:?}", addr);
 150                            break;
 151                        }
 152                    }
 153                    handle_io = handle_io => {
 154                        if let Err(err) = handle_io {
 155                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 156                        }
 157                        break;
 158                    }
 159                }
 160            }
 161
 162            if let Err(err) = this.sign_out(connection_id).await {
 163                log::error!("error signing out connection {:?} - {:?}", addr, err);
 164            }
 165        }
 166    }
 167
 168    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 169        self.peer.disconnect(connection_id).await;
 170        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 171
 172        for (project_id, project) in removed_connection.hosted_projects {
 173            if let Some(share) = project.share {
 174                broadcast(
 175                    connection_id,
 176                    share.guests.keys().copied().collect(),
 177                    |conn_id| {
 178                        self.peer
 179                            .send(conn_id, proto::UnshareProject { project_id })
 180                    },
 181                )
 182                .await?;
 183            }
 184        }
 185
 186        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 187            broadcast(connection_id, peer_ids, |conn_id| {
 188                self.peer.send(
 189                    conn_id,
 190                    proto::RemoveProjectCollaborator {
 191                        project_id,
 192                        peer_id: connection_id.0,
 193                    },
 194                )
 195            })
 196            .await?;
 197        }
 198
 199        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 200            .await?;
 201
 202        Ok(())
 203    }
 204
 205    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 206        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 207        Ok(())
 208    }
 209
 210    async fn register_worktree(
 211        mut self: Arc<Server>,
 212        request: TypedEnvelope<proto::RegisterWorktree>,
 213    ) -> tide::Result<()> {
 214        let receipt = request.receipt();
 215        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 216
 217        let mut contact_user_ids = HashSet::default();
 218        contact_user_ids.insert(host_user_id);
 219        for github_login in request.payload.authorized_logins {
 220            match self.app_state.db.create_user(&github_login, false).await {
 221                Ok(contact_user_id) => {
 222                    contact_user_ids.insert(contact_user_id);
 223                }
 224                Err(err) => {
 225                    let message = err.to_string();
 226                    self.peer
 227                        .respond_with_error(receipt, proto::Error { message })
 228                        .await?;
 229                    return Ok(());
 230                }
 231            }
 232        }
 233
 234        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 235        let ok = self.state_mut().register_worktree(
 236            request.project_id,
 237            request.worktree_id,
 238            Worktree {
 239                authorized_user_ids: contact_user_ids.clone(),
 240                root_name: request.payload.root_name,
 241            },
 242        );
 243
 244        if ok {
 245            self.peer.respond(receipt, proto::Ack {}).await?;
 246            self.update_contacts_for_users(&contact_user_ids).await?;
 247        } else {
 248            self.peer
 249                .respond_with_error(
 250                    receipt,
 251                    proto::Error {
 252                        message: "no such project".to_string(),
 253                    },
 254                )
 255                .await?;
 256        }
 257
 258        Ok(())
 259    }
 260
 261    async fn unregister_worktree(
 262        mut self: Arc<Server>,
 263        request: TypedEnvelope<proto::UnregisterWorktree>,
 264    ) -> tide::Result<()> {
 265        let project_id = request.payload.project_id;
 266        let worktree_id = request.payload.worktree_id;
 267        let worktree =
 268            self.state_mut()
 269                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 270
 271        if let Some(share) = worktree.share {
 272            broadcast(
 273                request.sender_id,
 274                share.guests.keys().copied().collect(),
 275                |conn_id| {
 276                    self.peer.send(
 277                        conn_id,
 278                        proto::UnregisterWorktree {
 279                            project_id,
 280                            worktree_id,
 281                        },
 282                    )
 283                },
 284            )
 285            .await?;
 286        }
 287        self.update_contacts_for_users(&worktree.authorized_user_ids)
 288            .await?;
 289        Ok(())
 290    }
 291
 292    async fn share_worktree(
 293        mut self: Arc<Server>,
 294        mut request: TypedEnvelope<proto::ShareWorktree>,
 295    ) -> tide::Result<()> {
 296        let worktree = request
 297            .payload
 298            .worktree
 299            .as_mut()
 300            .ok_or_else(|| anyhow!("missing worktree"))?;
 301        let entries = mem::take(&mut worktree.entries)
 302            .into_iter()
 303            .map(|entry| (entry.id, entry))
 304            .collect();
 305
 306        let contact_user_ids =
 307            self.state_mut()
 308                .share_worktree(worktree.id, request.sender_id, entries);
 309        if let Some(contact_user_ids) = contact_user_ids {
 310            self.peer
 311                .respond(request.receipt(), proto::ShareWorktreeResponse {})
 312                .await?;
 313            self.update_contacts_for_users(&contact_user_ids).await?;
 314        } else {
 315            self.peer
 316                .respond_with_error(
 317                    request.receipt(),
 318                    proto::Error {
 319                        message: "no such worktree".to_string(),
 320                    },
 321                )
 322                .await?;
 323        }
 324        Ok(())
 325    }
 326
 327    async fn unshare_worktree(
 328        mut self: Arc<Server>,
 329        request: TypedEnvelope<proto::UnshareWorktree>,
 330    ) -> tide::Result<()> {
 331        let worktree_id = request.payload.worktree_id;
 332        let worktree = self
 333            .state_mut()
 334            .unshare_worktree(worktree_id, request.sender_id)?;
 335
 336        broadcast(request.sender_id, worktree.connection_ids, |conn_id| {
 337            self.peer
 338                .send(conn_id, proto::UnshareWorktree { worktree_id })
 339        })
 340        .await?;
 341        self.update_contacts_for_users(&worktree.authorized_user_ids)
 342            .await?;
 343
 344        Ok(())
 345    }
 346
 347    async fn join_worktree(
 348        mut self: Arc<Server>,
 349        request: TypedEnvelope<proto::JoinWorktree>,
 350    ) -> tide::Result<()> {
 351        let worktree_id = request.payload.worktree_id;
 352
 353        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 354        let response_data = self
 355            .state_mut()
 356            .join_worktree(request.sender_id, user_id, worktree_id)
 357            .and_then(|joined| {
 358                let share = joined.worktree.share()?;
 359                let peer_count = share.guests.len();
 360                let mut collaborators = Vec::with_capacity(peer_count);
 361                collaborators.push(proto::Collaborator {
 362                    peer_id: joined.worktree.host_connection_id.0,
 363                    replica_id: 0,
 364                    user_id: joined.worktree.host_user_id.to_proto(),
 365                });
 366                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 367                    if *peer_conn_id != request.sender_id {
 368                        collaborators.push(proto::Collaborator {
 369                            peer_id: peer_conn_id.0,
 370                            replica_id: *peer_replica_id as u32,
 371                            user_id: peer_user_id.to_proto(),
 372                        });
 373                    }
 374                }
 375                let response = proto::JoinWorktreeResponse {
 376                    worktree: Some(proto::Worktree {
 377                        id: worktree_id,
 378                        root_name: joined.worktree.root_name.clone(),
 379                        entries: share.entries.values().cloned().collect(),
 380                    }),
 381                    replica_id: joined.replica_id as u32,
 382                    collaborators,
 383                };
 384                let connection_ids = joined.worktree.connection_ids();
 385                let contact_user_ids = joined.worktree.authorized_user_ids.clone();
 386                Ok((response, connection_ids, contact_user_ids))
 387            });
 388
 389        match response_data {
 390            Ok((response, connection_ids, contact_user_ids)) => {
 391                broadcast(request.sender_id, connection_ids, |conn_id| {
 392                    self.peer.send(
 393                        conn_id,
 394                        proto::AddCollaborator {
 395                            worktree_id,
 396                            collaborator: Some(proto::Collaborator {
 397                                peer_id: request.sender_id.0,
 398                                replica_id: response.replica_id,
 399                                user_id: user_id.to_proto(),
 400                            }),
 401                        },
 402                    )
 403                })
 404                .await?;
 405                self.peer.respond(request.receipt(), response).await?;
 406                self.update_contacts_for_users(&contact_user_ids).await?;
 407            }
 408            Err(error) => {
 409                self.peer
 410                    .respond_with_error(
 411                        request.receipt(),
 412                        proto::Error {
 413                            message: error.to_string(),
 414                        },
 415                    )
 416                    .await?;
 417            }
 418        }
 419
 420        Ok(())
 421    }
 422
 423    async fn leave_worktree(
 424        mut self: Arc<Server>,
 425        request: TypedEnvelope<proto::LeaveWorktree>,
 426    ) -> tide::Result<()> {
 427        let sender_id = request.sender_id;
 428        let worktree_id = request.payload.worktree_id;
 429        let worktree = self.state_mut().leave_worktree(sender_id, worktree_id);
 430        if let Some(worktree) = worktree {
 431            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 432                self.peer.send(
 433                    conn_id,
 434                    proto::RemoveCollaborator {
 435                        worktree_id,
 436                        peer_id: sender_id.0,
 437                    },
 438                )
 439            })
 440            .await?;
 441            self.update_contacts_for_users(&worktree.authorized_user_ids)
 442                .await?;
 443        }
 444        Ok(())
 445    }
 446
 447    async fn update_worktree(
 448        mut self: Arc<Server>,
 449        request: TypedEnvelope<proto::UpdateWorktree>,
 450    ) -> tide::Result<()> {
 451        let connection_ids = self.state_mut().update_worktree(
 452            request.sender_id,
 453            request.payload.worktree_id,
 454            &request.payload.removed_entries,
 455            &request.payload.updated_entries,
 456        )?;
 457
 458        broadcast(request.sender_id, connection_ids, |connection_id| {
 459            self.peer
 460                .forward_send(request.sender_id, connection_id, request.payload.clone())
 461        })
 462        .await?;
 463
 464        Ok(())
 465    }
 466
 467    async fn open_buffer(
 468        self: Arc<Server>,
 469        request: TypedEnvelope<proto::OpenBuffer>,
 470    ) -> tide::Result<()> {
 471        let receipt = request.receipt();
 472        let host_connection_id = self
 473            .state()
 474            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 475        let response = self
 476            .peer
 477            .forward_request(request.sender_id, host_connection_id, request.payload)
 478            .await?;
 479        self.peer.respond(receipt, response).await?;
 480        Ok(())
 481    }
 482
 483    async fn close_buffer(
 484        self: Arc<Server>,
 485        request: TypedEnvelope<proto::CloseBuffer>,
 486    ) -> tide::Result<()> {
 487        let host_connection_id = self
 488            .state()
 489            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 490        self.peer
 491            .forward_send(request.sender_id, host_connection_id, request.payload)
 492            .await?;
 493        Ok(())
 494    }
 495
 496    async fn save_buffer(
 497        self: Arc<Server>,
 498        request: TypedEnvelope<proto::SaveBuffer>,
 499    ) -> tide::Result<()> {
 500        let host;
 501        let guests;
 502        {
 503            let state = self.state();
 504            host = state
 505                .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 506            guests = state
 507                .worktree_guest_connection_ids(request.sender_id, request.payload.worktree_id)?;
 508        }
 509
 510        let sender = request.sender_id;
 511        let receipt = request.receipt();
 512        let response = self
 513            .peer
 514            .forward_request(sender, host, request.payload.clone())
 515            .await?;
 516
 517        broadcast(host, guests, |conn_id| {
 518            let response = response.clone();
 519            let peer = &self.peer;
 520            async move {
 521                if conn_id == sender {
 522                    peer.respond(receipt, response).await
 523                } else {
 524                    peer.forward_send(host, conn_id, response).await
 525                }
 526            }
 527        })
 528        .await?;
 529
 530        Ok(())
 531    }
 532
 533    async fn update_buffer(
 534        self: Arc<Server>,
 535        request: TypedEnvelope<proto::UpdateBuffer>,
 536    ) -> tide::Result<()> {
 537        let receiver_ids = self
 538            .state()
 539            .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
 540        broadcast(request.sender_id, receiver_ids, |connection_id| {
 541            self.peer
 542                .forward_send(request.sender_id, connection_id, request.payload.clone())
 543        })
 544        .await?;
 545        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 546        Ok(())
 547    }
 548
 549    async fn buffer_saved(
 550        self: Arc<Server>,
 551        request: TypedEnvelope<proto::BufferSaved>,
 552    ) -> tide::Result<()> {
 553        let receiver_ids = self
 554            .state()
 555            .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
 556        broadcast(request.sender_id, receiver_ids, |connection_id| {
 557            self.peer
 558                .forward_send(request.sender_id, connection_id, request.payload.clone())
 559        })
 560        .await?;
 561        Ok(())
 562    }
 563
 564    async fn get_channels(
 565        self: Arc<Server>,
 566        request: TypedEnvelope<proto::GetChannels>,
 567    ) -> tide::Result<()> {
 568        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 569        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 570        self.peer
 571            .respond(
 572                request.receipt(),
 573                proto::GetChannelsResponse {
 574                    channels: channels
 575                        .into_iter()
 576                        .map(|chan| proto::Channel {
 577                            id: chan.id.to_proto(),
 578                            name: chan.name,
 579                        })
 580                        .collect(),
 581                },
 582            )
 583            .await?;
 584        Ok(())
 585    }
 586
 587    async fn get_users(
 588        self: Arc<Server>,
 589        request: TypedEnvelope<proto::GetUsers>,
 590    ) -> tide::Result<()> {
 591        let receipt = request.receipt();
 592        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 593        let users = self
 594            .app_state
 595            .db
 596            .get_users_by_ids(user_ids)
 597            .await?
 598            .into_iter()
 599            .map(|user| proto::User {
 600                id: user.id.to_proto(),
 601                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 602                github_login: user.github_login,
 603            })
 604            .collect();
 605        self.peer
 606            .respond(receipt, proto::GetUsersResponse { users })
 607            .await?;
 608        Ok(())
 609    }
 610
 611    async fn update_contacts_for_users<'a>(
 612        self: &Arc<Server>,
 613        user_ids: impl IntoIterator<Item = &'a UserId>,
 614    ) -> tide::Result<()> {
 615        let mut send_futures = Vec::new();
 616
 617        {
 618            let state = self.state();
 619            for user_id in user_ids {
 620                let contacts = state.contacts_for_user(*user_id);
 621                for connection_id in state.connection_ids_for_user(*user_id) {
 622                    send_futures.push(self.peer.send(
 623                        connection_id,
 624                        proto::UpdateContacts {
 625                            contacts: contacts.clone(),
 626                        },
 627                    ));
 628                }
 629            }
 630        }
 631        futures::future::try_join_all(send_futures).await?;
 632
 633        Ok(())
 634    }
 635
 636    async fn join_channel(
 637        mut self: Arc<Self>,
 638        request: TypedEnvelope<proto::JoinChannel>,
 639    ) -> tide::Result<()> {
 640        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 641        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 642        if !self
 643            .app_state
 644            .db
 645            .can_user_access_channel(user_id, channel_id)
 646            .await?
 647        {
 648            Err(anyhow!("access denied"))?;
 649        }
 650
 651        self.state_mut().join_channel(request.sender_id, channel_id);
 652        let messages = self
 653            .app_state
 654            .db
 655            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 656            .await?
 657            .into_iter()
 658            .map(|msg| proto::ChannelMessage {
 659                id: msg.id.to_proto(),
 660                body: msg.body,
 661                timestamp: msg.sent_at.unix_timestamp() as u64,
 662                sender_id: msg.sender_id.to_proto(),
 663                nonce: Some(msg.nonce.as_u128().into()),
 664            })
 665            .collect::<Vec<_>>();
 666        self.peer
 667            .respond(
 668                request.receipt(),
 669                proto::JoinChannelResponse {
 670                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 671                    messages,
 672                },
 673            )
 674            .await?;
 675        Ok(())
 676    }
 677
 678    async fn leave_channel(
 679        mut self: Arc<Self>,
 680        request: TypedEnvelope<proto::LeaveChannel>,
 681    ) -> tide::Result<()> {
 682        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 683        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 684        if !self
 685            .app_state
 686            .db
 687            .can_user_access_channel(user_id, channel_id)
 688            .await?
 689        {
 690            Err(anyhow!("access denied"))?;
 691        }
 692
 693        self.state_mut()
 694            .leave_channel(request.sender_id, channel_id);
 695
 696        Ok(())
 697    }
 698
 699    async fn send_channel_message(
 700        self: Arc<Self>,
 701        request: TypedEnvelope<proto::SendChannelMessage>,
 702    ) -> tide::Result<()> {
 703        let receipt = request.receipt();
 704        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 705        let user_id;
 706        let connection_ids;
 707        {
 708            let state = self.state();
 709            user_id = state.user_id_for_connection(request.sender_id)?;
 710            if let Some(ids) = state.channel_connection_ids(channel_id) {
 711                connection_ids = ids;
 712            } else {
 713                return Ok(());
 714            }
 715        }
 716
 717        // Validate the message body.
 718        let body = request.payload.body.trim().to_string();
 719        if body.len() > MAX_MESSAGE_LEN {
 720            self.peer
 721                .respond_with_error(
 722                    receipt,
 723                    proto::Error {
 724                        message: "message is too long".to_string(),
 725                    },
 726                )
 727                .await?;
 728            return Ok(());
 729        }
 730        if body.is_empty() {
 731            self.peer
 732                .respond_with_error(
 733                    receipt,
 734                    proto::Error {
 735                        message: "message can't be blank".to_string(),
 736                    },
 737                )
 738                .await?;
 739            return Ok(());
 740        }
 741
 742        let timestamp = OffsetDateTime::now_utc();
 743        let nonce = if let Some(nonce) = request.payload.nonce {
 744            nonce
 745        } else {
 746            self.peer
 747                .respond_with_error(
 748                    receipt,
 749                    proto::Error {
 750                        message: "nonce can't be blank".to_string(),
 751                    },
 752                )
 753                .await?;
 754            return Ok(());
 755        };
 756
 757        let message_id = self
 758            .app_state
 759            .db
 760            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 761            .await?
 762            .to_proto();
 763        let message = proto::ChannelMessage {
 764            sender_id: user_id.to_proto(),
 765            id: message_id,
 766            body,
 767            timestamp: timestamp.unix_timestamp() as u64,
 768            nonce: Some(nonce),
 769        };
 770        broadcast(request.sender_id, connection_ids, |conn_id| {
 771            self.peer.send(
 772                conn_id,
 773                proto::ChannelMessageSent {
 774                    channel_id: channel_id.to_proto(),
 775                    message: Some(message.clone()),
 776                },
 777            )
 778        })
 779        .await?;
 780        self.peer
 781            .respond(
 782                receipt,
 783                proto::SendChannelMessageResponse {
 784                    message: Some(message),
 785                },
 786            )
 787            .await?;
 788        Ok(())
 789    }
 790
 791    async fn get_channel_messages(
 792        self: Arc<Self>,
 793        request: TypedEnvelope<proto::GetChannelMessages>,
 794    ) -> tide::Result<()> {
 795        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 796        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 797        if !self
 798            .app_state
 799            .db
 800            .can_user_access_channel(user_id, channel_id)
 801            .await?
 802        {
 803            Err(anyhow!("access denied"))?;
 804        }
 805
 806        let messages = self
 807            .app_state
 808            .db
 809            .get_channel_messages(
 810                channel_id,
 811                MESSAGE_COUNT_PER_PAGE,
 812                Some(MessageId::from_proto(request.payload.before_message_id)),
 813            )
 814            .await?
 815            .into_iter()
 816            .map(|msg| proto::ChannelMessage {
 817                id: msg.id.to_proto(),
 818                body: msg.body,
 819                timestamp: msg.sent_at.unix_timestamp() as u64,
 820                sender_id: msg.sender_id.to_proto(),
 821                nonce: Some(msg.nonce.as_u128().into()),
 822            })
 823            .collect::<Vec<_>>();
 824        self.peer
 825            .respond(
 826                request.receipt(),
 827                proto::GetChannelMessagesResponse {
 828                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 829                    messages,
 830                },
 831            )
 832            .await?;
 833        Ok(())
 834    }
 835
 836    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
            // Takes the shared (read) lock on `store`; the returned guard keeps the lock held
            // until it is dropped.
 837        self.store.read()
 838    }
 839
 840    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
            // Takes the exclusive (write) lock on `store`; the returned guard keeps the lock
            // held until it is dropped.
            // NOTE(review): the `&mut Arc<Self>` receiver looks stricter than the lock call
            // itself needs -- confirm it is intentional before relaxing it.
 841        self.store.write()
 842    }
 843}
 844
 845pub async fn broadcast<F, T>(
 846    sender_id: ConnectionId,
 847    receiver_ids: Vec<ConnectionId>,
 848    mut f: F,
 849) -> anyhow::Result<()>
 850where
 851    F: FnMut(ConnectionId) -> T,
 852    T: Future<Output = anyhow::Result<()>>,
 853{
 854    let futures = receiver_ids
 855        .into_iter()
 856        .filter(|id| *id != sender_id)
 857        .map(|id| f(id));
 858    futures::future::try_join_all(futures).await?;
 859    Ok(())
 860}
 861
 862pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 863    let server = Server::new(app.state().clone(), rpc.clone(), None);
 864    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 865        let server = server.clone();
 866        async move {
 867            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 868
 869            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 870            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 871            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 872            let client_protocol_version: Option<u32> = request
 873                .header("X-Zed-Protocol-Version")
 874                .and_then(|v| v.as_str().parse().ok());
 875
 876            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 877                return Ok(Response::new(StatusCode::UpgradeRequired));
 878            }
 879
 880            let header = match request.header("Sec-Websocket-Key") {
 881                Some(h) => h.as_str(),
 882                None => return Err(anyhow!("expected sec-websocket-key"))?,
 883            };
 884
 885            let user_id = process_auth_header(&request).await?;
 886
 887            let mut response = Response::new(StatusCode::SwitchingProtocols);
 888            response.insert_header(UPGRADE, "websocket");
 889            response.insert_header(CONNECTION, "Upgrade");
 890            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 891            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 892            response.insert_header("Sec-Websocket-Version", "13");
 893
 894            let http_res: &mut tide::http::Response = response.as_mut();
 895            let upgrade_receiver = http_res.recv_upgrade().await;
 896            let addr = request.remote().unwrap_or("unknown").to_string();
 897            task::spawn(async move {
 898                if let Some(stream) = upgrade_receiver.await {
 899                    server
 900                        .handle_connection(
 901                            Connection::new(
 902                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 903                            ),
 904                            addr,
 905                            user_id,
 906                            None,
 907                        )
 908                        .await;
 909                }
 910            });
 911
 912            Ok(response)
 913        }
 914    });
 915}
 916
 917fn header_contains_ignore_case<T>(
 918    request: &tide::Request<T>,
 919    header_name: HeaderName,
 920    value: &str,
 921) -> bool {
 922    request
 923        .header(header_name)
 924        .map(|h| {
 925            h.as_str()
 926                .split(',')
 927                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 928        })
 929        .unwrap_or(false)
 930}
 931
 932#[cfg(test)]
 933mod tests {
 934    use super::*;
 935    use crate::{
 936        auth,
 937        db::{tests::TestDb, UserId},
 938        github, AppState, Config,
 939    };
 940    use ::rpc::Peer;
 941    use async_std::task;
 942    use gpui::{ModelHandle, TestAppContext};
 943    use parking_lot::Mutex;
 944    use postage::{mpsc, watch};
 945    use rpc::PeerId;
 946    use serde_json::json;
 947    use sqlx::types::time::OffsetDateTime;
 948    use std::{
 949        ops::Deref,
 950        path::Path,
 951        sync::{
 952            atomic::{AtomicBool, Ordering::SeqCst},
 953            Arc,
 954        },
 955        time::Duration,
 956    };
 957    use zed::{
 958        client::{
 959            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
 960            EstablishConnectionError, UserStore,
 961        },
 962        contacts_panel::JoinWorktree,
 963        editor::{Editor, EditorSettings, Input, MultiBuffer},
 964        fs::{FakeFs, Fs as _},
 965        language::{
 966            tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
 967            LanguageRegistry, LanguageServerConfig, Point,
 968        },
 969        lsp,
 970        project::{ProjectPath, Worktree},
 971        test::test_app_state,
 972        workspace::Workspace,
 973    };
 974
 975    #[gpui::test]
        // Walks a host (client A) and a guest (client B) through sharing a worktree: joining,
        // opening the same buffer on both sides, propagating a guest edit to the host, and
        // cleaning up once the buffer and the guest's worktree are dropped.
 976    async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
 977        let (window_b, _) = cx_b.add_window(|_| EmptyView);
 978        let lang_registry = Arc::new(LanguageRegistry::new());
 979
 980        // Connect to a server as 2 clients.
 981        let mut server = TestServer::start().await;
 982        let client_a = server.create_client(&mut cx_a, "user_a").await;
 983        let client_b = server.create_client(&mut cx_b, "user_b").await;
 984
 985        cx_a.foreground().forbid_parking();
 986
 987        // Share a local worktree as client A
 988        let fs = Arc::new(FakeFs::new());
 989        fs.insert_tree(
 990            "/a",
 991            json!({
 992                ".zed.toml": r#"collaborators = ["user_b"]"#,
 993                "a.txt": "a-contents",
 994                "b.txt": "b-contents",
 995            }),
 996        )
 997        .await;
 998        let worktree_a = Worktree::open_local(
 999            client_a.clone(),
1000            client_a.user_store.clone(),
1001            "/a".as_ref(),
1002            fs,
1003            lang_registry.clone(),
1004            &mut cx_a.to_async(),
1005        )
1006        .await
1007        .unwrap();
1008        worktree_a
1009            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1010            .await;
1011        let worktree_id = worktree_a
1012            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1013            .await
1014            .unwrap();
1015
1016        // Join that worktree as client B, and see that a guest has joined as client A.
1017        let worktree_b = Worktree::open_remote(
1018            client_b.clone(),
1019            worktree_id,
1020            lang_registry.clone(),
1021            client_b.user_store.clone(),
1022            &mut cx_b.to_async(),
1023        )
1024        .await
1025        .unwrap();
1026
            // B's view of the worktree must list A (looked up by peer id) as "user_a".
1027        let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| {
1028            assert_eq!(
1029                tree.collaborators()
1030                    .get(&client_a.peer_id)
1031                    .unwrap()
1032                    .user
1033                    .github_login,
1034                "user_a"
1035            );
1036            tree.replica_id()
1037        });
            // A must eventually list B as "user_b" with the replica id B reports for itself.
1038        worktree_a
1039            .condition(&cx_a, |tree, _| {
1040                tree.collaborators()
1041                    .get(&client_b.peer_id)
1042                    .map_or(false, |collaborator| {
1043                        collaborator.replica_id == replica_id_b
1044                            && collaborator.user.github_login == "user_b"
1045                    })
1046            })
1047            .await;
1048
1049        // Open the same file as client B and client A.
1050        let buffer_b = worktree_b
1051            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1052            .await
1053            .unwrap();
1054        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1055        buffer_b.read_with(&cx_b, |buf, cx| {
1056            assert_eq!(buf.read(cx).text(), "b-contents")
1057        });
            // The host must already report the buffer as open, before A opens it itself.
1058        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1059        let buffer_a = worktree_a
1060            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1061            .await
1062            .unwrap();
1063
1064        let editor_b = cx_b.add_view(window_b, |cx| {
1065            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1066        });
1067        // TODO
1068        // // Create a selection set as client B and see that selection set as client A.
1069        // buffer_a
1070        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1071        //     .await;
1072
1073        // Edit the buffer as client B and see that edit as client A.
1074        editor_b.update(&mut cx_b, |editor, cx| {
1075            editor.handle_input(&Input("ok, ".into()), cx)
1076        });
1077        buffer_a
1078            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1079            .await;
1080
1081        // TODO
1082        // // Remove the selection set as client B, see those selections disappear as client A.
1083        cx_b.update(move |_| drop(editor_b));
1084        // buffer_a
1085        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1086        //     .await;
1087
1088        // Close the buffer as client A, see that the buffer is closed.
1089        cx_a.update(move |_| drop(buffer_a));
1090        worktree_a
1091            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1092            .await;
1093
1094        // Dropping the worktree removes client B from client A's collaborators.
1095        cx_b.update(move |_| drop(worktree_b));
1096        worktree_a
1097            .condition(&cx_a, |tree, _| tree.collaborators().is_empty())
1098            .await;
1099    }
1100
1101    #[gpui::test]
        // Checks that when the host (client A) unshares a worktree, a guest workspace (client B)
        // that joined it loses both the worktree and the item it had open from it.
1102    async fn test_unshare_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1103        cx_b.update(zed::contacts_panel::init);
1104        let mut app_state_a = cx_a.update(test_app_state);
1105        let mut app_state_b = cx_b.update(test_app_state);
1106
1107        // Connect to a server as 2 clients.
1108        let mut server = TestServer::start().await;
1109        let client_a = server.create_client(&mut cx_a, "user_a").await;
1110        let client_b = server.create_client(&mut cx_b, "user_b").await;
            // Swap the test clients and their user stores into the two app states.
1111        Arc::get_mut(&mut app_state_a).unwrap().client = client_a.clone();
1112        Arc::get_mut(&mut app_state_a).unwrap().user_store = client_a.user_store.clone();
1113        Arc::get_mut(&mut app_state_b).unwrap().client = client_b.clone();
1114        Arc::get_mut(&mut app_state_b).unwrap().user_store = client_b.user_store.clone();
1115
1116        cx_a.foreground().forbid_parking();
1117
1118        // Share a local worktree as client A
1119        let fs = Arc::new(FakeFs::new());
1120        fs.insert_tree(
1121            "/a",
1122            json!({
1123                ".zed.toml": r#"collaborators = ["user_b"]"#,
1124                "a.txt": "a-contents",
1125                "b.txt": "b-contents",
1126            }),
1127        )
1128        .await;
1129        let worktree_a = Worktree::open_local(
1130            app_state_a.client.clone(),
1131            app_state_a.user_store.clone(),
1132            "/a".as_ref(),
1133            fs,
1134            app_state_a.languages.clone(),
1135            &mut cx_a.to_async(),
1136        )
1137        .await
1138        .unwrap();
1139        worktree_a
1140            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1141            .await;
1142
1143        let remote_worktree_id = worktree_a
1144            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1145            .await
1146            .unwrap();
1147
            // Join the shared worktree from a workspace on B by dispatching `JoinWorktree`.
1148        let (window_b, workspace_b) =
1149            cx_b.add_window(|cx| Workspace::new(&app_state_b.as_ref().into(), cx));
1150        cx_b.update(|cx| {
1151            cx.dispatch_action(
1152                window_b,
1153                vec![workspace_b.id()],
1154                &JoinWorktree(remote_worktree_id),
1155            );
1156        });
1157        workspace_b
1158            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 1)
1159            .await;
1160
            // Nothing is active in B's pane yet; remember the id B's workspace uses for the
            // joined worktree.
1161        let local_worktree_id_b = workspace_b.read_with(&cx_b, |workspace, cx| {
1162            let active_pane = workspace.active_pane().read(cx);
1163            assert!(active_pane.active_item().is_none());
1164            workspace.worktrees(cx).first().unwrap().id()
1165        });
            // Open a file from the joined worktree so B has an active item.
1166        workspace_b
1167            .update(&mut cx_b, |workspace, cx| {
1168                workspace.open_entry(
1169                    ProjectPath {
1170                        worktree_id: local_worktree_id_b,
1171                        path: Path::new("a.txt").into(),
1172                    },
1173                    cx,
1174                )
1175            })
1176            .unwrap()
1177            .await
1178            .unwrap();
1179        workspace_b.read_with(&cx_b, |workspace, cx| {
1180            let active_pane = workspace.active_pane().read(cx);
1181            assert!(active_pane.active_item().is_some());
1182        });
1183
            // Unshare on the host; B's workspace must drop the worktree and its open item.
1184        worktree_a.update(&mut cx_a, |tree, cx| {
1185            tree.as_local_mut().unwrap().unshare(cx);
1186        });
1187        workspace_b
1188            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 0)
1189            .await;
1190        workspace_b.read_with(&cx_b, |workspace, cx| {
1191            let active_pane = workspace.active_pane().read(cx);
1192            assert!(active_pane.active_item().is_none());
1193        });
1194    }
1195
1196    #[gpui::test]
        // With one host (A) and two guests (B, C) on a shared worktree, checks that buffer edits
        // converge on all three, that a guest-initiated save writes the host's file, and that
        // file-system changes on the host show up in both guests' worktrees.
1197    async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
1198        mut cx_a: TestAppContext,
1199        mut cx_b: TestAppContext,
1200        mut cx_c: TestAppContext,
1201    ) {
1202        cx_a.foreground().forbid_parking();
1203        let lang_registry = Arc::new(LanguageRegistry::new());
1204
1205        // Connect to a server as 3 clients.
1206        let mut server = TestServer::start().await;
1207        let client_a = server.create_client(&mut cx_a, "user_a").await;
1208        let client_b = server.create_client(&mut cx_b, "user_b").await;
1209        let client_c = server.create_client(&mut cx_c, "user_c").await;
1210
1211        let fs = Arc::new(FakeFs::new());
1212
1213        // Share a worktree as client A.
1214        fs.insert_tree(
1215            "/a",
1216            json!({
1217                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1218                "file1": "",
1219                "file2": ""
1220            }),
1221        )
1222        .await;
1223
1224        let worktree_a = Worktree::open_local(
1225            client_a.clone(),
1226            client_a.user_store.clone(),
1227            "/a".as_ref(),
1228            fs.clone(),
1229            lang_registry.clone(),
1230            &mut cx_a.to_async(),
1231        )
1232        .await
1233        .unwrap();
1234        worktree_a
1235            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1236            .await;
1237        let worktree_id = worktree_a
1238            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1239            .await
1240            .unwrap();
1241
1242        // Join that worktree as clients B and C.
1243        let worktree_b = Worktree::open_remote(
1244            client_b.clone(),
1245            worktree_id,
1246            lang_registry.clone(),
1247            client_b.user_store.clone(),
1248            &mut cx_b.to_async(),
1249        )
1250        .await
1251        .unwrap();
1252        let worktree_c = Worktree::open_remote(
1253            client_c.clone(),
1254            worktree_id,
1255            lang_registry.clone(),
1256            client_c.user_store.clone(),
1257            &mut cx_c.to_async(),
1258        )
1259        .await
1260        .unwrap();
1261
1262        // Open and edit a buffer as both guests B and C.
1263        let buffer_b = worktree_b
1264            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1265            .await
1266            .unwrap();
1267        let buffer_c = worktree_c
1268            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1269            .await
1270            .unwrap();
1271        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1272        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1273
1274        // Open and edit that buffer as the host.
1275        let buffer_a = worktree_a
1276            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1277            .await
1278            .unwrap();
1279
            // Wait for both guest edits to reach the host before appending the host's own edit.
1280        buffer_a
1281            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1282            .await;
1283        buffer_a.update(&mut cx_a, |buf, cx| {
1284            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1285        });
1286
1287        // Wait for edits to propagate
1288        buffer_a
1289            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1290            .await;
1291        buffer_b
1292            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1293            .await;
1294        buffer_c
1295            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1296            .await;
1297
1298        // Edit the buffer as the host and concurrently save as guest B.
1299        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1300        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1301        save_b.await.unwrap();
            // The file written on the host's file system must include the host's concurrent edit.
1302        assert_eq!(
1303            fs.load("/a/file1".as_ref()).await.unwrap(),
1304            "hi-a, i-am-c, i-am-b, i-am-a"
1305        );
1306        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1307        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1308        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1309
1310        // Make changes on host's file system, see those changes on the guests.
1311        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1312            .await
1313            .unwrap();
1314        fs.insert_file(Path::new("/a/file4"), "4".into())
1315            .await
1316            .unwrap();
1317
1318        worktree_b
1319            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1320            .await;
1321        worktree_c
1322            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1323            .await;
1324        worktree_b.read_with(&cx_b, |tree, _| {
1325            assert_eq!(
1326                tree.paths()
1327                    .map(|p| p.to_string_lossy())
1328                    .collect::<Vec<_>>(),
1329                &[".zed.toml", "file1", "file3", "file4"]
1330            )
1331        });
1332        worktree_c.read_with(&cx_c, |tree, _| {
1333            assert_eq!(
1334                tree.paths()
1335                    .map(|p| p.to_string_lossy())
1336                    .collect::<Vec<_>>(),
1337                &[".zed.toml", "file1", "file3", "file4"]
1338            )
1339        });
1340    }
1341
1342    #[gpui::test]
        // Checks that a guest buffer which is edited, saved, and edited again is reported as
        // dirty only while it has unsaved edits and is never reported as conflicted.
1343    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1344        cx_a.foreground().forbid_parking();
1345        let lang_registry = Arc::new(LanguageRegistry::new());
1346
1347        // Connect to a server as 2 clients.
1348        let mut server = TestServer::start().await;
1349        let client_a = server.create_client(&mut cx_a, "user_a").await;
1350        let client_b = server.create_client(&mut cx_b, "user_b").await;
1351
1352        // Share a local worktree as client A
1353        let fs = Arc::new(FakeFs::new());
1354        fs.insert_tree(
1355            "/dir",
1356            json!({
1357                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1358                "a.txt": "a-contents",
1359            }),
1360        )
1361        .await;
1362
1363        let worktree_a = Worktree::open_local(
1364            client_a.clone(),
1365            client_a.user_store.clone(),
1366            "/dir".as_ref(),
1367            fs,
1368            lang_registry.clone(),
1369            &mut cx_a.to_async(),
1370        )
1371        .await
1372        .unwrap();
1373        worktree_a
1374            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1375            .await;
1376        let worktree_id = worktree_a
1377            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1378            .await
1379            .unwrap();
1380
1381        // Join that worktree as client B, and see that a guest has joined as client A.
1382        let worktree_b = Worktree::open_remote(
1383            client_b.clone(),
1384            worktree_id,
1385            lang_registry.clone(),
1386            client_b.user_store.clone(),
1387            &mut cx_b.to_async(),
1388        )
1389        .await
1390        .unwrap();
1391
1392        let buffer_b = worktree_b
1393            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1394            .await
1395            .unwrap();
            // Remember the file's mtime as seen by the guest so the save can be detected below.
1396        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1397
1398        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1399        buffer_b.read_with(&cx_b, |buf, _| {
1400            assert!(buf.is_dirty());
1401            assert!(!buf.has_conflict());
1402        });
1403
1404        buffer_b
1405            .update(&mut cx_b, |buf, cx| buf.save(cx))
1406            .unwrap()
1407            .await
1408            .unwrap();
            // Wait until the guest's buffer reports a different mtime for its file.
1409        worktree_b
1410            .condition(&cx_b, |_, cx| {
1411                buffer_b.read(cx).file().unwrap().mtime() != mtime
1412            })
1413            .await;
1414        buffer_b.read_with(&cx_b, |buf, _| {
1415            assert!(!buf.is_dirty());
1416            assert!(!buf.has_conflict());
1417        });
1418
            // A further edit after the save makes the buffer dirty again, still without conflict.
1419        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1420        buffer_b.read_with(&cx_b, |buf, _| {
1421            assert!(buf.is_dirty());
1422            assert!(!buf.has_conflict());
1423        });
1424    }
1425
1426    #[gpui::test]
        // Checks that a host edit made while a guest's `open_buffer` request is still pending
        // ends up in the buffer the guest receives.
1427    async fn test_editing_while_guest_opens_buffer(
1428        mut cx_a: TestAppContext,
1429        mut cx_b: TestAppContext,
1430    ) {
1431        cx_a.foreground().forbid_parking();
1432        let lang_registry = Arc::new(LanguageRegistry::new());
1433
1434        // Connect to a server as 2 clients.
1435        let mut server = TestServer::start().await;
1436        let client_a = server.create_client(&mut cx_a, "user_a").await;
1437        let client_b = server.create_client(&mut cx_b, "user_b").await;
1438
1439        // Share a local worktree as client A
1440        let fs = Arc::new(FakeFs::new());
1441        fs.insert_tree(
1442            "/dir",
1443            json!({
1444                ".zed.toml": r#"collaborators = ["user_b"]"#,
1445                "a.txt": "a-contents",
1446            }),
1447        )
1448        .await;
1449        let worktree_a = Worktree::open_local(
1450            client_a.clone(),
1451            client_a.user_store.clone(),
1452            "/dir".as_ref(),
1453            fs,
1454            lang_registry.clone(),
1455            &mut cx_a.to_async(),
1456        )
1457        .await
1458        .unwrap();
1459        worktree_a
1460            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1461            .await;
1462        let worktree_id = worktree_a
1463            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1464            .await
1465            .unwrap();
1466
1467        // Join that worktree as client B, and see that a guest has joined as client A.
1468        let worktree_b = Worktree::open_remote(
1469            client_b.clone(),
1470            worktree_id,
1471            lang_registry.clone(),
1472            client_b.user_store.clone(),
1473            &mut cx_b.to_async(),
1474        )
1475        .await
1476        .unwrap();
1477
1478        let buffer_a = worktree_a
1479            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1480            .await
1481            .unwrap();
            // Start opening the buffer on the guest in the background without awaiting it yet.
1482        let buffer_b = cx_b
1483            .background()
1484            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1485
            // Yield once, then edit on the host while the guest's open may still be in flight.
1486        task::yield_now().await;
1487        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1488
            // Whatever the guest received, it must converge on the host's current text.
1489        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1490        let buffer_b = buffer_b.await.unwrap();
1491        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1492    }
1493
1494    #[gpui::test]
        // Checks that a guest who drops its worktree while an `open_buffer` request is still
        // pending is nevertheless removed from the host's collaborator list.
1495    async fn test_leaving_worktree_while_opening_buffer(
1496        mut cx_a: TestAppContext,
1497        mut cx_b: TestAppContext,
1498    ) {
1499        cx_a.foreground().forbid_parking();
1500        let lang_registry = Arc::new(LanguageRegistry::new());
1501
1502        // Connect to a server as 2 clients.
1503        let mut server = TestServer::start().await;
1504        let client_a = server.create_client(&mut cx_a, "user_a").await;
1505        let client_b = server.create_client(&mut cx_b, "user_b").await;
1506
1507        // Share a local worktree as client A
1508        let fs = Arc::new(FakeFs::new());
1509        fs.insert_tree(
1510            "/dir",
1511            json!({
1512                ".zed.toml": r#"collaborators = ["user_b"]"#,
1513                "a.txt": "a-contents",
1514            }),
1515        )
1516        .await;
1517        let worktree_a = Worktree::open_local(
1518            client_a.clone(),
1519            client_a.user_store.clone(),
1520            "/dir".as_ref(),
1521            fs,
1522            lang_registry.clone(),
1523            &mut cx_a.to_async(),
1524        )
1525        .await
1526        .unwrap();
1527        worktree_a
1528            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1529            .await;
1530        let worktree_id = worktree_a
1531            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1532            .await
1533            .unwrap();
1534
1535        // Join that worktree as client B, and see that a guest has joined as client A.
1536        let worktree_b = Worktree::open_remote(
1537            client_b.clone(),
1538            worktree_id,
1539            lang_registry.clone(),
1540            client_b.user_store.clone(),
1541            &mut cx_b.to_async(),
1542        )
1543        .await
1544        .unwrap();
1545        worktree_a
1546            .condition(&cx_a, |tree, _| tree.collaborators().len() == 1)
1547            .await;
1548
            // Start opening a buffer on the guest, then drop the worktree and the pending task
            // without ever awaiting the result.
1549        let buffer_b = cx_b
1550            .background()
1551            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1552        cx_b.update(|_| drop(worktree_b));
1553        drop(buffer_b);
1554        worktree_a
1555            .condition(&cx_a, |tree, _| tree.collaborators().len() == 0)
1556            .await;
1557    }
1558
1559    #[gpui::test]
        // Checks that when a guest's connection is dropped, the host sees that guest removed
        // from the shared worktree's collaborators.
1560    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1561        cx_a.foreground().forbid_parking();
1562        let lang_registry = Arc::new(LanguageRegistry::new());
1563
1564        // Connect to a server as 2 clients.
1565        let mut server = TestServer::start().await;
1566        let client_a = server.create_client(&mut cx_a, "user_a").await;
1567        let client_b = server.create_client(&mut cx_b, "user_b").await;
1568
1569        // Share a local worktree as client A
1570        let fs = Arc::new(FakeFs::new());
1571        fs.insert_tree(
1572            "/a",
1573            json!({
1574                ".zed.toml": r#"collaborators = ["user_b"]"#,
1575                "a.txt": "a-contents",
1576                "b.txt": "b-contents",
1577            }),
1578        )
1579        .await;
1580        let worktree_a = Worktree::open_local(
1581            client_a.clone(),
1582            client_a.user_store.clone(),
1583            "/a".as_ref(),
1584            fs,
1585            lang_registry.clone(),
1586            &mut cx_a.to_async(),
1587        )
1588        .await
1589        .unwrap();
1590        worktree_a
1591            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1592            .await;
1593        let worktree_id = worktree_a
1594            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1595            .await
1596            .unwrap();
1597
1598        // Join that worktree as client B, and see that a guest has joined as client A.
            // The handle is bound to `_worktree_b` so it stays alive until the end of the test.
1599        let _worktree_b = Worktree::open_remote(
1600            client_b.clone(),
1601            worktree_id,
1602            lang_registry.clone(),
1603            client_b.user_store.clone(),
1604            &mut cx_b.to_async(),
1605        )
1606        .await
1607        .unwrap();
1608        worktree_a
1609            .condition(&cx_a, |tree, _| tree.collaborators().len() == 1)
1610            .await;
1611
1612        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1613        client_b.disconnect(&cx_b.to_async()).await.unwrap();
1614        worktree_a
1615            .condition(&cx_a, |tree, _| tree.collaborators().len() == 0)
1616            .await;
1617    }
1618
    // Checks that diagnostics published by the host's language server before a guest joins
    // show up in the buffer that the guest opens afterwards.
    //
    // Client A shares a local worktree whose "Rust" language is served by a fake language
    // server; that server publishes one error and one warning for `a.rs`; client B then joins
    // the worktree, opens `a.rs`, and is expected to see both diagnostics.
    #[gpui::test]
    async fn test_collaborating_with_diagnostics(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        // Register a "Rust" language for `.rs` paths whose language server is a fake one that
        // this test drives directly.
        let (language_server_config, mut fake_language_server) =
            LanguageServerConfig::fake(cx_a.background()).await;
        let mut lang_registry = LanguageRegistry::new();
        lang_registry.add(Arc::new(Language::new(
            LanguageConfig {
                name: "Rust".to_string(),
                path_suffixes: vec!["rs".to_string()],
                language_server: Some(language_server_config),
                ..Default::default()
            },
            Some(tree_sitter_rust::language()),
        )));

        // The same registry is handed to both the local and the remote worktree below.
        let lang_registry = Arc::new(lang_registry);

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a local worktree as client A
        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.rs": "let one = two",
                "other.rs": "",
            }),
        )
        .await;
        let worktree_a = Worktree::open_local(
            client_a.clone(),
            client_a.user_store.clone(),
            "/a".as_ref(),
            fs,
            lang_registry.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();
        // Wait for the initial scan of the fake file system before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Cause language server to start.
        // The result is bound to `_`, so the opened buffer handle is dropped as soon as the
        // open completes; only the side effect of opening a `.rs` file is wanted here.
        let _ = cx_a
            .background()
            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
                worktree.open_buffer("other.rs", cx)
            }))
            .await
            .unwrap();

        // Simulate a language server reporting errors for a file.
        // Two diagnostics are published for `/a/a.rs`: an error on columns 4..7 and a warning
        // on columns 10..13, both on line 0.
        fake_language_server
            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
                version: None,
                diagnostics: vec![
                    lsp::Diagnostic {
                        severity: Some(lsp::DiagnosticSeverity::ERROR),
                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
                        message: "message 1".to_string(),
                        ..Default::default()
                    },
                    lsp::Diagnostic {
                        severity: Some(lsp::DiagnosticSeverity::WARNING),
                        range: lsp::Range::new(
                            lsp::Position::new(0, 10),
                            lsp::Position::new(0, 13),
                        ),
                        message: "message 2".to_string(),
                        ..Default::default()
                    },
                ],
            })
            .await;

        // Join the worktree as client B.
        let worktree_b = Worktree::open_remote(
            client_b.clone(),
            worktree_id,
            lang_registry.clone(),
            client_b.user_store.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open the file with the errors.
        let buffer_b = cx_b
            .background()
            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
            .await
            .unwrap();

        // The guest's buffer must contain both diagnostics, in position order, each as the
        // primary entry of its own group (group ids 0 and 1).
        buffer_b.read_with(&cx_b, |buffer, _| {
            assert_eq!(
                buffer
                    .snapshot()
                    .diagnostics_in_range::<_, Point>(0..buffer.len())
                    .collect::<Vec<_>>(),
                &[
                    DiagnosticEntry {
                        range: Point::new(0, 4)..Point::new(0, 7),
                        diagnostic: Diagnostic {
                            group_id: 0,
                            message: "message 1".to_string(),
                            severity: lsp::DiagnosticSeverity::ERROR,
                            is_primary: true,
                            ..Default::default()
                        }
                    },
                    DiagnosticEntry {
                        range: Point::new(0, 10)..Point::new(0, 13),
                        diagnostic: Diagnostic {
                            group_id: 1,
                            severity: lsp::DiagnosticSeverity::WARNING,
                            message: "message 2".to_string(),
                            is_primary: true,
                            ..Default::default()
                        }
                    }
                ]
            );
        });
    }
1757
    // Exercises the basic chat flow between two clients: loading the channel list, receiving
    // a message that already exists in the database, sending new messages, and leaving the
    // channel again.
    //
    // `channel_messages` yields `(sender login, body, is_pending)` tuples, so the third field
    // in the expectations below says whether a message is still pending on that client.
    #[gpui::test]
    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Create an org that includes these 2 users.
        let db = &server.app_state.db;
        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
            .await
            .unwrap();
        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
            .await
            .unwrap();

        // Create a channel that includes all the users.
        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
            .await
            .unwrap();
        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
            .await
            .unwrap();
        // Seed the channel with one message from user B, written directly to the database.
        db.create_channel_message(
            channel_id,
            client_b.current_user_id(&cx_b),
            "hello A, it's B.",
            OffsetDateTime::now_utc(),
            1,
        )
        .await
        .unwrap();

        // Client A: wait for the channel list to load, then check it lists the one channel.
        let channels_a = cx_a
            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
        channels_a
            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
            .await;
        channels_a.read_with(&cx_a, |list, _| {
            assert_eq!(
                list.available_channels().unwrap(),
                &[ChannelDetails {
                    id: channel_id.to_proto(),
                    name: "test-channel".to_string()
                }]
            )
        });
        // Right after it is opened the channel has no messages; the seeded message shows up
        // afterwards.
        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
            this.get_channel(channel_id.to_proto(), cx).unwrap()
        });
        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
        channel_a
            .condition(&cx_a, |channel, _| {
                channel_messages(channel)
                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
            })
            .await;

        // Client B: same sequence as for client A.
        let channels_b = cx_b
            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
        channels_b
            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
            .await;
        channels_b.read_with(&cx_b, |list, _| {
            assert_eq!(
                list.available_channels().unwrap(),
                &[ChannelDetails {
                    id: channel_id.to_proto(),
                    name: "test-channel".to_string()
                }]
            )
        });

        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
            this.get_channel(channel_id.to_proto(), cx).unwrap()
        });
        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
        channel_b
            .condition(&cx_b, |channel, _| {
                channel_messages(channel)
                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
            })
            .await;

        // Client A sends two messages back to back. Both must appear locally as pending
        // straight away; the first send is detached and only the second one is awaited.
        channel_a
            .update(&mut cx_a, |channel, cx| {
                channel
                    .send_message("oh, hi B.".to_string(), cx)
                    .unwrap()
                    .detach();
                let task = channel.send_message("sup".to_string(), cx).unwrap();
                assert_eq!(
                    channel_messages(channel),
                    &[
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
                        ("user_a".to_string(), "sup".to_string(), true)
                    ]
                );
                task
            })
            .await
            .unwrap();

        // Client B eventually sees both of A's messages, in order and not pending.
        channel_b
            .condition(&cx_b, |channel, _| {
                channel_messages(channel)
                    == [
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
                        ("user_a".to_string(), "sup".to_string(), false),
                    ]
            })
            .await;

        // The server's store has two connections registered for the channel at this point.
        assert_eq!(
            server
                .state()
                .await
                .channel(channel_id)
                .unwrap()
                .connection_ids
                .len(),
            2
        );
        // Dropping client B's channel handle should leave a single connection on the channel.
        cx_b.update(|_| drop(channel_b));
        server
            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
            .await;

        // Dropping the last handle should remove the channel from the server's store.
        cx_a.update(|_| drop(channel_a));
        server
            .condition(|state| state.channel(channel_id).is_none())
            .await;
    }
1897
1898    #[gpui::test]
1899    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
1900        cx_a.foreground().forbid_parking();
1901
1902        let mut server = TestServer::start().await;
1903        let client_a = server.create_client(&mut cx_a, "user_a").await;
1904
1905        let db = &server.app_state.db;
1906        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1907        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1908        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1909            .await
1910            .unwrap();
1911        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1912            .await
1913            .unwrap();
1914
1915        let channels_a = cx_a
1916            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1917        channels_a
1918            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1919            .await;
1920        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1921            this.get_channel(channel_id.to_proto(), cx).unwrap()
1922        });
1923
1924        // Messages aren't allowed to be too long.
1925        channel_a
1926            .update(&mut cx_a, |channel, cx| {
1927                let long_body = "this is long.\n".repeat(1024);
1928                channel.send_message(long_body, cx).unwrap()
1929            })
1930            .await
1931            .unwrap_err();
1932
1933        // Messages aren't allowed to be blank.
1934        channel_a.update(&mut cx_a, |channel, cx| {
1935            channel.send_message(String::new(), cx).unwrap_err()
1936        });
1937
1938        // Leading and trailing whitespace are trimmed.
1939        channel_a
1940            .update(&mut cx_a, |channel, cx| {
1941                channel
1942                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
1943                    .unwrap()
1944            })
1945            .await
1946            .unwrap();
1947        assert_eq!(
1948            db.get_channel_messages(channel_id, 10, None)
1949                .await
1950                .unwrap()
1951                .iter()
1952                .map(|m| &m.body)
1953                .collect::<Vec<_>>(),
1954            &["surrounded by whitespace"]
1955        );
1956    }
1957
1958    #[gpui::test]
1959    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1960        cx_a.foreground().forbid_parking();
1961
1962        // Connect to a server as 2 clients.
1963        let mut server = TestServer::start().await;
1964        let client_a = server.create_client(&mut cx_a, "user_a").await;
1965        let client_b = server.create_client(&mut cx_b, "user_b").await;
1966        let mut status_b = client_b.status();
1967
1968        // Create an org that includes these 2 users.
1969        let db = &server.app_state.db;
1970        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1971        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1972            .await
1973            .unwrap();
1974        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1975            .await
1976            .unwrap();
1977
1978        // Create a channel that includes all the users.
1979        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1980        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1981            .await
1982            .unwrap();
1983        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1984            .await
1985            .unwrap();
1986        db.create_channel_message(
1987            channel_id,
1988            client_b.current_user_id(&cx_b),
1989            "hello A, it's B.",
1990            OffsetDateTime::now_utc(),
1991            2,
1992        )
1993        .await
1994        .unwrap();
1995
1996        let channels_a = cx_a
1997            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1998        channels_a
1999            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2000            .await;
2001
2002        channels_a.read_with(&cx_a, |list, _| {
2003            assert_eq!(
2004                list.available_channels().unwrap(),
2005                &[ChannelDetails {
2006                    id: channel_id.to_proto(),
2007                    name: "test-channel".to_string()
2008                }]
2009            )
2010        });
2011        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2012            this.get_channel(channel_id.to_proto(), cx).unwrap()
2013        });
2014        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2015        channel_a
2016            .condition(&cx_a, |channel, _| {
2017                channel_messages(channel)
2018                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2019            })
2020            .await;
2021
2022        let channels_b = cx_b
2023            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2024        channels_b
2025            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2026            .await;
2027        channels_b.read_with(&cx_b, |list, _| {
2028            assert_eq!(
2029                list.available_channels().unwrap(),
2030                &[ChannelDetails {
2031                    id: channel_id.to_proto(),
2032                    name: "test-channel".to_string()
2033                }]
2034            )
2035        });
2036
2037        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2038            this.get_channel(channel_id.to_proto(), cx).unwrap()
2039        });
2040        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2041        channel_b
2042            .condition(&cx_b, |channel, _| {
2043                channel_messages(channel)
2044                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2045            })
2046            .await;
2047
2048        // Disconnect client B, ensuring we can still access its cached channel data.
2049        server.forbid_connections();
2050        server.disconnect_client(client_b.current_user_id(&cx_b));
2051        while !matches!(
2052            status_b.recv().await,
2053            Some(client::Status::ReconnectionError { .. })
2054        ) {}
2055
2056        channels_b.read_with(&cx_b, |channels, _| {
2057            assert_eq!(
2058                channels.available_channels().unwrap(),
2059                [ChannelDetails {
2060                    id: channel_id.to_proto(),
2061                    name: "test-channel".to_string()
2062                }]
2063            )
2064        });
2065        channel_b.read_with(&cx_b, |channel, _| {
2066            assert_eq!(
2067                channel_messages(channel),
2068                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2069            )
2070        });
2071
2072        // Send a message from client B while it is disconnected.
2073        channel_b
2074            .update(&mut cx_b, |channel, cx| {
2075                let task = channel
2076                    .send_message("can you see this?".to_string(), cx)
2077                    .unwrap();
2078                assert_eq!(
2079                    channel_messages(channel),
2080                    &[
2081                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2082                        ("user_b".to_string(), "can you see this?".to_string(), true)
2083                    ]
2084                );
2085                task
2086            })
2087            .await
2088            .unwrap_err();
2089
2090        // Send a message from client A while B is disconnected.
2091        channel_a
2092            .update(&mut cx_a, |channel, cx| {
2093                channel
2094                    .send_message("oh, hi B.".to_string(), cx)
2095                    .unwrap()
2096                    .detach();
2097                let task = channel.send_message("sup".to_string(), cx).unwrap();
2098                assert_eq!(
2099                    channel_messages(channel),
2100                    &[
2101                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2102                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2103                        ("user_a".to_string(), "sup".to_string(), true)
2104                    ]
2105                );
2106                task
2107            })
2108            .await
2109            .unwrap();
2110
2111        // Give client B a chance to reconnect.
2112        server.allow_connections();
2113        cx_b.foreground().advance_clock(Duration::from_secs(10));
2114
2115        // Verify that B sees the new messages upon reconnection, as well as the message client B
2116        // sent while offline.
2117        channel_b
2118            .condition(&cx_b, |channel, _| {
2119                channel_messages(channel)
2120                    == [
2121                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2122                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2123                        ("user_a".to_string(), "sup".to_string(), false),
2124                        ("user_b".to_string(), "can you see this?".to_string(), false),
2125                    ]
2126            })
2127            .await;
2128
2129        // Ensure client A and B can communicate normally after reconnection.
2130        channel_a
2131            .update(&mut cx_a, |channel, cx| {
2132                channel.send_message("you online?".to_string(), cx).unwrap()
2133            })
2134            .await
2135            .unwrap();
2136        channel_b
2137            .condition(&cx_b, |channel, _| {
2138                channel_messages(channel)
2139                    == [
2140                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2141                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2142                        ("user_a".to_string(), "sup".to_string(), false),
2143                        ("user_b".to_string(), "can you see this?".to_string(), false),
2144                        ("user_a".to_string(), "you online?".to_string(), false),
2145                    ]
2146            })
2147            .await;
2148
2149        channel_b
2150            .update(&mut cx_b, |channel, cx| {
2151                channel.send_message("yep".to_string(), cx).unwrap()
2152            })
2153            .await
2154            .unwrap();
2155        channel_a
2156            .condition(&cx_a, |channel, _| {
2157                channel_messages(channel)
2158                    == [
2159                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2160                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2161                        ("user_a".to_string(), "sup".to_string(), false),
2162                        ("user_b".to_string(), "can you see this?".to_string(), false),
2163                        ("user_a".to_string(), "you online?".to_string(), false),
2164                        ("user_b".to_string(), "yep".to_string(), false),
2165                    ]
2166            })
2167            .await;
2168    }
2169
    // Checks that every client's contact list follows the lifecycle of a worktree: it is
    // listed once the host opens it, gains a guest once another user joins, and disappears
    // once the host drops it.
    //
    // The expectations use the shape produced by the local `contacts` helper at the end of
    // this function: `(user login, [(worktree root name, [guest logins])])`.
    #[gpui::test]
    async fn test_contacts(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());

        // Connect to a server as 3 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        let fs = Arc::new(FakeFs::new());

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
            }),
        )
        .await;

        let worktree_a = Worktree::open_local(
            client_a.clone(),
            client_a.user_store.clone(),
            "/a".as_ref(),
            fs.clone(),
            lang_registry.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();

        // With the worktree opened but not yet shared, all three clients already see it
        // listed under user_a with no guests.
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;

        // Share the worktree, then join it as client B.
        let worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Bound to a named variable (not `_`) so the remote worktree handle stays alive until
        // the end of the test.
        let _worktree_b = Worktree::open_remote(
            client_b.clone(),
            worktree_id,
            lang_registry.clone(),
            client_b.user_store.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // After the join, all three clients see user_b listed as a guest of the worktree.
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;

        // Wait until the host's worktree has registered client B as a collaborator before
        // tearing the worktree down.
        worktree_a
            .condition(&cx_a, |worktree, _| {
                worktree.collaborators().contains_key(&client_b.peer_id)
            })
            .await;

        // Dropping the host's worktree handle should empty every client's contact list.
        cx_a.update(move |_| drop(worktree_a));
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
            .await;

        // Flattens the user store's contacts into plain tuples that are easy to compare:
        // one `(user login, worktrees)` entry per contact, where each worktree is
        // `(root name, guest logins)`.
        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
            user_store
                .contacts()
                .iter()
                .map(|contact| {
                    let worktrees = contact
                        .worktrees
                        .iter()
                        .map(|w| {
                            (
                                w.root_name.as_str(),
                                w.guests.iter().map(|p| p.github_login.as_str()).collect(),
                            )
                        })
                        .collect();
                    (contact.user.github_login.as_str(), worktrees)
                })
                .collect()
        }
    }
2300
    /// Test harness that owns an in-process `Server` together with the state the tests use to
    /// connect clients to it, cut their connections, and observe the server's store.
    struct TestServer {
        /// Peer handed to the server in `start`; it is reset when the harness is dropped.
        peer: Arc<Peer>,
        /// Application state (including the database handle) shared with the server.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Receiving half of the notification channel whose sender is given to the server;
        /// `condition` awaits it between predicate checks.
        notifications: mpsc::Receiver<()>,
        /// Kill switch for the most recent in-memory connection of each user, keyed by user id.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// While `true`, connection attempts made by test clients are rejected with an error.
        forbid_connections: Arc<AtomicBool>,
        /// NOTE(review): never read; presumably held so the test database outlives the
        /// server — confirm against `TestDb`.
        _test_db: TestDb,
    }
2310
2311    impl TestServer {
2312        async fn start() -> Self {
2313            let test_db = TestDb::new();
2314            let app_state = Self::build_app_state(&test_db).await;
2315            let peer = Peer::new();
2316            let notifications = mpsc::channel(128);
2317            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2318            Self {
2319                peer,
2320                app_state,
2321                server,
2322                notifications: notifications.1,
2323                connection_killers: Default::default(),
2324                forbid_connections: Default::default(),
2325                _test_db: test_db,
2326            }
2327        }
2328
2329        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2330            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2331            let client_name = name.to_string();
2332            let mut client = Client::new();
2333            let server = self.server.clone();
2334            let connection_killers = self.connection_killers.clone();
2335            let forbid_connections = self.forbid_connections.clone();
2336            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2337
2338            Arc::get_mut(&mut client)
2339                .unwrap()
2340                .override_authenticate(move |cx| {
2341                    cx.spawn(|_| async move {
2342                        let access_token = "the-token".to_string();
2343                        Ok(Credentials {
2344                            user_id: user_id.0 as u64,
2345                            access_token,
2346                        })
2347                    })
2348                })
2349                .override_establish_connection(move |credentials, cx| {
2350                    assert_eq!(credentials.user_id, user_id.0 as u64);
2351                    assert_eq!(credentials.access_token, "the-token");
2352
2353                    let server = server.clone();
2354                    let connection_killers = connection_killers.clone();
2355                    let forbid_connections = forbid_connections.clone();
2356                    let client_name = client_name.clone();
2357                    let connection_id_tx = connection_id_tx.clone();
2358                    cx.spawn(move |cx| async move {
2359                        if forbid_connections.load(SeqCst) {
2360                            Err(EstablishConnectionError::other(anyhow!(
2361                                "server is forbidding connections"
2362                            )))
2363                        } else {
2364                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2365                            connection_killers.lock().insert(user_id, kill_conn);
2366                            cx.background()
2367                                .spawn(server.handle_connection(
2368                                    server_conn,
2369                                    client_name,
2370                                    user_id,
2371                                    Some(connection_id_tx),
2372                                ))
2373                                .detach();
2374                            Ok(client_conn)
2375                        }
2376                    })
2377                });
2378
2379            let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2380            client
2381                .authenticate_and_connect(&cx.to_async())
2382                .await
2383                .unwrap();
2384
2385            let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2386            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2387            let mut authed_user =
2388                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2389            while authed_user.recv().await.unwrap().is_none() {}
2390
2391            TestClient {
2392                client,
2393                peer_id,
2394                user_store,
2395            }
2396        }
2397
2398        fn disconnect_client(&self, user_id: UserId) {
2399            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2400                let _ = kill_conn.try_send(Some(()));
2401            }
2402        }
2403
2404        fn forbid_connections(&self) {
2405            self.forbid_connections.store(true, SeqCst);
2406        }
2407
2408        fn allow_connections(&self) {
2409            self.forbid_connections.store(false, SeqCst);
2410        }
2411
2412        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2413            let mut config = Config::default();
2414            config.session_secret = "a".repeat(32);
2415            config.database_url = test_db.url.clone();
2416            let github_client = github::AppClient::test();
2417            Arc::new(AppState {
2418                db: test_db.db().clone(),
2419                handlebars: Default::default(),
2420                auth_client: auth::build_client("", ""),
2421                repo_client: github::RepoClient::test(&github_client),
2422                github_client,
2423                config,
2424            })
2425        }
2426
2427        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2428            self.server.store.read()
2429        }
2430
2431        async fn condition<F>(&mut self, mut predicate: F)
2432        where
2433            F: FnMut(&Store) -> bool,
2434        {
2435            async_std::future::timeout(Duration::from_millis(500), async {
2436                while !(predicate)(&*self.server.store.read()) {
2437                    self.notifications.recv().await;
2438                }
2439            })
2440            .await
2441            .expect("condition timed out");
2442        }
2443    }
2444
2445    impl Drop for TestServer {
2446        fn drop(&mut self) {
2447            task::block_on(self.peer.reset());
2448        }
2449    }
2450
    /// A client connected to a `TestServer`, bundled with the values the tests need alongside
    /// it. Dereferences to the wrapped `Arc<Client>`.
    struct TestClient {
        /// The connected client.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        /// User store created for this client.
        pub user_store: ModelHandle<UserStore>,
    }
2456
2457    impl Deref for TestClient {
2458        type Target = Arc<Client>;
2459
2460        fn deref(&self) -> &Self::Target {
2461            &self.client
2462        }
2463    }
2464
2465    impl TestClient {
2466        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2467            UserId::from_proto(
2468                self.user_store
2469                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2470            )
2471        }
2472    }
2473
2474    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2475        channel
2476            .messages()
2477            .cursor::<()>()
2478            .map(|m| {
2479                (
2480                    m.sender.github_login.clone(),
2481                    m.body.clone(),
2482                    m.is_pending(),
2483                )
2484            })
2485            .collect()
2486    }
2487
    /// Stateless view for tests; its `gpui::View` implementation renders an empty element.
    struct EmptyView;
2489
    impl gpui::Entity for EmptyView {
        /// The event type is the unit type.
        type Event = ();
    }
2493
2494    impl gpui::View for EmptyView {
2495        fn ui_name() -> &'static str {
2496            "empty view"
2497        }
2498
2499        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2500            gpui::Element::boxed(gpui::elements::Empty)
2501        }
2502    }
2503}