rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46
  47impl Server {
  48    pub fn new(
  49        app_state: Arc<AppState>,
  50        peer: Arc<Peer>,
  51        notifications: Option<mpsc::Sender<()>>,
  52    ) -> Arc<Self> {
  53        let mut server = Self {
  54            peer,
  55            app_state,
  56            store: Default::default(),
  57            handlers: Default::default(),
  58            notifications,
  59        };
  60
  61        server
  62            .add_handler(Server::ping)
  63            .add_handler(Server::register_worktree)
  64            .add_handler(Server::unregister_worktree)
  65            .add_handler(Server::share_worktree)
  66            .add_handler(Server::unshare_worktree)
  67            .add_handler(Server::join_worktree)
  68            .add_handler(Server::leave_worktree)
  69            .add_handler(Server::update_worktree)
  70            .add_handler(Server::open_buffer)
  71            .add_handler(Server::close_buffer)
  72            .add_handler(Server::update_buffer)
  73            .add_handler(Server::buffer_saved)
  74            .add_handler(Server::save_buffer)
  75            .add_handler(Server::get_channels)
  76            .add_handler(Server::get_users)
  77            .add_handler(Server::join_channel)
  78            .add_handler(Server::leave_channel)
  79            .add_handler(Server::send_channel_message)
  80            .add_handler(Server::get_channel_messages);
  81
  82        Arc::new(server)
  83    }
  84
  85    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  86    where
  87        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  88        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  89        M: EnvelopedMessage,
  90    {
  91        let prev_handler = self.handlers.insert(
  92            TypeId::of::<M>(),
  93            Box::new(move |server, envelope| {
  94                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
  95                (handler)(server, *envelope).boxed()
  96            }),
  97        );
  98        if prev_handler.is_some() {
  99            panic!("registered a handler for the same message twice");
 100        }
 101        self
 102    }
 103
 104    pub fn handle_connection(
 105        self: &Arc<Self>,
 106        connection: Connection,
 107        addr: String,
 108        user_id: UserId,
 109        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 110    ) -> impl Future<Output = ()> {
 111        let mut this = self.clone();
 112        async move {
 113            let (connection_id, handle_io, mut incoming_rx) =
 114                this.peer.add_connection(connection).await;
 115
 116            if let Some(send_connection_id) = send_connection_id.as_mut() {
 117                let _ = send_connection_id.send(connection_id).await;
 118            }
 119
 120            this.state_mut().add_connection(connection_id, user_id);
 121            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 122                log::error!("error updating contacts for {:?}: {}", user_id, err);
 123            }
 124
 125            let handle_io = handle_io.fuse();
 126            futures::pin_mut!(handle_io);
 127            loop {
 128                let next_message = incoming_rx.recv().fuse();
 129                futures::pin_mut!(next_message);
 130                futures::select_biased! {
 131                    message = next_message => {
 132                        if let Some(message) = message {
 133                            let start_time = Instant::now();
 134                            log::info!("RPC message received: {}", message.payload_type_name());
 135                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 136                                if let Err(err) = (handler)(this.clone(), message).await {
 137                                    log::error!("error handling message: {:?}", err);
 138                                } else {
 139                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 140                                }
 141
 142                                if let Some(mut notifications) = this.notifications.clone() {
 143                                    let _ = notifications.send(()).await;
 144                                }
 145                            } else {
 146                                log::warn!("unhandled message: {}", message.payload_type_name());
 147                            }
 148                        } else {
 149                            log::info!("rpc connection closed {:?}", addr);
 150                            break;
 151                        }
 152                    }
 153                    handle_io = handle_io => {
 154                        if let Err(err) = handle_io {
 155                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 156                        }
 157                        break;
 158                    }
 159                }
 160            }
 161
 162            if let Err(err) = this.sign_out(connection_id).await {
 163                log::error!("error signing out connection {:?} - {:?}", addr, err);
 164            }
 165        }
 166    }
 167
 168    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 169        self.peer.disconnect(connection_id).await;
 170        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 171
 172        for (project_id, project) in removed_connection.hosted_projects {
 173            if let Some(share) = project.share {
 174                broadcast(
 175                    connection_id,
 176                    share.guests.keys().copied().collect(),
 177                    |conn_id| {
 178                        self.peer
 179                            .send(conn_id, proto::UnshareProject { project_id })
 180                    },
 181                )
 182                .await?;
 183            }
 184        }
 185
 186        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 187            broadcast(connection_id, peer_ids, |conn_id| {
 188                self.peer.send(
 189                    conn_id,
 190                    proto::RemoveProjectCollaborator {
 191                        project_id,
 192                        peer_id: connection_id.0,
 193                    },
 194                )
 195            })
 196            .await?;
 197        }
 198
 199        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 200            .await?;
 201
 202        Ok(())
 203    }
 204
 205    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 206        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 207        Ok(())
 208    }
 209
 210    async fn register_worktree(
 211        mut self: Arc<Server>,
 212        request: TypedEnvelope<proto::RegisterWorktree>,
 213    ) -> tide::Result<()> {
 214        let receipt = request.receipt();
 215        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 216
 217        let mut contact_user_ids = HashSet::default();
 218        contact_user_ids.insert(host_user_id);
 219        for github_login in request.payload.authorized_logins {
 220            match self.app_state.db.create_user(&github_login, false).await {
 221                Ok(contact_user_id) => {
 222                    contact_user_ids.insert(contact_user_id);
 223                }
 224                Err(err) => {
 225                    let message = err.to_string();
 226                    self.peer
 227                        .respond_with_error(receipt, proto::Error { message })
 228                        .await?;
 229                    return Ok(());
 230                }
 231            }
 232        }
 233
 234        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 235        let ok = self.state_mut().register_worktree(
 236            request.project_id,
 237            request.worktree_id,
 238            Worktree {
 239                authorized_user_ids: contact_user_ids.clone(),
 240                root_name: request.payload.root_name,
 241            },
 242        );
 243
 244        if ok {
 245            self.peer.respond(receipt, proto::Ack {}).await?;
 246            self.update_contacts_for_users(&contact_user_ids).await?;
 247        } else {
 248            self.peer
 249                .respond_with_error(
 250                    receipt,
 251                    proto::Error {
 252                        message: "no such project".to_string(),
 253                    },
 254                )
 255                .await?;
 256        }
 257
 258        Ok(())
 259    }
 260
 261    async fn unregister_worktree(
 262        mut self: Arc<Server>,
 263        request: TypedEnvelope<proto::UnregisterWorktree>,
 264    ) -> tide::Result<()> {
 265        let project_id = request.payload.project_id;
 266        let worktree_id = request.payload.worktree_id;
 267        let worktree =
 268            self.state_mut()
 269                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 270
 271        if let Some(share) = worktree.share {
 272            broadcast(
 273                request.sender_id,
 274                share.guests.keys().copied().collect(),
 275                |conn_id| {
 276                    self.peer.send(
 277                        conn_id,
 278                        proto::UnregisterWorktree {
 279                            project_id,
 280                            worktree_id,
 281                        },
 282                    )
 283                },
 284            )
 285            .await?;
 286        }
 287        self.update_contacts_for_users(&worktree.authorized_user_ids)
 288            .await?;
 289        Ok(())
 290    }
 291
 292    async fn share_worktree(
 293        mut self: Arc<Server>,
 294        mut request: TypedEnvelope<proto::ShareWorktree>,
 295    ) -> tide::Result<()> {
 296        let worktree = request
 297            .payload
 298            .worktree
 299            .as_mut()
 300            .ok_or_else(|| anyhow!("missing worktree"))?;
 301        let entries = mem::take(&mut worktree.entries)
 302            .into_iter()
 303            .map(|entry| (entry.id, entry))
 304            .collect();
 305
 306        let contact_user_ids =
 307            self.state_mut()
 308                .share_worktree(worktree.id, request.sender_id, entries);
 309        if let Some(contact_user_ids) = contact_user_ids {
 310            self.peer
 311                .respond(request.receipt(), proto::ShareWorktreeResponse {})
 312                .await?;
 313            self.update_contacts_for_users(&contact_user_ids).await?;
 314        } else {
 315            self.peer
 316                .respond_with_error(
 317                    request.receipt(),
 318                    proto::Error {
 319                        message: "no such worktree".to_string(),
 320                    },
 321                )
 322                .await?;
 323        }
 324        Ok(())
 325    }
 326
 327    async fn unshare_worktree(
 328        mut self: Arc<Server>,
 329        request: TypedEnvelope<proto::UnshareWorktree>,
 330    ) -> tide::Result<()> {
 331        let worktree_id = request.payload.worktree_id;
 332        let worktree = self
 333            .state_mut()
 334            .unshare_worktree(worktree_id, request.sender_id)?;
 335
 336        broadcast(request.sender_id, worktree.connection_ids, |conn_id| {
 337            self.peer
 338                .send(conn_id, proto::UnshareWorktree { worktree_id })
 339        })
 340        .await?;
 341        self.update_contacts_for_users(&worktree.authorized_user_ids)
 342            .await?;
 343
 344        Ok(())
 345    }
 346
 347    async fn join_worktree(
 348        mut self: Arc<Server>,
 349        request: TypedEnvelope<proto::JoinWorktree>,
 350    ) -> tide::Result<()> {
 351        let worktree_id = request.payload.worktree_id;
 352
 353        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 354        let response_data = self
 355            .state_mut()
 356            .join_worktree(request.sender_id, user_id, worktree_id)
 357            .and_then(|joined| {
 358                let share = joined.worktree.share()?;
 359                let peer_count = share.guests.len();
 360                let mut collaborators = Vec::with_capacity(peer_count);
 361                collaborators.push(proto::Collaborator {
 362                    peer_id: joined.worktree.host_connection_id.0,
 363                    replica_id: 0,
 364                    user_id: joined.worktree.host_user_id.to_proto(),
 365                });
 366                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 367                    if *peer_conn_id != request.sender_id {
 368                        collaborators.push(proto::Collaborator {
 369                            peer_id: peer_conn_id.0,
 370                            replica_id: *peer_replica_id as u32,
 371                            user_id: peer_user_id.to_proto(),
 372                        });
 373                    }
 374                }
 375                let response = proto::JoinWorktreeResponse {
 376                    worktree: Some(proto::Worktree {
 377                        id: worktree_id,
 378                        root_name: joined.worktree.root_name.clone(),
 379                        entries: share.entries.values().cloned().collect(),
 380                    }),
 381                    replica_id: joined.replica_id as u32,
 382                    collaborators,
 383                };
 384                let connection_ids = joined.worktree.connection_ids();
 385                let contact_user_ids = joined.worktree.authorized_user_ids.clone();
 386                Ok((response, connection_ids, contact_user_ids))
 387            });
 388
 389        match response_data {
 390            Ok((response, connection_ids, contact_user_ids)) => {
 391                broadcast(request.sender_id, connection_ids, |conn_id| {
 392                    self.peer.send(
 393                        conn_id,
 394                        proto::AddCollaborator {
 395                            worktree_id,
 396                            collaborator: Some(proto::Collaborator {
 397                                peer_id: request.sender_id.0,
 398                                replica_id: response.replica_id,
 399                                user_id: user_id.to_proto(),
 400                            }),
 401                        },
 402                    )
 403                })
 404                .await?;
 405                self.peer.respond(request.receipt(), response).await?;
 406                self.update_contacts_for_users(&contact_user_ids).await?;
 407            }
 408            Err(error) => {
 409                self.peer
 410                    .respond_with_error(
 411                        request.receipt(),
 412                        proto::Error {
 413                            message: error.to_string(),
 414                        },
 415                    )
 416                    .await?;
 417            }
 418        }
 419
 420        Ok(())
 421    }
 422
 423    async fn leave_worktree(
 424        mut self: Arc<Server>,
 425        request: TypedEnvelope<proto::LeaveWorktree>,
 426    ) -> tide::Result<()> {
 427        let sender_id = request.sender_id;
 428        let worktree_id = request.payload.worktree_id;
 429        let worktree = self.state_mut().leave_worktree(sender_id, worktree_id);
 430        if let Some(worktree) = worktree {
 431            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 432                self.peer.send(
 433                    conn_id,
 434                    proto::RemoveCollaborator {
 435                        worktree_id,
 436                        peer_id: sender_id.0,
 437                    },
 438                )
 439            })
 440            .await?;
 441            self.update_contacts_for_users(&worktree.authorized_user_ids)
 442                .await?;
 443        }
 444        Ok(())
 445    }
 446
 447    async fn update_worktree(
 448        mut self: Arc<Server>,
 449        request: TypedEnvelope<proto::UpdateWorktree>,
 450    ) -> tide::Result<()> {
 451        let connection_ids = self.state_mut().update_worktree(
 452            request.sender_id,
 453            request.payload.worktree_id,
 454            &request.payload.removed_entries,
 455            &request.payload.updated_entries,
 456        )?;
 457
 458        broadcast(request.sender_id, connection_ids, |connection_id| {
 459            self.peer
 460                .forward_send(request.sender_id, connection_id, request.payload.clone())
 461        })
 462        .await?;
 463
 464        Ok(())
 465    }
 466
 467    async fn open_buffer(
 468        self: Arc<Server>,
 469        request: TypedEnvelope<proto::OpenBuffer>,
 470    ) -> tide::Result<()> {
 471        let receipt = request.receipt();
 472        let host_connection_id = self
 473            .state()
 474            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 475        let response = self
 476            .peer
 477            .forward_request(request.sender_id, host_connection_id, request.payload)
 478            .await?;
 479        self.peer.respond(receipt, response).await?;
 480        Ok(())
 481    }
 482
 483    async fn close_buffer(
 484        self: Arc<Server>,
 485        request: TypedEnvelope<proto::CloseBuffer>,
 486    ) -> tide::Result<()> {
 487        let host_connection_id = self
 488            .state()
 489            .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 490        self.peer
 491            .forward_send(request.sender_id, host_connection_id, request.payload)
 492            .await?;
 493        Ok(())
 494    }
 495
 496    async fn save_buffer(
 497        self: Arc<Server>,
 498        request: TypedEnvelope<proto::SaveBuffer>,
 499    ) -> tide::Result<()> {
 500        let host;
 501        let guests;
 502        {
 503            let state = self.state();
 504            host = state
 505                .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
 506            guests = state
 507                .worktree_guest_connection_ids(request.sender_id, request.payload.worktree_id)?;
 508        }
 509
 510        let sender = request.sender_id;
 511        let receipt = request.receipt();
 512        let response = self
 513            .peer
 514            .forward_request(sender, host, request.payload.clone())
 515            .await?;
 516
 517        broadcast(host, guests, |conn_id| {
 518            let response = response.clone();
 519            let peer = &self.peer;
 520            async move {
 521                if conn_id == sender {
 522                    peer.respond(receipt, response).await
 523                } else {
 524                    peer.forward_send(host, conn_id, response).await
 525                }
 526            }
 527        })
 528        .await?;
 529
 530        Ok(())
 531    }
 532
 533    async fn update_buffer(
 534        self: Arc<Server>,
 535        request: TypedEnvelope<proto::UpdateBuffer>,
 536    ) -> tide::Result<()> {
 537        let receiver_ids = self
 538            .state()
 539            .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
 540        broadcast(request.sender_id, receiver_ids, |connection_id| {
 541            self.peer
 542                .forward_send(request.sender_id, connection_id, request.payload.clone())
 543        })
 544        .await?;
 545        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 546        Ok(())
 547    }
 548
 549    async fn buffer_saved(
 550        self: Arc<Server>,
 551        request: TypedEnvelope<proto::BufferSaved>,
 552    ) -> tide::Result<()> {
 553        let receiver_ids = self
 554            .state()
 555            .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
 556        broadcast(request.sender_id, receiver_ids, |connection_id| {
 557            self.peer
 558                .forward_send(request.sender_id, connection_id, request.payload.clone())
 559        })
 560        .await?;
 561        Ok(())
 562    }
 563
 564    async fn get_channels(
 565        self: Arc<Server>,
 566        request: TypedEnvelope<proto::GetChannels>,
 567    ) -> tide::Result<()> {
 568        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 569        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 570        self.peer
 571            .respond(
 572                request.receipt(),
 573                proto::GetChannelsResponse {
 574                    channels: channels
 575                        .into_iter()
 576                        .map(|chan| proto::Channel {
 577                            id: chan.id.to_proto(),
 578                            name: chan.name,
 579                        })
 580                        .collect(),
 581                },
 582            )
 583            .await?;
 584        Ok(())
 585    }
 586
 587    async fn get_users(
 588        self: Arc<Server>,
 589        request: TypedEnvelope<proto::GetUsers>,
 590    ) -> tide::Result<()> {
 591        let receipt = request.receipt();
 592        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 593        let users = self
 594            .app_state
 595            .db
 596            .get_users_by_ids(user_ids)
 597            .await?
 598            .into_iter()
 599            .map(|user| proto::User {
 600                id: user.id.to_proto(),
 601                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 602                github_login: user.github_login,
 603            })
 604            .collect();
 605        self.peer
 606            .respond(receipt, proto::GetUsersResponse { users })
 607            .await?;
 608        Ok(())
 609    }
 610
 611    async fn update_contacts_for_users<'a>(
 612        self: &Arc<Server>,
 613        user_ids: impl IntoIterator<Item = &'a UserId>,
 614    ) -> tide::Result<()> {
 615        let mut send_futures = Vec::new();
 616
 617        {
 618            let state = self.state();
 619            for user_id in user_ids {
 620                let contacts = state.contacts_for_user(*user_id);
 621                for connection_id in state.connection_ids_for_user(*user_id) {
 622                    send_futures.push(self.peer.send(
 623                        connection_id,
 624                        proto::UpdateContacts {
 625                            contacts: contacts.clone(),
 626                        },
 627                    ));
 628                }
 629            }
 630        }
 631        futures::future::try_join_all(send_futures).await?;
 632
 633        Ok(())
 634    }
 635
 636    async fn join_channel(
 637        mut self: Arc<Self>,
 638        request: TypedEnvelope<proto::JoinChannel>,
 639    ) -> tide::Result<()> {
 640        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 641        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 642        if !self
 643            .app_state
 644            .db
 645            .can_user_access_channel(user_id, channel_id)
 646            .await?
 647        {
 648            Err(anyhow!("access denied"))?;
 649        }
 650
 651        self.state_mut().join_channel(request.sender_id, channel_id);
 652        let messages = self
 653            .app_state
 654            .db
 655            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 656            .await?
 657            .into_iter()
 658            .map(|msg| proto::ChannelMessage {
 659                id: msg.id.to_proto(),
 660                body: msg.body,
 661                timestamp: msg.sent_at.unix_timestamp() as u64,
 662                sender_id: msg.sender_id.to_proto(),
 663                nonce: Some(msg.nonce.as_u128().into()),
 664            })
 665            .collect::<Vec<_>>();
 666        self.peer
 667            .respond(
 668                request.receipt(),
 669                proto::JoinChannelResponse {
 670                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 671                    messages,
 672                },
 673            )
 674            .await?;
 675        Ok(())
 676    }
 677
 678    async fn leave_channel(
 679        mut self: Arc<Self>,
 680        request: TypedEnvelope<proto::LeaveChannel>,
 681    ) -> tide::Result<()> {
 682        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 683        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 684        if !self
 685            .app_state
 686            .db
 687            .can_user_access_channel(user_id, channel_id)
 688            .await?
 689        {
 690            Err(anyhow!("access denied"))?;
 691        }
 692
 693        self.state_mut()
 694            .leave_channel(request.sender_id, channel_id);
 695
 696        Ok(())
 697    }
 698
 699    async fn send_channel_message(
 700        self: Arc<Self>,
 701        request: TypedEnvelope<proto::SendChannelMessage>,
 702    ) -> tide::Result<()> {
 703        let receipt = request.receipt();
 704        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 705        let user_id;
 706        let connection_ids;
 707        {
 708            let state = self.state();
 709            user_id = state.user_id_for_connection(request.sender_id)?;
 710            if let Some(ids) = state.channel_connection_ids(channel_id) {
 711                connection_ids = ids;
 712            } else {
 713                return Ok(());
 714            }
 715        }
 716
 717        // Validate the message body.
 718        let body = request.payload.body.trim().to_string();
 719        if body.len() > MAX_MESSAGE_LEN {
 720            self.peer
 721                .respond_with_error(
 722                    receipt,
 723                    proto::Error {
 724                        message: "message is too long".to_string(),
 725                    },
 726                )
 727                .await?;
 728            return Ok(());
 729        }
 730        if body.is_empty() {
 731            self.peer
 732                .respond_with_error(
 733                    receipt,
 734                    proto::Error {
 735                        message: "message can't be blank".to_string(),
 736                    },
 737                )
 738                .await?;
 739            return Ok(());
 740        }
 741
 742        let timestamp = OffsetDateTime::now_utc();
 743        let nonce = if let Some(nonce) = request.payload.nonce {
 744            nonce
 745        } else {
 746            self.peer
 747                .respond_with_error(
 748                    receipt,
 749                    proto::Error {
 750                        message: "nonce can't be blank".to_string(),
 751                    },
 752                )
 753                .await?;
 754            return Ok(());
 755        };
 756
 757        let message_id = self
 758            .app_state
 759            .db
 760            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 761            .await?
 762            .to_proto();
 763        let message = proto::ChannelMessage {
 764            sender_id: user_id.to_proto(),
 765            id: message_id,
 766            body,
 767            timestamp: timestamp.unix_timestamp() as u64,
 768            nonce: Some(nonce),
 769        };
 770        broadcast(request.sender_id, connection_ids, |conn_id| {
 771            self.peer.send(
 772                conn_id,
 773                proto::ChannelMessageSent {
 774                    channel_id: channel_id.to_proto(),
 775                    message: Some(message.clone()),
 776                },
 777            )
 778        })
 779        .await?;
 780        self.peer
 781            .respond(
 782                receipt,
 783                proto::SendChannelMessageResponse {
 784                    message: Some(message),
 785                },
 786            )
 787            .await?;
 788        Ok(())
 789    }
 790
 791    async fn get_channel_messages(
 792        self: Arc<Self>,
 793        request: TypedEnvelope<proto::GetChannelMessages>,
 794    ) -> tide::Result<()> {
 795        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 796        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 797        if !self
 798            .app_state
 799            .db
 800            .can_user_access_channel(user_id, channel_id)
 801            .await?
 802        {
 803            Err(anyhow!("access denied"))?;
 804        }
 805
 806        let messages = self
 807            .app_state
 808            .db
 809            .get_channel_messages(
 810                channel_id,
 811                MESSAGE_COUNT_PER_PAGE,
 812                Some(MessageId::from_proto(request.payload.before_message_id)),
 813            )
 814            .await?
 815            .into_iter()
 816            .map(|msg| proto::ChannelMessage {
 817                id: msg.id.to_proto(),
 818                body: msg.body,
 819                timestamp: msg.sent_at.unix_timestamp() as u64,
 820                sender_id: msg.sender_id.to_proto(),
 821                nonce: Some(msg.nonce.as_u128().into()),
 822            })
 823            .collect::<Vec<_>>();
 824        self.peer
 825            .respond(
 826                request.receipt(),
 827                proto::GetChannelMessagesResponse {
 828                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 829                    messages,
 830                },
 831            )
 832            .await?;
 833        Ok(())
 834    }
 835
 836    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 837        self.store.read()
 838    }
 839
 840    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 841        self.store.write()
 842    }
 843}
 844
 845pub async fn broadcast<F, T>(
 846    sender_id: ConnectionId,
 847    receiver_ids: Vec<ConnectionId>,
 848    mut f: F,
 849) -> anyhow::Result<()>
 850where
 851    F: FnMut(ConnectionId) -> T,
 852    T: Future<Output = anyhow::Result<()>>,
 853{
 854    let futures = receiver_ids
 855        .into_iter()
 856        .filter(|id| *id != sender_id)
 857        .map(|id| f(id));
 858    futures::future::try_join_all(futures).await?;
 859    Ok(())
 860}
 861
 862pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 863    let server = Server::new(app.state().clone(), rpc.clone(), None);
 864    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 865        let server = server.clone();
 866        async move {
 867            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 868
 869            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 870            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 871            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 872            let client_protocol_version: Option<u32> = request
 873                .header("X-Zed-Protocol-Version")
 874                .and_then(|v| v.as_str().parse().ok());
 875
 876            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 877                return Ok(Response::new(StatusCode::UpgradeRequired));
 878            }
 879
 880            let header = match request.header("Sec-Websocket-Key") {
 881                Some(h) => h.as_str(),
 882                None => return Err(anyhow!("expected sec-websocket-key"))?,
 883            };
 884
 885            let user_id = process_auth_header(&request).await?;
 886
 887            let mut response = Response::new(StatusCode::SwitchingProtocols);
 888            response.insert_header(UPGRADE, "websocket");
 889            response.insert_header(CONNECTION, "Upgrade");
 890            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 891            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 892            response.insert_header("Sec-Websocket-Version", "13");
 893
 894            let http_res: &mut tide::http::Response = response.as_mut();
 895            let upgrade_receiver = http_res.recv_upgrade().await;
 896            let addr = request.remote().unwrap_or("unknown").to_string();
 897            task::spawn(async move {
 898                if let Some(stream) = upgrade_receiver.await {
 899                    server
 900                        .handle_connection(
 901                            Connection::new(
 902                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 903                            ),
 904                            addr,
 905                            user_id,
 906                            None,
 907                        )
 908                        .await;
 909                }
 910            });
 911
 912            Ok(response)
 913        }
 914    });
 915}
 916
 917fn header_contains_ignore_case<T>(
 918    request: &tide::Request<T>,
 919    header_name: HeaderName,
 920    value: &str,
 921) -> bool {
 922    request
 923        .header(header_name)
 924        .map(|h| {
 925            h.as_str()
 926                .split(',')
 927                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 928        })
 929        .unwrap_or(false)
 930}
 931
 932#[cfg(test)]
 933mod tests {
 934    use super::*;
 935    use crate::{
 936        auth,
 937        db::{tests::TestDb, UserId},
 938        github, AppState, Config,
 939    };
 940    use ::rpc::Peer;
 941    use async_std::task;
 942    use gpui::{ModelHandle, TestAppContext};
 943    use parking_lot::Mutex;
 944    use postage::{mpsc, watch};
 945    use rpc::PeerId;
 946    use serde_json::json;
 947    use sqlx::types::time::OffsetDateTime;
 948    use std::{
 949        ops::Deref,
 950        path::Path,
 951        sync::{
 952            atomic::{AtomicBool, Ordering::SeqCst},
 953            Arc,
 954        },
 955        time::Duration,
 956    };
 957    use zed::{
 958        client::{
 959            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
 960            EstablishConnectionError, UserStore,
 961        },
 962        contacts_panel::JoinWorktree,
 963        editor::{Editor, EditorSettings, Input, MultiBuffer},
 964        fs::{FakeFs, Fs as _},
 965        language::{
 966            tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
 967            LanguageRegistry, LanguageServerConfig, Point,
 968        },
 969        lsp,
 970        project::{ProjectPath, Worktree},
 971        test::test_app_state,
 972        workspace::Workspace,
 973    };
 974
 975    #[gpui::test]
 976    async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
 977        let (window_b, _) = cx_b.add_window(|_| EmptyView);
 978        let lang_registry = Arc::new(LanguageRegistry::new());
 979
 980        // Connect to a server as 2 clients.
 981        let mut server = TestServer::start().await;
 982        let client_a = server.create_client(&mut cx_a, "user_a").await;
 983        let client_b = server.create_client(&mut cx_b, "user_b").await;
 984
 985        cx_a.foreground().forbid_parking();
 986
 987        // Share a local worktree as client A
 988        let fs = Arc::new(FakeFs::new());
 989        fs.insert_tree(
 990            "/a",
 991            json!({
 992                ".zed.toml": r#"collaborators = ["user_b"]"#,
 993                "a.txt": "a-contents",
 994                "b.txt": "b-contents",
 995            }),
 996        )
 997        .await;
 998        let worktree_a = Worktree::open_local(
 999            client_a.clone(),
1000            client_a.user_store.clone(),
1001            "/a".as_ref(),
1002            fs,
1003            lang_registry.clone(),
1004            &mut cx_a.to_async(),
1005        )
1006        .await
1007        .unwrap();
1008        worktree_a
1009            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1010            .await;
1011        let worktree_id = worktree_a
1012            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1013            .await
1014            .unwrap();
1015
1016        // Join that worktree as client B, and see that a guest has joined as client A.
1017        let worktree_b = Worktree::open_remote(
1018            client_b.clone(),
1019            worktree_id,
1020            lang_registry.clone(),
1021            client_b.user_store.clone(),
1022            &mut cx_b.to_async(),
1023        )
1024        .await
1025        .unwrap();
1026
1027        let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| {
1028            assert_eq!(
1029                tree.collaborators()
1030                    .get(&client_a.peer_id)
1031                    .unwrap()
1032                    .user
1033                    .github_login,
1034                "user_a"
1035            );
1036            tree.replica_id()
1037        });
1038        worktree_a
1039            .condition(&cx_a, |tree, _| {
1040                tree.collaborators()
1041                    .get(&client_b.peer_id)
1042                    .map_or(false, |collaborator| {
1043                        collaborator.replica_id == replica_id_b
1044                            && collaborator.user.github_login == "user_b"
1045                    })
1046            })
1047            .await;
1048
1049        // Open the same file as client B and client A.
1050        let buffer_b = worktree_b
1051            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1052            .await
1053            .unwrap();
1054        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1055        buffer_b.read_with(&cx_b, |buf, cx| {
1056            assert_eq!(buf.read(cx).text(), "b-contents")
1057        });
1058        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1059        let buffer_a = worktree_a
1060            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1061            .await
1062            .unwrap();
1063
1064        let editor_b = cx_b.add_view(window_b, |cx| {
1065            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1066        });
1067        // TODO
1068        // // Create a selection set as client B and see that selection set as client A.
1069        // buffer_a
1070        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1071        //     .await;
1072
1073        // Edit the buffer as client B and see that edit as client A.
1074        editor_b.update(&mut cx_b, |editor, cx| {
1075            editor.handle_input(&Input("ok, ".into()), cx)
1076        });
1077        buffer_a
1078            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1079            .await;
1080
1081        // TODO
1082        // // Remove the selection set as client B, see those selections disappear as client A.
1083        cx_b.update(move |_| drop(editor_b));
1084        // buffer_a
1085        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1086        //     .await;
1087
1088        // Close the buffer as client A, see that the buffer is closed.
1089        cx_a.update(move |_| drop(buffer_a));
1090        worktree_a
1091            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1092            .await;
1093
1094        // Dropping the worktree removes client B from client A's collaborators.
1095        cx_b.update(move |_| drop(worktree_b));
1096        worktree_a
1097            .condition(&cx_a, |tree, _| tree.collaborators().is_empty())
1098            .await;
1099    }
1100
1101    #[gpui::test]
1102    async fn test_unshare_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1103        cx_b.update(zed::contacts_panel::init);
1104        let mut app_state_a = cx_a.update(test_app_state);
1105        let mut app_state_b = cx_b.update(test_app_state);
1106
1107        // Connect to a server as 2 clients.
1108        let mut server = TestServer::start().await;
1109        let client_a = server.create_client(&mut cx_a, "user_a").await;
1110        let client_b = server.create_client(&mut cx_b, "user_b").await;
1111        Arc::get_mut(&mut app_state_a).unwrap().client = client_a.clone();
1112        Arc::get_mut(&mut app_state_a).unwrap().user_store = client_a.user_store.clone();
1113        Arc::get_mut(&mut app_state_b).unwrap().client = client_b.clone();
1114        Arc::get_mut(&mut app_state_b).unwrap().user_store = client_b.user_store.clone();
1115
1116        cx_a.foreground().forbid_parking();
1117
1118        // Share a local worktree as client A
1119        let fs = Arc::new(FakeFs::new());
1120        fs.insert_tree(
1121            "/a",
1122            json!({
1123                ".zed.toml": r#"collaborators = ["user_b"]"#,
1124                "a.txt": "a-contents",
1125                "b.txt": "b-contents",
1126            }),
1127        )
1128        .await;
1129        let worktree_a = Worktree::open_local(
1130            app_state_a.client.clone(),
1131            app_state_a.user_store.clone(),
1132            "/a".as_ref(),
1133            fs,
1134            app_state_a.languages.clone(),
1135            &mut cx_a.to_async(),
1136        )
1137        .await
1138        .unwrap();
1139        worktree_a
1140            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1141            .await;
1142
1143        let remote_worktree_id = worktree_a
1144            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1145            .await
1146            .unwrap();
1147
1148        let (window_b, workspace_b) =
1149            cx_b.add_window(|cx| Workspace::new(&app_state_b.as_ref().into(), cx));
1150        cx_b.update(|cx| {
1151            cx.dispatch_action(
1152                window_b,
1153                vec![workspace_b.id()],
1154                &JoinWorktree(remote_worktree_id),
1155            );
1156        });
1157        workspace_b
1158            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 1)
1159            .await;
1160
1161        let local_worktree_id_b = workspace_b.read_with(&cx_b, |workspace, cx| {
1162            let active_pane = workspace.active_pane().read(cx);
1163            assert!(active_pane.active_item().is_none());
1164            workspace.worktrees(cx).first().unwrap().id()
1165        });
1166        workspace_b
1167            .update(&mut cx_b, |workspace, cx| {
1168                workspace.open_entry(
1169                    ProjectPath {
1170                        worktree_id: local_worktree_id_b,
1171                        path: Path::new("a.txt").into(),
1172                    },
1173                    cx,
1174                )
1175            })
1176            .unwrap()
1177            .await;
1178        workspace_b.read_with(&cx_b, |workspace, cx| {
1179            let active_pane = workspace.active_pane().read(cx);
1180            assert!(active_pane.active_item().is_some());
1181        });
1182
1183        worktree_a.update(&mut cx_a, |tree, cx| {
1184            tree.as_local_mut().unwrap().unshare(cx);
1185        });
1186        workspace_b
1187            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 0)
1188            .await;
1189        workspace_b.read_with(&cx_b, |workspace, cx| {
1190            let active_pane = workspace.active_pane().read(cx);
1191            assert!(active_pane.active_item().is_none());
1192        });
1193    }
1194
1195    #[gpui::test]
1196    async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
1197        mut cx_a: TestAppContext,
1198        mut cx_b: TestAppContext,
1199        mut cx_c: TestAppContext,
1200    ) {
1201        cx_a.foreground().forbid_parking();
1202        let lang_registry = Arc::new(LanguageRegistry::new());
1203
1204        // Connect to a server as 3 clients.
1205        let mut server = TestServer::start().await;
1206        let client_a = server.create_client(&mut cx_a, "user_a").await;
1207        let client_b = server.create_client(&mut cx_b, "user_b").await;
1208        let client_c = server.create_client(&mut cx_c, "user_c").await;
1209
1210        let fs = Arc::new(FakeFs::new());
1211
1212        // Share a worktree as client A.
1213        fs.insert_tree(
1214            "/a",
1215            json!({
1216                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1217                "file1": "",
1218                "file2": ""
1219            }),
1220        )
1221        .await;
1222
1223        let worktree_a = Worktree::open_local(
1224            client_a.clone(),
1225            client_a.user_store.clone(),
1226            "/a".as_ref(),
1227            fs.clone(),
1228            lang_registry.clone(),
1229            &mut cx_a.to_async(),
1230        )
1231        .await
1232        .unwrap();
1233        worktree_a
1234            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1235            .await;
1236        let worktree_id = worktree_a
1237            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1238            .await
1239            .unwrap();
1240
1241        // Join that worktree as clients B and C.
1242        let worktree_b = Worktree::open_remote(
1243            client_b.clone(),
1244            worktree_id,
1245            lang_registry.clone(),
1246            client_b.user_store.clone(),
1247            &mut cx_b.to_async(),
1248        )
1249        .await
1250        .unwrap();
1251        let worktree_c = Worktree::open_remote(
1252            client_c.clone(),
1253            worktree_id,
1254            lang_registry.clone(),
1255            client_c.user_store.clone(),
1256            &mut cx_c.to_async(),
1257        )
1258        .await
1259        .unwrap();
1260
1261        // Open and edit a buffer as both guests B and C.
1262        let buffer_b = worktree_b
1263            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1264            .await
1265            .unwrap();
1266        let buffer_c = worktree_c
1267            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1268            .await
1269            .unwrap();
1270        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1271        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1272
1273        // Open and edit that buffer as the host.
1274        let buffer_a = worktree_a
1275            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1276            .await
1277            .unwrap();
1278
1279        buffer_a
1280            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1281            .await;
1282        buffer_a.update(&mut cx_a, |buf, cx| {
1283            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1284        });
1285
1286        // Wait for edits to propagate
1287        buffer_a
1288            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1289            .await;
1290        buffer_b
1291            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1292            .await;
1293        buffer_c
1294            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1295            .await;
1296
1297        // Edit the buffer as the host and concurrently save as guest B.
1298        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1299        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1300        save_b.await.unwrap();
1301        assert_eq!(
1302            fs.load("/a/file1".as_ref()).await.unwrap(),
1303            "hi-a, i-am-c, i-am-b, i-am-a"
1304        );
1305        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1306        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1307        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1308
1309        // Make changes on host's file system, see those changes on the guests.
1310        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1311            .await
1312            .unwrap();
1313        fs.insert_file(Path::new("/a/file4"), "4".into())
1314            .await
1315            .unwrap();
1316
1317        worktree_b
1318            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1319            .await;
1320        worktree_c
1321            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1322            .await;
1323        worktree_b.read_with(&cx_b, |tree, _| {
1324            assert_eq!(
1325                tree.paths()
1326                    .map(|p| p.to_string_lossy())
1327                    .collect::<Vec<_>>(),
1328                &[".zed.toml", "file1", "file3", "file4"]
1329            )
1330        });
1331        worktree_c.read_with(&cx_c, |tree, _| {
1332            assert_eq!(
1333                tree.paths()
1334                    .map(|p| p.to_string_lossy())
1335                    .collect::<Vec<_>>(),
1336                &[".zed.toml", "file1", "file3", "file4"]
1337            )
1338        });
1339    }
1340
1341    #[gpui::test]
1342    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1343        cx_a.foreground().forbid_parking();
1344        let lang_registry = Arc::new(LanguageRegistry::new());
1345
1346        // Connect to a server as 2 clients.
1347        let mut server = TestServer::start().await;
1348        let client_a = server.create_client(&mut cx_a, "user_a").await;
1349        let client_b = server.create_client(&mut cx_b, "user_b").await;
1350
1351        // Share a local worktree as client A
1352        let fs = Arc::new(FakeFs::new());
1353        fs.insert_tree(
1354            "/dir",
1355            json!({
1356                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1357                "a.txt": "a-contents",
1358            }),
1359        )
1360        .await;
1361
1362        let worktree_a = Worktree::open_local(
1363            client_a.clone(),
1364            client_a.user_store.clone(),
1365            "/dir".as_ref(),
1366            fs,
1367            lang_registry.clone(),
1368            &mut cx_a.to_async(),
1369        )
1370        .await
1371        .unwrap();
1372        worktree_a
1373            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1374            .await;
1375        let worktree_id = worktree_a
1376            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1377            .await
1378            .unwrap();
1379
1380        // Join that worktree as client B, and see that a guest has joined as client A.
1381        let worktree_b = Worktree::open_remote(
1382            client_b.clone(),
1383            worktree_id,
1384            lang_registry.clone(),
1385            client_b.user_store.clone(),
1386            &mut cx_b.to_async(),
1387        )
1388        .await
1389        .unwrap();
1390
1391        let buffer_b = worktree_b
1392            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1393            .await
1394            .unwrap();
1395        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1396
1397        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1398        buffer_b.read_with(&cx_b, |buf, _| {
1399            assert!(buf.is_dirty());
1400            assert!(!buf.has_conflict());
1401        });
1402
1403        buffer_b
1404            .update(&mut cx_b, |buf, cx| buf.save(cx))
1405            .unwrap()
1406            .await
1407            .unwrap();
1408        worktree_b
1409            .condition(&cx_b, |_, cx| {
1410                buffer_b.read(cx).file().unwrap().mtime() != mtime
1411            })
1412            .await;
1413        buffer_b.read_with(&cx_b, |buf, _| {
1414            assert!(!buf.is_dirty());
1415            assert!(!buf.has_conflict());
1416        });
1417
1418        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1419        buffer_b.read_with(&cx_b, |buf, _| {
1420            assert!(buf.is_dirty());
1421            assert!(!buf.has_conflict());
1422        });
1423    }
1424
1425    #[gpui::test]
1426    async fn test_editing_while_guest_opens_buffer(
1427        mut cx_a: TestAppContext,
1428        mut cx_b: TestAppContext,
1429    ) {
1430        cx_a.foreground().forbid_parking();
1431        let lang_registry = Arc::new(LanguageRegistry::new());
1432
1433        // Connect to a server as 2 clients.
1434        let mut server = TestServer::start().await;
1435        let client_a = server.create_client(&mut cx_a, "user_a").await;
1436        let client_b = server.create_client(&mut cx_b, "user_b").await;
1437
1438        // Share a local worktree as client A
1439        let fs = Arc::new(FakeFs::new());
1440        fs.insert_tree(
1441            "/dir",
1442            json!({
1443                ".zed.toml": r#"collaborators = ["user_b"]"#,
1444                "a.txt": "a-contents",
1445            }),
1446        )
1447        .await;
1448        let worktree_a = Worktree::open_local(
1449            client_a.clone(),
1450            client_a.user_store.clone(),
1451            "/dir".as_ref(),
1452            fs,
1453            lang_registry.clone(),
1454            &mut cx_a.to_async(),
1455        )
1456        .await
1457        .unwrap();
1458        worktree_a
1459            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1460            .await;
1461        let worktree_id = worktree_a
1462            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1463            .await
1464            .unwrap();
1465
1466        // Join that worktree as client B, and see that a guest has joined as client A.
1467        let worktree_b = Worktree::open_remote(
1468            client_b.clone(),
1469            worktree_id,
1470            lang_registry.clone(),
1471            client_b.user_store.clone(),
1472            &mut cx_b.to_async(),
1473        )
1474        .await
1475        .unwrap();
1476
1477        let buffer_a = worktree_a
1478            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1479            .await
1480            .unwrap();
1481        let buffer_b = cx_b
1482            .background()
1483            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1484
1485        task::yield_now().await;
1486        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1487
1488        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1489        let buffer_b = buffer_b.await.unwrap();
1490        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1491    }
1492
1493    #[gpui::test]
1494    async fn test_leaving_worktree_while_opening_buffer(
1495        mut cx_a: TestAppContext,
1496        mut cx_b: TestAppContext,
1497    ) {
1498        cx_a.foreground().forbid_parking();
1499        let lang_registry = Arc::new(LanguageRegistry::new());
1500
1501        // Connect to a server as 2 clients.
1502        let mut server = TestServer::start().await;
1503        let client_a = server.create_client(&mut cx_a, "user_a").await;
1504        let client_b = server.create_client(&mut cx_b, "user_b").await;
1505
1506        // Share a local worktree as client A
1507        let fs = Arc::new(FakeFs::new());
1508        fs.insert_tree(
1509            "/dir",
1510            json!({
1511                ".zed.toml": r#"collaborators = ["user_b"]"#,
1512                "a.txt": "a-contents",
1513            }),
1514        )
1515        .await;
1516        let worktree_a = Worktree::open_local(
1517            client_a.clone(),
1518            client_a.user_store.clone(),
1519            "/dir".as_ref(),
1520            fs,
1521            lang_registry.clone(),
1522            &mut cx_a.to_async(),
1523        )
1524        .await
1525        .unwrap();
1526        worktree_a
1527            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1528            .await;
1529        let worktree_id = worktree_a
1530            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1531            .await
1532            .unwrap();
1533
1534        // Join that worktree as client B, and see that a guest has joined as client A.
1535        let worktree_b = Worktree::open_remote(
1536            client_b.clone(),
1537            worktree_id,
1538            lang_registry.clone(),
1539            client_b.user_store.clone(),
1540            &mut cx_b.to_async(),
1541        )
1542        .await
1543        .unwrap();
1544        worktree_a
1545            .condition(&cx_a, |tree, _| tree.collaborators().len() == 1)
1546            .await;
1547
1548        let buffer_b = cx_b
1549            .background()
1550            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1551        cx_b.update(|_| drop(worktree_b));
1552        drop(buffer_b);
1553        worktree_a
1554            .condition(&cx_a, |tree, _| tree.collaborators().len() == 0)
1555            .await;
1556    }
1557
1558    #[gpui::test]
1559    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1560        cx_a.foreground().forbid_parking();
1561        let lang_registry = Arc::new(LanguageRegistry::new());
1562
1563        // Connect to a server as 2 clients.
1564        let mut server = TestServer::start().await;
1565        let client_a = server.create_client(&mut cx_a, "user_a").await;
1566        let client_b = server.create_client(&mut cx_b, "user_b").await;
1567
1568        // Share a local worktree as client A
1569        let fs = Arc::new(FakeFs::new());
1570        fs.insert_tree(
1571            "/a",
1572            json!({
1573                ".zed.toml": r#"collaborators = ["user_b"]"#,
1574                "a.txt": "a-contents",
1575                "b.txt": "b-contents",
1576            }),
1577        )
1578        .await;
1579        let worktree_a = Worktree::open_local(
1580            client_a.clone(),
1581            client_a.user_store.clone(),
1582            "/a".as_ref(),
1583            fs,
1584            lang_registry.clone(),
1585            &mut cx_a.to_async(),
1586        )
1587        .await
1588        .unwrap();
1589        worktree_a
1590            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1591            .await;
1592        let worktree_id = worktree_a
1593            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1594            .await
1595            .unwrap();
1596
1597        // Join that worktree as client B, and see that a guest has joined as client A.
1598        let _worktree_b = Worktree::open_remote(
1599            client_b.clone(),
1600            worktree_id,
1601            lang_registry.clone(),
1602            client_b.user_store.clone(),
1603            &mut cx_b.to_async(),
1604        )
1605        .await
1606        .unwrap();
1607        worktree_a
1608            .condition(&cx_a, |tree, _| tree.collaborators().len() == 1)
1609            .await;
1610
1611        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1612        client_b.disconnect(&cx_b.to_async()).await.unwrap();
1613        worktree_a
1614            .condition(&cx_a, |tree, _| tree.collaborators().len() == 0)
1615            .await;
1616    }
1617
1618    #[gpui::test]
        // Verifies that diagnostics published by client A's fake language server before client B
        // joins the shared worktree are present in the buffer client B then opens remotely.
1619    async fn test_collaborating_with_diagnostics(
1620        mut cx_a: TestAppContext,
1621        mut cx_b: TestAppContext,
1622    ) {
1623        cx_a.foreground().forbid_parking();
            // Register a "Rust" language for `.rs` paths, configured with the fake language server.
1624        let (language_server_config, mut fake_language_server) =
1625            LanguageServerConfig::fake(cx_a.background()).await;
1626        let mut lang_registry = LanguageRegistry::new();
1627        lang_registry.add(Arc::new(Language::new(
1628            LanguageConfig {
1629                name: "Rust".to_string(),
1630                path_suffixes: vec!["rs".to_string()],
1631                language_server: Some(language_server_config),
1632                ..Default::default()
1633            },
1634            Some(tree_sitter_rust::language()),
1635        )));
1636
1637        let lang_registry = Arc::new(lang_registry);
1638
1639        // Connect to a server as 2 clients.
1640        let mut server = TestServer::start().await;
1641        let client_a = server.create_client(&mut cx_a, "user_a").await;
1642        let client_b = server.create_client(&mut cx_b, "user_b").await;
1643
1644        // Share a local worktree as client A
1645        let fs = Arc::new(FakeFs::new());
1646        fs.insert_tree(
1647            "/a",
1648            json!({
1649                ".zed.toml": r#"collaborators = ["user_b"]"#,
1650                "a.rs": "let one = two",
1651                "other.rs": "",
1652            }),
1653        )
1654        .await;
1655        let worktree_a = Worktree::open_local(
1656            client_a.clone(),
1657            client_a.user_store.clone(),
1658            "/a".as_ref(),
1659            fs,
1660            lang_registry.clone(),
1661            &mut cx_a.to_async(),
1662        )
1663        .await
1664        .unwrap();
            // Wait for the local scan to complete, then share the worktree to obtain the id that
            // client B uses to join it.
1665        worktree_a
1666            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1667            .await;
1668        let worktree_id = worktree_a
1669            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1670            .await
1671            .unwrap();
1672
1673        // Cause language server to start.
1674        let _ = cx_a
1675            .background()
1676            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1677                worktree.open_buffer("other.rs", cx)
1678            }))
1679            .await
1680            .unwrap();
1681
1682        // Simulate a language server reporting errors for a file.
1683        fake_language_server
1684            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1685                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1686                version: None,
1687                diagnostics: vec![
1688                    lsp::Diagnostic {
1689                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1690                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1691                        message: "message 1".to_string(),
1692                        ..Default::default()
1693                    },
1694                    lsp::Diagnostic {
1695                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1696                        range: lsp::Range::new(
1697                            lsp::Position::new(0, 10),
1698                            lsp::Position::new(0, 13),
1699                        ),
1700                        message: "message 2".to_string(),
1701                        ..Default::default()
1702                    },
1703                ],
1704            })
1705            .await;
1706
1707        // Join the worktree as client B.
1708        let worktree_b = Worktree::open_remote(
1709            client_b.clone(),
1710            worktree_id,
1711            lang_registry.clone(),
1712            client_b.user_store.clone(),
1713            &mut cx_b.to_async(),
1714        )
1715        .await
1716        .unwrap();
1717
1718        // Open the file with the errors.
1719        let buffer_b = cx_b
1720            .background()
1721            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1722            .await
1723            .unwrap();
1724
            // Both published diagnostics must be present in client B's buffer, with ranges equal to
            // the LSP positions sent above, each marked primary and placed in its own group.
1725        buffer_b.read_with(&cx_b, |buffer, _| {
1726            assert_eq!(
1727                buffer
1728                    .snapshot()
1729                    .diagnostics_in_range::<_, Point>(0..buffer.len())
1730                    .collect::<Vec<_>>(),
1731                &[
1732                    DiagnosticEntry {
1733                        range: Point::new(0, 4)..Point::new(0, 7),
1734                        diagnostic: Diagnostic {
1735                            group_id: 0,
1736                            message: "message 1".to_string(),
1737                            severity: lsp::DiagnosticSeverity::ERROR,
1738                            is_primary: true,
1739                            ..Default::default()
1740                        }
1741                    },
1742                    DiagnosticEntry {
1743                        range: Point::new(0, 10)..Point::new(0, 13),
1744                        diagnostic: Diagnostic {
1745                            group_id: 1,
1746                            severity: lsp::DiagnosticSeverity::WARNING,
1747                            message: "message 2".to_string(),
1748                            is_primary: true,
1749                            ..Default::default()
1750                        }
1751                    }
1752                ]
1753            );
1754        });
1755    }
1756
1757    #[gpui::test]
        // End-to-end chat test with two clients: both load a pre-seeded message, client A sends
        // two messages that client B receives, and the server forgets the channel once both
        // clients have dropped their channel handles.
1758    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1759        cx_a.foreground().forbid_parking();
1760
1761        // Connect to a server as 2 clients.
1762        let mut server = TestServer::start().await;
1763        let client_a = server.create_client(&mut cx_a, "user_a").await;
1764        let client_b = server.create_client(&mut cx_b, "user_b").await;
1765
1766        // Create an org that includes these 2 users.
1767        let db = &server.app_state.db;
1768        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1769        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1770            .await
1771            .unwrap();
1772        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1773            .await
1774            .unwrap();
1775
1776        // Create a channel that includes all the users.
1777        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1778        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1779            .await
1780            .unwrap();
1781        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1782            .await
1783            .unwrap();
            // Seed the channel with one message from user B, written directly to the database.
1784        db.create_channel_message(
1785            channel_id,
1786            client_b.current_user_id(&cx_b),
1787            "hello A, it's B.",
1788            OffsetDateTime::now_utc(),
1789            1,
1790        )
1791        .await
1792        .unwrap();
1793
1794        let channels_a = cx_a
1795            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1796        channels_a
1797            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1798            .await;
1799        channels_a.read_with(&cx_a, |list, _| {
1800            assert_eq!(
1801                list.available_channels().unwrap(),
1802                &[ChannelDetails {
1803                    id: channel_id.to_proto(),
1804                    name: "test-channel".to_string()
1805                }]
1806            )
1807        });
1808        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1809            this.get_channel(channel_id.to_proto(), cx).unwrap()
1810        });
            // A channel handle starts out with no messages; the seeded message arrives afterwards.
            // The bool in each expected tuple is the message's pending flag (see `channel_messages`).
1811        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1812        channel_a
1813            .condition(&cx_a, |channel, _| {
1814                channel_messages(channel)
1815                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1816            })
1817            .await;
1818
1819        let channels_b = cx_b
1820            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1821        channels_b
1822            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1823            .await;
1824        channels_b.read_with(&cx_b, |list, _| {
1825            assert_eq!(
1826                list.available_channels().unwrap(),
1827                &[ChannelDetails {
1828                    id: channel_id.to_proto(),
1829                    name: "test-channel".to_string()
1830                }]
1831            )
1832        });
1833
1834        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1835            this.get_channel(channel_id.to_proto(), cx).unwrap()
1836        });
1837        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1838        channel_b
1839            .condition(&cx_b, |channel, _| {
1840                channel_messages(channel)
1841                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1842            })
1843            .await;
1844
            // Send two messages from A. Both are listed in A's channel right away, flagged as
            // pending, before the second send task is awaited; the first send task is detached.
1845        channel_a
1846            .update(&mut cx_a, |channel, cx| {
1847                channel
1848                    .send_message("oh, hi B.".to_string(), cx)
1849                    .unwrap()
1850                    .detach();
1851                let task = channel.send_message("sup".to_string(), cx).unwrap();
1852                assert_eq!(
1853                    channel_messages(channel),
1854                    &[
1855                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1856                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
1857                        ("user_a".to_string(), "sup".to_string(), true)
1858                    ]
1859                );
1860                task
1861            })
1862            .await
1863            .unwrap();
1864
            // B eventually sees both of A's messages, not flagged as pending.
1865        channel_b
1866            .condition(&cx_b, |channel, _| {
1867                channel_messages(channel)
1868                    == [
1869                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1870                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
1871                        ("user_a".to_string(), "sup".to_string(), false),
1872                    ]
1873            })
1874            .await;
1875
            // The server's store lists two connections for the channel while both clients hold it,
            // one after B drops its handle, and no channel entry at all after A drops its handle.
1876        assert_eq!(
1877            server
1878                .state()
1879                .await
1880                .channel(channel_id)
1881                .unwrap()
1882                .connection_ids
1883                .len(),
1884            2
1885        );
1886        cx_b.update(|_| drop(channel_b));
1887        server
1888            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1889            .await;
1890
1891        cx_a.update(|_| drop(channel_a));
1892        server
1893            .condition(|state| state.channel(channel_id).is_none())
1894            .await;
1895    }
1896
1897    #[gpui::test]
1898    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
1899        cx_a.foreground().forbid_parking();
1900
1901        let mut server = TestServer::start().await;
1902        let client_a = server.create_client(&mut cx_a, "user_a").await;
1903
1904        let db = &server.app_state.db;
1905        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1906        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1907        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1908            .await
1909            .unwrap();
1910        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1911            .await
1912            .unwrap();
1913
1914        let channels_a = cx_a
1915            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1916        channels_a
1917            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1918            .await;
1919        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1920            this.get_channel(channel_id.to_proto(), cx).unwrap()
1921        });
1922
1923        // Messages aren't allowed to be too long.
1924        channel_a
1925            .update(&mut cx_a, |channel, cx| {
1926                let long_body = "this is long.\n".repeat(1024);
1927                channel.send_message(long_body, cx).unwrap()
1928            })
1929            .await
1930            .unwrap_err();
1931
1932        // Messages aren't allowed to be blank.
1933        channel_a.update(&mut cx_a, |channel, cx| {
1934            channel.send_message(String::new(), cx).unwrap_err()
1935        });
1936
1937        // Leading and trailing whitespace are trimmed.
1938        channel_a
1939            .update(&mut cx_a, |channel, cx| {
1940                channel
1941                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
1942                    .unwrap()
1943            })
1944            .await
1945            .unwrap();
1946        assert_eq!(
1947            db.get_channel_messages(channel_id, 10, None)
1948                .await
1949                .unwrap()
1950                .iter()
1951                .map(|m| &m.body)
1952                .collect::<Vec<_>>(),
1953            &["surrounded by whitespace"]
1954        );
1955    }
1956
1957    #[gpui::test]
        // Verifies chat behaviour across a dropped connection: client B keeps its cached channel
        // data while offline, a message B sends while offline fails but is delivered after
        // reconnecting, and B catches up on messages A sent in the meantime.
1958    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1959        cx_a.foreground().forbid_parking();
1960
1961        // Connect to a server as 2 clients.
1962        let mut server = TestServer::start().await;
1963        let client_a = server.create_client(&mut cx_a, "user_a").await;
1964        let client_b = server.create_client(&mut cx_b, "user_b").await;
            // Watch B's connection status so the test can wait for a failed reconnection below.
1965        let mut status_b = client_b.status();
1966
1967        // Create an org that includes these 2 users.
1968        let db = &server.app_state.db;
1969        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1970        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1971            .await
1972            .unwrap();
1973        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1974            .await
1975            .unwrap();
1976
1977        // Create a channel that includes all the users.
1978        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1979        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1980            .await
1981            .unwrap();
1982        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1983            .await
1984            .unwrap();
            // Seed the channel with one message from user B, written directly to the database.
1985        db.create_channel_message(
1986            channel_id,
1987            client_b.current_user_id(&cx_b),
1988            "hello A, it's B.",
1989            OffsetDateTime::now_utc(),
1990            2,
1991        )
1992        .await
1993        .unwrap();
1994
1995        let channels_a = cx_a
1996            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1997        channels_a
1998            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1999            .await;
2000
2001        channels_a.read_with(&cx_a, |list, _| {
2002            assert_eq!(
2003                list.available_channels().unwrap(),
2004                &[ChannelDetails {
2005                    id: channel_id.to_proto(),
2006                    name: "test-channel".to_string()
2007                }]
2008            )
2009        });
2010        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2011            this.get_channel(channel_id.to_proto(), cx).unwrap()
2012        });
            // The bool in each expected tuple is the message's pending flag (see `channel_messages`).
2013        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2014        channel_a
2015            .condition(&cx_a, |channel, _| {
2016                channel_messages(channel)
2017                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2018            })
2019            .await;
2020
2021        let channels_b = cx_b
2022            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2023        channels_b
2024            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2025            .await;
2026        channels_b.read_with(&cx_b, |list, _| {
2027            assert_eq!(
2028                list.available_channels().unwrap(),
2029                &[ChannelDetails {
2030                    id: channel_id.to_proto(),
2031                    name: "test-channel".to_string()
2032                }]
2033            )
2034        });
2035
2036        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2037            this.get_channel(channel_id.to_proto(), cx).unwrap()
2038        });
2039        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2040        channel_b
2041            .condition(&cx_b, |channel, _| {
2042                channel_messages(channel)
2043                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2044            })
2045            .await;
2046
2047        // Disconnect client B, ensuring we can still access its cached channel data.
2048        server.forbid_connections();
2049        server.disconnect_client(client_b.current_user_id(&cx_b));
            // Consume status updates until B reports a reconnection error.
            // NOTE(review): only `Some(ReconnectionError)` ends this loop; if the status stream
            // ever yielded `None` the loop would keep polling forever.
2050        while !matches!(
2051            status_b.recv().await,
2052            Some(client::Status::ReconnectionError { .. })
2053        ) {}
2054
2055        channels_b.read_with(&cx_b, |channels, _| {
2056            assert_eq!(
2057                channels.available_channels().unwrap(),
2058                [ChannelDetails {
2059                    id: channel_id.to_proto(),
2060                    name: "test-channel".to_string()
2061                }]
2062            )
2063        });
2064        channel_b.read_with(&cx_b, |channel, _| {
2065            assert_eq!(
2066                channel_messages(channel),
2067                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2068            )
2069        });
2070
2071        // Send a message from client B while it is disconnected.
            // The message is listed locally as pending, but the send task itself must fail.
2072        channel_b
2073            .update(&mut cx_b, |channel, cx| {
2074                let task = channel
2075                    .send_message("can you see this?".to_string(), cx)
2076                    .unwrap();
2077                assert_eq!(
2078                    channel_messages(channel),
2079                    &[
2080                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2081                        ("user_b".to_string(), "can you see this?".to_string(), true)
2082                    ]
2083                );
2084                task
2085            })
2086            .await
2087            .unwrap_err();
2088
2089        // Send a message from client A while B is disconnected.
2090        channel_a
2091            .update(&mut cx_a, |channel, cx| {
2092                channel
2093                    .send_message("oh, hi B.".to_string(), cx)
2094                    .unwrap()
2095                    .detach();
2096                let task = channel.send_message("sup".to_string(), cx).unwrap();
2097                assert_eq!(
2098                    channel_messages(channel),
2099                    &[
2100                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2101                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2102                        ("user_a".to_string(), "sup".to_string(), true)
2103                    ]
2104                );
2105                task
2106            })
2107            .await
2108            .unwrap();
2109
2110        // Give client B a chance to reconnect.
2111        server.allow_connections();
            // Advances B's foreground clock by 10 seconds; presumably this exceeds the client's
            // reconnection delay so a new attempt is made. TODO confirm against the client.
2112        cx_b.foreground().advance_clock(Duration::from_secs(10));
2113
2114        // Verify that B sees the new messages upon reconnection, as well as the message client B
2115        // sent while offline.
2116        channel_b
2117            .condition(&cx_b, |channel, _| {
2118                channel_messages(channel)
2119                    == [
2120                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2121                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2122                        ("user_a".to_string(), "sup".to_string(), false),
2123                        ("user_b".to_string(), "can you see this?".to_string(), false),
2124                    ]
2125            })
2126            .await;
2127
2128        // Ensure client A and B can communicate normally after reconnection.
2129        channel_a
2130            .update(&mut cx_a, |channel, cx| {
2131                channel.send_message("you online?".to_string(), cx).unwrap()
2132            })
2133            .await
2134            .unwrap();
2135        channel_b
2136            .condition(&cx_b, |channel, _| {
2137                channel_messages(channel)
2138                    == [
2139                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2140                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2141                        ("user_a".to_string(), "sup".to_string(), false),
2142                        ("user_b".to_string(), "can you see this?".to_string(), false),
2143                        ("user_a".to_string(), "you online?".to_string(), false),
2144                    ]
2145            })
2146            .await;
2147
2148        channel_b
2149            .update(&mut cx_b, |channel, cx| {
2150                channel.send_message("yep".to_string(), cx).unwrap()
2151            })
2152            .await
2153            .unwrap();
2154        channel_a
2155            .condition(&cx_a, |channel, _| {
2156                channel_messages(channel)
2157                    == [
2158                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2159                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2160                        ("user_a".to_string(), "sup".to_string(), false),
2161                        ("user_b".to_string(), "can you see this?".to_string(), false),
2162                        ("user_a".to_string(), "you online?".to_string(), false),
2163                        ("user_b".to_string(), "yep".to_string(), false),
2164                    ]
2165            })
2166            .await;
2167    }
2168
2169    #[gpui::test]
        // Verifies that the contacts seen by three clients track the lifecycle of a worktree
        // owned by user A: listed with no guests once opened, listing user B as a guest after B
        // joins, and removed for everyone once A drops the worktree.
2170    async fn test_contacts(
2171        mut cx_a: TestAppContext,
2172        mut cx_b: TestAppContext,
2173        mut cx_c: TestAppContext,
2174    ) {
2175        cx_a.foreground().forbid_parking();
2176        let lang_registry = Arc::new(LanguageRegistry::new());
2177
2178        // Connect to a server as 3 clients.
2179        let mut server = TestServer::start().await;
2180        let client_a = server.create_client(&mut cx_a, "user_a").await;
2181        let client_b = server.create_client(&mut cx_b, "user_b").await;
2182        let client_c = server.create_client(&mut cx_c, "user_c").await;
2183
2184        let fs = Arc::new(FakeFs::new());
2185
2186        // Share a worktree as client A.
2187        fs.insert_tree(
2188            "/a",
2189            json!({
2190                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2191            }),
2192        )
2193        .await;
2194
2195        let worktree_a = Worktree::open_local(
2196            client_a.clone(),
2197            client_a.user_store.clone(),
2198            "/a".as_ref(),
2199            fs.clone(),
2200            lang_registry.clone(),
2201            &mut cx_a.to_async(),
2202        )
2203        .await
2204        .unwrap();
2205
            // Before `share` is called, all three clients already list user_a's worktree "a" with
            // no guests.
2206        client_a
2207            .user_store
2208            .condition(&cx_a, |user_store, _| {
2209                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2210            })
2211            .await;
2212        client_b
2213            .user_store
2214            .condition(&cx_b, |user_store, _| {
2215                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2216            })
2217            .await;
2218        client_c
2219            .user_store
2220            .condition(&cx_c, |user_store, _| {
2221                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2222            })
2223            .await;
2224
2225        let worktree_id = worktree_a
2226            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
2227            .await
2228            .unwrap();
2229
2230        let _worktree_b = Worktree::open_remote(
2231            client_b.clone(),
2232            worktree_id,
2233            lang_registry.clone(),
2234            client_b.user_store.clone(),
2235            &mut cx_b.to_async(),
2236        )
2237        .await
2238        .unwrap();
2239
            // After B joins, all three clients see user_b listed as a guest of worktree "a".
2240        client_a
2241            .user_store
2242            .condition(&cx_a, |user_store, _| {
2243                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2244            })
2245            .await;
2246        client_b
2247            .user_store
2248            .condition(&cx_b, |user_store, _| {
2249                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2250            })
2251            .await;
2252        client_c
2253            .user_store
2254            .condition(&cx_c, |user_store, _| {
2255                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2256            })
2257            .await;
2258
            // Wait until A's worktree lists B's peer id as a collaborator before dropping it.
2259        worktree_a
2260            .condition(&cx_a, |worktree, _| {
2261                worktree.collaborators().contains_key(&client_b.peer_id)
2262            })
2263            .await;
2264
            // Dropping A's worktree handle empties the contact list on every client.
2265        cx_a.update(move |_| drop(worktree_a));
2266        client_a
2267            .user_store
2268            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2269            .await;
2270        client_b
2271            .user_store
2272            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2273            .await;
2274        client_c
2275            .user_store
2276            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2277            .await;
2278
            // Flattens the store's contacts into
            // `(contact github login, [(worktree root name, [guest github logins])])`
            // so they can be compared against literal expectations.
2279        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2280            user_store
2281                .contacts()
2282                .iter()
2283                .map(|contact| {
2284                    let worktrees = contact
2285                        .worktrees
2286                        .iter()
2287                        .map(|w| {
2288                            (
2289                                w.root_name.as_str(),
2290                                w.guests.iter().map(|p| p.github_login.as_str()).collect(),
2291                            )
2292                        })
2293                        .collect();
2294                    (contact.user.github_login.as_str(), worktrees)
2295                })
2296                .collect()
2297        }
2298    }
2299
2300    struct TestServer {
            // In-process server harness used by the tests in this module.
            //
            // Peer shared with `server`; it is reset when the harness is dropped.
2301        peer: Arc<Peer>,
            // Application state (database handle, config, ...) that `server` was built with.
2302        app_state: Arc<AppState>,
            // The server under test.
2303        server: Arc<Server>,
            // Receiving half of the channel whose sender is given to `Server::new`; `condition`
            // awaits it between predicate checks.
2304        notifications: mpsc::Receiver<()>,
            // Per-user handles for killing a client's in-memory connection (`disconnect_client`).
2305        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
            // While true, clients created by `create_client` fail to establish connections.
2306        forbid_connections: Arc<AtomicBool>,
            // Never read; held for the lifetime of the harness, presumably so the test database
            // outlives the server. TODO confirm what dropping `TestDb` does.
2307        _test_db: TestDb,
2308    }
2309
2310    impl TestServer {
2311        async fn start() -> Self {
2312            let test_db = TestDb::new();
2313            let app_state = Self::build_app_state(&test_db).await;
2314            let peer = Peer::new();
2315            let notifications = mpsc::channel(128);
2316            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2317            Self {
2318                peer,
2319                app_state,
2320                server,
2321                notifications: notifications.1,
2322                connection_killers: Default::default(),
2323                forbid_connections: Default::default(),
2324                _test_db: test_db,
2325            }
2326        }
2327
2328        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2329            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2330            let client_name = name.to_string();
2331            let mut client = Client::new();
2332            let server = self.server.clone();
2333            let connection_killers = self.connection_killers.clone();
2334            let forbid_connections = self.forbid_connections.clone();
2335            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2336
2337            Arc::get_mut(&mut client)
2338                .unwrap()
2339                .override_authenticate(move |cx| {
2340                    cx.spawn(|_| async move {
2341                        let access_token = "the-token".to_string();
2342                        Ok(Credentials {
2343                            user_id: user_id.0 as u64,
2344                            access_token,
2345                        })
2346                    })
2347                })
2348                .override_establish_connection(move |credentials, cx| {
2349                    assert_eq!(credentials.user_id, user_id.0 as u64);
2350                    assert_eq!(credentials.access_token, "the-token");
2351
2352                    let server = server.clone();
2353                    let connection_killers = connection_killers.clone();
2354                    let forbid_connections = forbid_connections.clone();
2355                    let client_name = client_name.clone();
2356                    let connection_id_tx = connection_id_tx.clone();
2357                    cx.spawn(move |cx| async move {
2358                        if forbid_connections.load(SeqCst) {
2359                            Err(EstablishConnectionError::other(anyhow!(
2360                                "server is forbidding connections"
2361                            )))
2362                        } else {
2363                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2364                            connection_killers.lock().insert(user_id, kill_conn);
2365                            cx.background()
2366                                .spawn(server.handle_connection(
2367                                    server_conn,
2368                                    client_name,
2369                                    user_id,
2370                                    Some(connection_id_tx),
2371                                ))
2372                                .detach();
2373                            Ok(client_conn)
2374                        }
2375                    })
2376                });
2377
2378            let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2379            client
2380                .authenticate_and_connect(&cx.to_async())
2381                .await
2382                .unwrap();
2383
2384            let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2385            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2386            let mut authed_user =
2387                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2388            while authed_user.recv().await.unwrap().is_none() {}
2389
2390            TestClient {
2391                client,
2392                peer_id,
2393                user_store,
2394            }
2395        }
2396
2397        fn disconnect_client(&self, user_id: UserId) {
2398            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2399                let _ = kill_conn.try_send(Some(()));
2400            }
2401        }
2402
2403        fn forbid_connections(&self) {
2404            self.forbid_connections.store(true, SeqCst);
2405        }
2406
2407        fn allow_connections(&self) {
2408            self.forbid_connections.store(false, SeqCst);
2409        }
2410
2411        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2412            let mut config = Config::default();
2413            config.session_secret = "a".repeat(32);
2414            config.database_url = test_db.url.clone();
2415            let github_client = github::AppClient::test();
2416            Arc::new(AppState {
2417                db: test_db.db().clone(),
2418                handlebars: Default::default(),
2419                auth_client: auth::build_client("", ""),
2420                repo_client: github::RepoClient::test(&github_client),
2421                github_client,
2422                config,
2423            })
2424        }
2425
2426        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2427            self.server.store.read()
2428        }
2429
2430        async fn condition<F>(&mut self, mut predicate: F)
2431        where
2432            F: FnMut(&Store) -> bool,
2433        {
2434            async_std::future::timeout(Duration::from_millis(500), async {
2435                while !(predicate)(&*self.server.store.read()) {
2436                    self.notifications.recv().await;
2437                }
2438            })
2439            .await
2440            .expect("condition timed out");
2441        }
2442    }
2443
2444    impl Drop for TestServer {
2445        fn drop(&mut self) {
2446            task::block_on(self.peer.reset());
2447        }
2448    }
2449
2450    struct TestClient {
            // A connected client together with the identifiers tests need to refer to it.
            //
            // The underlying client; also reachable through the `Deref` impl.
2451        client: Arc<Client>,
            // Peer id built from the connection id the server reported for this client.
2452        pub peer_id: PeerId,
            // User store created for this client.
2453        pub user_store: ModelHandle<UserStore>,
2454    }
2455
2456    impl Deref for TestClient {
2457        type Target = Arc<Client>;
2458
2459        fn deref(&self) -> &Self::Target {
2460            &self.client
2461        }
2462    }
2463
2464    impl TestClient {
2465        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2466            UserId::from_proto(
2467                self.user_store
2468                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2469            )
2470        }
2471    }
2472
2473    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2474        channel
2475            .messages()
2476            .cursor::<()>()
2477            .map(|m| {
2478                (
2479                    m.sender.github_login.clone(),
2480                    m.body.clone(),
2481                    m.is_pending(),
2482                )
2483            })
2484            .collect()
2485    }
2486
2487    struct EmptyView; // A view with no state; its `View` impl renders an empty element.
2488
2489    impl gpui::Entity for EmptyView {
            // The event type is the unit type, so events carry no data.
2490        type Event = ();
2491    }
2492
2493    impl gpui::View for EmptyView {
2494        fn ui_name() -> &'static str {
2495            "empty view"
2496        }
2497
2498        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2499            gpui::Element::boxed(gpui::elements::Empty)
2500        }
2501    }
2502}