rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::open_buffer)
  75            .add_handler(Server::close_buffer)
  76            .add_handler(Server::update_buffer)
  77            .add_handler(Server::buffer_saved)
  78            .add_handler(Server::save_buffer)
  79            .add_handler(Server::get_channels)
  80            .add_handler(Server::get_users)
  81            .add_handler(Server::join_channel)
  82            .add_handler(Server::leave_channel)
  83            .add_handler(Server::send_channel_message)
  84            .add_handler(Server::get_channel_messages);
  85
  86        Arc::new(server)
  87    }
  88
  89    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  90    where
  91        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  92        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  93        M: EnvelopedMessage,
  94    {
  95        let prev_handler = self.handlers.insert(
  96            TypeId::of::<M>(),
  97            Box::new(move |server, envelope| {
  98                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
  99                (handler)(server, *envelope).boxed()
 100            }),
 101        );
 102        if prev_handler.is_some() {
 103            panic!("registered a handler for the same message twice");
 104        }
 105        self
 106    }
 107
 108    pub fn handle_connection(
 109        self: &Arc<Self>,
 110        connection: Connection,
 111        addr: String,
 112        user_id: UserId,
 113        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 114    ) -> impl Future<Output = ()> {
 115        let mut this = self.clone();
 116        async move {
 117            let (connection_id, handle_io, mut incoming_rx) =
 118                this.peer.add_connection(connection).await;
 119
 120            if let Some(send_connection_id) = send_connection_id.as_mut() {
 121                let _ = send_connection_id.send(connection_id).await;
 122            }
 123
 124            this.state_mut().add_connection(connection_id, user_id);
 125            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 126                log::error!("error updating contacts for {:?}: {}", user_id, err);
 127            }
 128
 129            let handle_io = handle_io.fuse();
 130            futures::pin_mut!(handle_io);
 131            loop {
 132                let next_message = incoming_rx.recv().fuse();
 133                futures::pin_mut!(next_message);
 134                futures::select_biased! {
 135                    message = next_message => {
 136                        if let Some(message) = message {
 137                            let start_time = Instant::now();
 138                            log::info!("RPC message received: {}", message.payload_type_name());
 139                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 140                                if let Err(err) = (handler)(this.clone(), message).await {
 141                                    log::error!("error handling message: {:?}", err);
 142                                } else {
 143                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 144                                }
 145
 146                                if let Some(mut notifications) = this.notifications.clone() {
 147                                    let _ = notifications.send(()).await;
 148                                }
 149                            } else {
 150                                log::warn!("unhandled message: {}", message.payload_type_name());
 151                            }
 152                        } else {
 153                            log::info!("rpc connection closed {:?}", addr);
 154                            break;
 155                        }
 156                    }
 157                    handle_io = handle_io => {
 158                        if let Err(err) = handle_io {
 159                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 160                        }
 161                        break;
 162                    }
 163                }
 164            }
 165
 166            if let Err(err) = this.sign_out(connection_id).await {
 167                log::error!("error signing out connection {:?} - {:?}", addr, err);
 168            }
 169        }
 170    }
 171
 172    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 173        self.peer.disconnect(connection_id).await;
 174        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 175
 176        for (project_id, project) in removed_connection.hosted_projects {
 177            if let Some(share) = project.share {
 178                broadcast(
 179                    connection_id,
 180                    share.guests.keys().copied().collect(),
 181                    |conn_id| {
 182                        self.peer
 183                            .send(conn_id, proto::UnshareProject { project_id })
 184                    },
 185                )
 186                .await?;
 187            }
 188        }
 189
 190        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 191            broadcast(connection_id, peer_ids, |conn_id| {
 192                self.peer.send(
 193                    conn_id,
 194                    proto::RemoveProjectCollaborator {
 195                        project_id,
 196                        peer_id: connection_id.0,
 197                    },
 198                )
 199            })
 200            .await?;
 201        }
 202
 203        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 204            .await?;
 205
 206        Ok(())
 207    }
 208
 209    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 210        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 211        Ok(())
 212    }
 213
 214    async fn register_project(
 215        mut self: Arc<Server>,
 216        request: TypedEnvelope<proto::RegisterProject>,
 217    ) -> tide::Result<()> {
 218        let mut state = self.state_mut();
 219        let user_id = state.user_id_for_connection(request.sender_id)?;
 220        state.register_project(request.sender_id, user_id);
 221        Ok(())
 222    }
 223
 224    async fn unregister_project(
 225        mut self: Arc<Server>,
 226        request: TypedEnvelope<proto::UnregisterProject>,
 227    ) -> tide::Result<()> {
 228        self.state_mut()
 229            .unregister_project(request.payload.project_id, request.sender_id);
 230        Ok(())
 231    }
 232
 233    async fn share_project(
 234        mut self: Arc<Server>,
 235        request: TypedEnvelope<proto::ShareProject>,
 236    ) -> tide::Result<()> {
 237        self.state_mut()
 238            .share_project(request.payload.project_id, request.sender_id);
 239        Ok(())
 240    }
 241
 242    async fn unshare_project(
 243        mut self: Arc<Server>,
 244        request: TypedEnvelope<proto::UnshareProject>,
 245    ) -> tide::Result<()> {
 246        let project_id = request.payload.project_id;
 247        let project = self
 248            .state_mut()
 249            .unshare_project(project_id, request.sender_id)?;
 250
 251        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 252            self.peer
 253                .send(conn_id, proto::UnshareProject { project_id })
 254        })
 255        .await?;
 256        self.update_contacts_for_users(&project.authorized_user_ids)
 257            .await?;
 258
 259        Ok(())
 260    }
 261
 262    async fn join_project(
 263        mut self: Arc<Server>,
 264        request: TypedEnvelope<proto::JoinProject>,
 265    ) -> tide::Result<()> {
 266        let project_id = request.payload.project_id;
 267
 268        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 269        let response_data = self
 270            .state_mut()
 271            .join_project(request.sender_id, user_id, project_id)
 272            .and_then(|joined| {
 273                let share = joined.project.share()?;
 274                let peer_count = share.guests.len();
 275                let mut collaborators = Vec::with_capacity(peer_count);
 276                collaborators.push(proto::Collaborator {
 277                    peer_id: joined.project.host_connection_id.0,
 278                    replica_id: 0,
 279                    user_id: joined.project.host_user_id.to_proto(),
 280                });
 281                let worktrees = joined
 282                    .project
 283                    .worktrees
 284                    .values()
 285                    .filter_map(|worktree| {
 286                        worktree.share.as_ref().map(|share| proto::Worktree {
 287                            id: project_id,
 288                            root_name: worktree.root_name.clone(),
 289                            entries: share.entries.values().cloned().collect(),
 290                        })
 291                    })
 292                    .collect();
 293                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 294                    if *peer_conn_id != request.sender_id {
 295                        collaborators.push(proto::Collaborator {
 296                            peer_id: peer_conn_id.0,
 297                            replica_id: *peer_replica_id as u32,
 298                            user_id: peer_user_id.to_proto(),
 299                        });
 300                    }
 301                }
 302                let response = proto::JoinProjectResponse {
 303                    worktrees,
 304                    replica_id: joined.replica_id as u32,
 305                    collaborators,
 306                };
 307                let connection_ids = joined.project.connection_ids();
 308                let contact_user_ids = joined.project.authorized_user_ids();
 309                Ok((response, connection_ids, contact_user_ids))
 310            });
 311
 312        match response_data {
 313            Ok((response, connection_ids, contact_user_ids)) => {
 314                broadcast(request.sender_id, connection_ids, |conn_id| {
 315                    self.peer.send(
 316                        conn_id,
 317                        proto::AddProjectCollaborator {
 318                            project_id: project_id,
 319                            collaborator: Some(proto::Collaborator {
 320                                peer_id: request.sender_id.0,
 321                                replica_id: response.replica_id,
 322                                user_id: user_id.to_proto(),
 323                            }),
 324                        },
 325                    )
 326                })
 327                .await?;
 328                self.peer.respond(request.receipt(), response).await?;
 329                self.update_contacts_for_users(&contact_user_ids).await?;
 330            }
 331            Err(error) => {
 332                self.peer
 333                    .respond_with_error(
 334                        request.receipt(),
 335                        proto::Error {
 336                            message: error.to_string(),
 337                        },
 338                    )
 339                    .await?;
 340            }
 341        }
 342
 343        Ok(())
 344    }
 345
 346    async fn leave_project(
 347        mut self: Arc<Server>,
 348        request: TypedEnvelope<proto::LeaveProject>,
 349    ) -> tide::Result<()> {
 350        let sender_id = request.sender_id;
 351        let project_id = request.payload.project_id;
 352        let worktree = self.state_mut().leave_project(sender_id, project_id);
 353        if let Some(worktree) = worktree {
 354            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 355                self.peer.send(
 356                    conn_id,
 357                    proto::RemoveProjectCollaborator {
 358                        project_id,
 359                        peer_id: sender_id.0,
 360                    },
 361                )
 362            })
 363            .await?;
 364            self.update_contacts_for_users(&worktree.authorized_user_ids)
 365                .await?;
 366        }
 367        Ok(())
 368    }
 369
 370    async fn register_worktree(
 371        mut self: Arc<Server>,
 372        request: TypedEnvelope<proto::RegisterWorktree>,
 373    ) -> tide::Result<()> {
 374        let receipt = request.receipt();
 375        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 376
 377        let mut contact_user_ids = HashSet::default();
 378        contact_user_ids.insert(host_user_id);
 379        for github_login in request.payload.authorized_logins {
 380            match self.app_state.db.create_user(&github_login, false).await {
 381                Ok(contact_user_id) => {
 382                    contact_user_ids.insert(contact_user_id);
 383                }
 384                Err(err) => {
 385                    let message = err.to_string();
 386                    self.peer
 387                        .respond_with_error(receipt, proto::Error { message })
 388                        .await?;
 389                    return Ok(());
 390                }
 391            }
 392        }
 393
 394        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 395        let ok = self.state_mut().register_worktree(
 396            request.payload.project_id,
 397            request.payload.worktree_id,
 398            Worktree {
 399                authorized_user_ids: contact_user_ids.clone(),
 400                root_name: request.payload.root_name,
 401                share: None,
 402            },
 403        );
 404
 405        if ok {
 406            self.peer.respond(receipt, proto::Ack {}).await?;
 407            self.update_contacts_for_users(&contact_user_ids).await?;
 408        } else {
 409            self.peer
 410                .respond_with_error(
 411                    receipt,
 412                    proto::Error {
 413                        message: NO_SUCH_PROJECT.to_string(),
 414                    },
 415                )
 416                .await?;
 417        }
 418
 419        Ok(())
 420    }
 421
 422    async fn unregister_worktree(
 423        mut self: Arc<Server>,
 424        request: TypedEnvelope<proto::UnregisterWorktree>,
 425    ) -> tide::Result<()> {
 426        let project_id = request.payload.project_id;
 427        let worktree_id = request.payload.worktree_id;
 428        let (worktree, guest_connection_ids) =
 429            self.state_mut()
 430                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 431
 432        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 433            self.peer.send(
 434                conn_id,
 435                proto::UnregisterWorktree {
 436                    project_id,
 437                    worktree_id,
 438                },
 439            )
 440        })
 441        .await?;
 442        self.update_contacts_for_users(&worktree.authorized_user_ids)
 443            .await?;
 444        Ok(())
 445    }
 446
 447    async fn share_worktree(
 448        mut self: Arc<Server>,
 449        mut request: TypedEnvelope<proto::ShareWorktree>,
 450    ) -> tide::Result<()> {
 451        let worktree = request
 452            .payload
 453            .worktree
 454            .as_mut()
 455            .ok_or_else(|| anyhow!("missing worktree"))?;
 456        let entries = mem::take(&mut worktree.entries)
 457            .into_iter()
 458            .map(|entry| (entry.id, entry))
 459            .collect();
 460
 461        let contact_user_ids = self.state_mut().share_worktree(
 462            request.payload.project_id,
 463            worktree.id,
 464            request.sender_id,
 465            entries,
 466        );
 467        if let Some(contact_user_ids) = contact_user_ids {
 468            self.peer.respond(request.receipt(), proto::Ack {}).await?;
 469            self.update_contacts_for_users(&contact_user_ids).await?;
 470        } else {
 471            self.peer
 472                .respond_with_error(
 473                    request.receipt(),
 474                    proto::Error {
 475                        message: "no such worktree".to_string(),
 476                    },
 477                )
 478                .await?;
 479        }
 480        Ok(())
 481    }
 482
 483    async fn update_worktree(
 484        mut self: Arc<Server>,
 485        request: TypedEnvelope<proto::UpdateWorktree>,
 486    ) -> tide::Result<()> {
 487        let connection_ids = self
 488            .state_mut()
 489            .update_worktree(
 490                request.sender_id,
 491                request.payload.project_id,
 492                request.payload.worktree_id,
 493                &request.payload.removed_entries,
 494                &request.payload.updated_entries,
 495            )
 496            .ok_or_else(|| anyhow!("no such worktree"))?;
 497
 498        broadcast(request.sender_id, connection_ids, |connection_id| {
 499            self.peer
 500                .forward_send(request.sender_id, connection_id, request.payload.clone())
 501        })
 502        .await?;
 503
 504        Ok(())
 505    }
 506
 507    async fn open_buffer(
 508        self: Arc<Server>,
 509        request: TypedEnvelope<proto::OpenBuffer>,
 510    ) -> tide::Result<()> {
 511        let receipt = request.receipt();
 512        let host_connection_id = self
 513            .state()
 514            .read_project(request.payload.project_id, request.sender_id)
 515            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 516            .host_connection_id;
 517        let response = self
 518            .peer
 519            .forward_request(request.sender_id, host_connection_id, request.payload)
 520            .await?;
 521        self.peer.respond(receipt, response).await?;
 522        Ok(())
 523    }
 524
 525    async fn close_buffer(
 526        self: Arc<Server>,
 527        request: TypedEnvelope<proto::CloseBuffer>,
 528    ) -> tide::Result<()> {
 529        let host_connection_id = self
 530            .state()
 531            .read_project(request.payload.project_id, request.sender_id)
 532            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 533            .host_connection_id;
 534        self.peer
 535            .forward_send(request.sender_id, host_connection_id, request.payload)
 536            .await?;
 537        Ok(())
 538    }
 539
 540    async fn save_buffer(
 541        self: Arc<Server>,
 542        request: TypedEnvelope<proto::SaveBuffer>,
 543    ) -> tide::Result<()> {
 544        let host;
 545        let guests;
 546        {
 547            let state = self.state();
 548            let project = state
 549                .read_project(request.payload.project_id, request.sender_id)
 550                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 551            host = project.host_connection_id;
 552            guests = project.guest_connection_ids()
 553        }
 554
 555        let sender = request.sender_id;
 556        let receipt = request.receipt();
 557        let response = self
 558            .peer
 559            .forward_request(sender, host, request.payload.clone())
 560            .await?;
 561
 562        broadcast(host, guests, |conn_id| {
 563            let response = response.clone();
 564            let peer = &self.peer;
 565            async move {
 566                if conn_id == sender {
 567                    peer.respond(receipt, response).await
 568                } else {
 569                    peer.forward_send(host, conn_id, response).await
 570                }
 571            }
 572        })
 573        .await?;
 574
 575        Ok(())
 576    }
 577
 578    async fn update_buffer(
 579        self: Arc<Server>,
 580        request: TypedEnvelope<proto::UpdateBuffer>,
 581    ) -> tide::Result<()> {
 582        let receiver_ids = self
 583            .state()
 584            .project_connection_ids(request.payload.project_id, request.sender_id)
 585            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 586        broadcast(request.sender_id, receiver_ids, |connection_id| {
 587            self.peer
 588                .forward_send(request.sender_id, connection_id, request.payload.clone())
 589        })
 590        .await?;
 591        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 592        Ok(())
 593    }
 594
 595    async fn buffer_saved(
 596        self: Arc<Server>,
 597        request: TypedEnvelope<proto::BufferSaved>,
 598    ) -> tide::Result<()> {
 599        let receiver_ids = self
 600            .state()
 601            .project_connection_ids(request.payload.project_id, request.sender_id)
 602            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 603        broadcast(request.sender_id, receiver_ids, |connection_id| {
 604            self.peer
 605                .forward_send(request.sender_id, connection_id, request.payload.clone())
 606        })
 607        .await?;
 608        Ok(())
 609    }
 610
 611    async fn get_channels(
 612        self: Arc<Server>,
 613        request: TypedEnvelope<proto::GetChannels>,
 614    ) -> tide::Result<()> {
 615        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 616        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 617        self.peer
 618            .respond(
 619                request.receipt(),
 620                proto::GetChannelsResponse {
 621                    channels: channels
 622                        .into_iter()
 623                        .map(|chan| proto::Channel {
 624                            id: chan.id.to_proto(),
 625                            name: chan.name,
 626                        })
 627                        .collect(),
 628                },
 629            )
 630            .await?;
 631        Ok(())
 632    }
 633
 634    async fn get_users(
 635        self: Arc<Server>,
 636        request: TypedEnvelope<proto::GetUsers>,
 637    ) -> tide::Result<()> {
 638        let receipt = request.receipt();
 639        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 640        let users = self
 641            .app_state
 642            .db
 643            .get_users_by_ids(user_ids)
 644            .await?
 645            .into_iter()
 646            .map(|user| proto::User {
 647                id: user.id.to_proto(),
 648                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 649                github_login: user.github_login,
 650            })
 651            .collect();
 652        self.peer
 653            .respond(receipt, proto::GetUsersResponse { users })
 654            .await?;
 655        Ok(())
 656    }
 657
 658    async fn update_contacts_for_users<'a>(
 659        self: &Arc<Server>,
 660        user_ids: impl IntoIterator<Item = &'a UserId>,
 661    ) -> tide::Result<()> {
 662        let mut send_futures = Vec::new();
 663
 664        {
 665            let state = self.state();
 666            for user_id in user_ids {
 667                let contacts = state.contacts_for_user(*user_id);
 668                for connection_id in state.connection_ids_for_user(*user_id) {
 669                    send_futures.push(self.peer.send(
 670                        connection_id,
 671                        proto::UpdateContacts {
 672                            contacts: contacts.clone(),
 673                        },
 674                    ));
 675                }
 676            }
 677        }
 678        futures::future::try_join_all(send_futures).await?;
 679
 680        Ok(())
 681    }
 682
 683    async fn join_channel(
 684        mut self: Arc<Self>,
 685        request: TypedEnvelope<proto::JoinChannel>,
 686    ) -> tide::Result<()> {
 687        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 688        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 689        if !self
 690            .app_state
 691            .db
 692            .can_user_access_channel(user_id, channel_id)
 693            .await?
 694        {
 695            Err(anyhow!("access denied"))?;
 696        }
 697
 698        self.state_mut().join_channel(request.sender_id, channel_id);
 699        let messages = self
 700            .app_state
 701            .db
 702            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 703            .await?
 704            .into_iter()
 705            .map(|msg| proto::ChannelMessage {
 706                id: msg.id.to_proto(),
 707                body: msg.body,
 708                timestamp: msg.sent_at.unix_timestamp() as u64,
 709                sender_id: msg.sender_id.to_proto(),
 710                nonce: Some(msg.nonce.as_u128().into()),
 711            })
 712            .collect::<Vec<_>>();
 713        self.peer
 714            .respond(
 715                request.receipt(),
 716                proto::JoinChannelResponse {
 717                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 718                    messages,
 719                },
 720            )
 721            .await?;
 722        Ok(())
 723    }
 724
 725    async fn leave_channel(
 726        mut self: Arc<Self>,
 727        request: TypedEnvelope<proto::LeaveChannel>,
 728    ) -> tide::Result<()> {
 729        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 730        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 731        if !self
 732            .app_state
 733            .db
 734            .can_user_access_channel(user_id, channel_id)
 735            .await?
 736        {
 737            Err(anyhow!("access denied"))?;
 738        }
 739
 740        self.state_mut()
 741            .leave_channel(request.sender_id, channel_id);
 742
 743        Ok(())
 744    }
 745
 746    async fn send_channel_message(
 747        self: Arc<Self>,
 748        request: TypedEnvelope<proto::SendChannelMessage>,
 749    ) -> tide::Result<()> {
 750        let receipt = request.receipt();
 751        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 752        let user_id;
 753        let connection_ids;
 754        {
 755            let state = self.state();
 756            user_id = state.user_id_for_connection(request.sender_id)?;
 757            if let Some(ids) = state.channel_connection_ids(channel_id) {
 758                connection_ids = ids;
 759            } else {
 760                return Ok(());
 761            }
 762        }
 763
 764        // Validate the message body.
 765        let body = request.payload.body.trim().to_string();
 766        if body.len() > MAX_MESSAGE_LEN {
 767            self.peer
 768                .respond_with_error(
 769                    receipt,
 770                    proto::Error {
 771                        message: "message is too long".to_string(),
 772                    },
 773                )
 774                .await?;
 775            return Ok(());
 776        }
 777        if body.is_empty() {
 778            self.peer
 779                .respond_with_error(
 780                    receipt,
 781                    proto::Error {
 782                        message: "message can't be blank".to_string(),
 783                    },
 784                )
 785                .await?;
 786            return Ok(());
 787        }
 788
 789        let timestamp = OffsetDateTime::now_utc();
 790        let nonce = if let Some(nonce) = request.payload.nonce {
 791            nonce
 792        } else {
 793            self.peer
 794                .respond_with_error(
 795                    receipt,
 796                    proto::Error {
 797                        message: "nonce can't be blank".to_string(),
 798                    },
 799                )
 800                .await?;
 801            return Ok(());
 802        };
 803
 804        let message_id = self
 805            .app_state
 806            .db
 807            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 808            .await?
 809            .to_proto();
 810        let message = proto::ChannelMessage {
 811            sender_id: user_id.to_proto(),
 812            id: message_id,
 813            body,
 814            timestamp: timestamp.unix_timestamp() as u64,
 815            nonce: Some(nonce),
 816        };
 817        broadcast(request.sender_id, connection_ids, |conn_id| {
 818            self.peer.send(
 819                conn_id,
 820                proto::ChannelMessageSent {
 821                    channel_id: channel_id.to_proto(),
 822                    message: Some(message.clone()),
 823                },
 824            )
 825        })
 826        .await?;
 827        self.peer
 828            .respond(
 829                receipt,
 830                proto::SendChannelMessageResponse {
 831                    message: Some(message),
 832                },
 833            )
 834            .await?;
 835        Ok(())
 836    }
 837
 838    async fn get_channel_messages(
 839        self: Arc<Self>,
 840        request: TypedEnvelope<proto::GetChannelMessages>,
 841    ) -> tide::Result<()> {
 842        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 843        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 844        if !self
 845            .app_state
 846            .db
 847            .can_user_access_channel(user_id, channel_id)
 848            .await?
 849        {
 850            Err(anyhow!("access denied"))?;
 851        }
 852
 853        let messages = self
 854            .app_state
 855            .db
 856            .get_channel_messages(
 857                channel_id,
 858                MESSAGE_COUNT_PER_PAGE,
 859                Some(MessageId::from_proto(request.payload.before_message_id)),
 860            )
 861            .await?
 862            .into_iter()
 863            .map(|msg| proto::ChannelMessage {
 864                id: msg.id.to_proto(),
 865                body: msg.body,
 866                timestamp: msg.sent_at.unix_timestamp() as u64,
 867                sender_id: msg.sender_id.to_proto(),
 868                nonce: Some(msg.nonce.as_u128().into()),
 869            })
 870            .collect::<Vec<_>>();
 871        self.peer
 872            .respond(
 873                request.receipt(),
 874                proto::GetChannelMessagesResponse {
 875                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 876                    messages,
 877                },
 878            )
 879            .await?;
 880        Ok(())
 881    }
 882
 883    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 884        self.store.read()
 885    }
 886
 887    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 888        self.store.write()
 889    }
 890}
 891
 892pub async fn broadcast<F, T>(
 893    sender_id: ConnectionId,
 894    receiver_ids: Vec<ConnectionId>,
 895    mut f: F,
 896) -> anyhow::Result<()>
 897where
 898    F: FnMut(ConnectionId) -> T,
 899    T: Future<Output = anyhow::Result<()>>,
 900{
 901    let futures = receiver_ids
 902        .into_iter()
 903        .filter(|id| *id != sender_id)
 904        .map(|id| f(id));
 905    futures::future::try_join_all(futures).await?;
 906    Ok(())
 907}
 908
 909pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 910    let server = Server::new(app.state().clone(), rpc.clone(), None);
 911    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 912        let server = server.clone();
 913        async move {
 914            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 915
 916            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 917            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 918            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 919            let client_protocol_version: Option<u32> = request
 920                .header("X-Zed-Protocol-Version")
 921                .and_then(|v| v.as_str().parse().ok());
 922
 923            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 924                return Ok(Response::new(StatusCode::UpgradeRequired));
 925            }
 926
 927            let header = match request.header("Sec-Websocket-Key") {
 928                Some(h) => h.as_str(),
 929                None => return Err(anyhow!("expected sec-websocket-key"))?,
 930            };
 931
 932            let user_id = process_auth_header(&request).await?;
 933
 934            let mut response = Response::new(StatusCode::SwitchingProtocols);
 935            response.insert_header(UPGRADE, "websocket");
 936            response.insert_header(CONNECTION, "Upgrade");
 937            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 938            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 939            response.insert_header("Sec-Websocket-Version", "13");
 940
 941            let http_res: &mut tide::http::Response = response.as_mut();
 942            let upgrade_receiver = http_res.recv_upgrade().await;
 943            let addr = request.remote().unwrap_or("unknown").to_string();
 944            task::spawn(async move {
 945                if let Some(stream) = upgrade_receiver.await {
 946                    server
 947                        .handle_connection(
 948                            Connection::new(
 949                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 950                            ),
 951                            addr,
 952                            user_id,
 953                            None,
 954                        )
 955                        .await;
 956                }
 957            });
 958
 959            Ok(response)
 960        }
 961    });
 962}
 963
 964fn header_contains_ignore_case<T>(
 965    request: &tide::Request<T>,
 966    header_name: HeaderName,
 967    value: &str,
 968) -> bool {
 969    request
 970        .header(header_name)
 971        .map(|h| {
 972            h.as_str()
 973                .split(',')
 974                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 975        })
 976        .unwrap_or(false)
 977}
 978
 979#[cfg(test)]
 980mod tests {
 981    use super::*;
 982    use crate::{
 983        auth,
 984        db::{tests::TestDb, UserId},
 985        github, AppState, Config,
 986    };
 987    use ::rpc::Peer;
 988    use async_std::task;
 989    use gpui::{ModelHandle, TestAppContext};
 990    use parking_lot::Mutex;
 991    use postage::{mpsc, watch};
 992    use rpc::PeerId;
 993    use serde_json::json;
 994    use sqlx::types::time::OffsetDateTime;
 995    use std::{
 996        ops::Deref,
 997        path::Path,
 998        sync::{
 999            atomic::{AtomicBool, Ordering::SeqCst},
1000            Arc,
1001        },
1002        time::Duration,
1003    };
1004    use zed::{
1005        client::{
1006            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1007            EstablishConnectionError, UserStore,
1008        },
1009        editor::{Editor, EditorSettings, Input, MultiBuffer},
1010        fs::{FakeFs, Fs as _},
1011        language::{
1012            tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1013            LanguageRegistry, LanguageServerConfig, Point,
1014        },
1015        lsp,
1016        project::Project,
1017    };
1018
1019    #[gpui::test]
1020    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1021        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1022        let lang_registry = Arc::new(LanguageRegistry::new());
1023        let fs = Arc::new(FakeFs::new());
1024        cx_a.foreground().forbid_parking();
1025
1026        // Connect to a server as 2 clients.
1027        let mut server = TestServer::start().await;
1028        let client_a = server.create_client(&mut cx_a, "user_a").await;
1029        let client_b = server.create_client(&mut cx_b, "user_b").await;
1030
1031        // Share a project as client A
1032        fs.insert_tree(
1033            "/a",
1034            json!({
1035                ".zed.toml": r#"collaborators = ["user_b"]"#,
1036                "a.txt": "a-contents",
1037                "b.txt": "b-contents",
1038            }),
1039        )
1040        .await;
1041        let project_a = cx_a.update(|cx| {
1042            Project::local(
1043                client_a.clone(),
1044                client_a.user_store.clone(),
1045                lang_registry.clone(),
1046                fs.clone(),
1047                cx,
1048            )
1049        });
1050        let worktree_a = project_a
1051            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1052            .await
1053            .unwrap();
1054        worktree_a
1055            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1056            .await;
1057        let project_id = project_a
1058            .update(&mut cx_a, |project, _| project.next_remote_id())
1059            .await;
1060        project_a
1061            .update(&mut cx_a, |project, cx| project.share(cx))
1062            .await
1063            .unwrap();
1064
1065        // Join that project as client B, and see that a guest has joined as client A.
1066        let project_b = Project::remote(
1067            project_id,
1068            client_b.clone(),
1069            client_b.user_store.clone(),
1070            lang_registry.clone(),
1071            fs.clone(),
1072            &mut cx_b.to_async(),
1073        )
1074        .await
1075        .unwrap();
1076        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1077
1078        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1079            assert_eq!(
1080                project
1081                    .collaborators()
1082                    .get(&client_a.peer_id)
1083                    .unwrap()
1084                    .user
1085                    .github_login,
1086                "user_a"
1087            );
1088            project.replica_id()
1089        });
1090        project_a
1091            .condition(&cx_a, |tree, _| {
1092                tree.collaborators()
1093                    .get(&client_b.peer_id)
1094                    .map_or(false, |collaborator| {
1095                        collaborator.replica_id == replica_id_b
1096                            && collaborator.user.github_login == "user_b"
1097                    })
1098            })
1099            .await;
1100
1101        // Open the same file as client B and client A.
1102        let buffer_b = worktree_b
1103            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1104            .await
1105            .unwrap();
1106        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1107        buffer_b.read_with(&cx_b, |buf, cx| {
1108            assert_eq!(buf.read(cx).text(), "b-contents")
1109        });
1110        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1111        let buffer_a = worktree_a
1112            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1113            .await
1114            .unwrap();
1115
1116        let editor_b = cx_b.add_view(window_b, |cx| {
1117            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1118        });
1119        // TODO
1120        // // Create a selection set as client B and see that selection set as client A.
1121        // buffer_a
1122        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1123        //     .await;
1124
1125        // Edit the buffer as client B and see that edit as client A.
1126        editor_b.update(&mut cx_b, |editor, cx| {
1127            editor.handle_input(&Input("ok, ".into()), cx)
1128        });
1129        buffer_a
1130            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1131            .await;
1132
1133        // TODO
1134        // // Remove the selection set as client B, see those selections disappear as client A.
1135        cx_b.update(move |_| drop(editor_b));
1136        // buffer_a
1137        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1138        //     .await;
1139
1140        // Close the buffer as client A, see that the buffer is closed.
1141        cx_a.update(move |_| drop(buffer_a));
1142        worktree_a
1143            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1144            .await;
1145
1146        // Dropping the worktree removes client B from client A's collaborators.
1147        cx_b.update(move |_| drop(worktree_b));
1148        project_a
1149            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1150            .await;
1151    }
1152
1153    #[gpui::test]
1154    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1155        cx_b.update(zed::contacts_panel::init);
1156        let lang_registry = Arc::new(LanguageRegistry::new());
1157        let fs = Arc::new(FakeFs::new());
1158
1159        // Connect to a server as 2 clients.
1160        let mut server = TestServer::start().await;
1161        let client_a = server.create_client(&mut cx_a, "user_a").await;
1162        let client_b = server.create_client(&mut cx_b, "user_b").await;
1163        cx_a.foreground().forbid_parking();
1164
1165        // Share a project as client A
1166        fs.insert_tree(
1167            "/a",
1168            json!({
1169                ".zed.toml": r#"collaborators = ["user_b"]"#,
1170                "a.txt": "a-contents",
1171                "b.txt": "b-contents",
1172            }),
1173        )
1174        .await;
1175        let project_a = cx_a.update(|cx| {
1176            Project::local(
1177                client_a.clone(),
1178                client_a.user_store.clone(),
1179                lang_registry.clone(),
1180                fs.clone(),
1181                cx,
1182            )
1183        });
1184        let worktree_a = project_a
1185            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1186            .await
1187            .unwrap();
1188        worktree_a
1189            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1190            .await;
1191        let project_id = project_a
1192            .update(&mut cx_a, |project, _| project.next_remote_id())
1193            .await;
1194        project_a
1195            .update(&mut cx_a, |project, cx| project.share(cx))
1196            .await
1197            .unwrap();
1198
1199        // Join that project as client B
1200        let project_b = Project::remote(
1201            project_id,
1202            client_b.clone(),
1203            client_b.user_store.clone(),
1204            lang_registry.clone(),
1205            fs.clone(),
1206            &mut cx_b.to_async(),
1207        )
1208        .await
1209        .unwrap();
1210
1211        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1212        worktree_b
1213            .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
1214            .await
1215            .unwrap();
1216
1217        project_a
1218            .update(&mut cx_a, |project, cx| project.unshare(cx))
1219            .await
1220            .unwrap();
1221        project_b
1222            .condition(&mut cx_b, |project, _| project.is_read_only())
1223            .await;
1224    }
1225
1226    #[gpui::test]
1227    async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
1228        mut cx_a: TestAppContext,
1229        mut cx_b: TestAppContext,
1230        mut cx_c: TestAppContext,
1231    ) {
1232        cx_a.foreground().forbid_parking();
1233        let lang_registry = Arc::new(LanguageRegistry::new());
1234        let fs = Arc::new(FakeFs::new());
1235
1236        // Connect to a server as 3 clients.
1237        let mut server = TestServer::start().await;
1238        let client_a = server.create_client(&mut cx_a, "user_a").await;
1239        let client_b = server.create_client(&mut cx_b, "user_b").await;
1240        let client_c = server.create_client(&mut cx_c, "user_c").await;
1241
1242        // Share a worktree as client A.
1243        fs.insert_tree(
1244            "/a",
1245            json!({
1246                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1247                "file1": "",
1248                "file2": ""
1249            }),
1250        )
1251        .await;
1252        let project_a = cx_a.update(|cx| {
1253            Project::local(
1254                client_a.clone(),
1255                client_a.user_store.clone(),
1256                lang_registry.clone(),
1257                fs.clone(),
1258                cx,
1259            )
1260        });
1261        let worktree_a = project_a
1262            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1263            .await
1264            .unwrap();
1265        worktree_a
1266            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1267            .await;
1268        let project_id = project_a
1269            .update(&mut cx_a, |project, _| project.next_remote_id())
1270            .await;
1271        project_a
1272            .update(&mut cx_a, |project, cx| project.share(cx))
1273            .await
1274            .unwrap();
1275
1276        // Join that worktree as clients B and C.
1277        let project_b = Project::remote(
1278            project_id,
1279            client_b.clone(),
1280            client_b.user_store.clone(),
1281            lang_registry.clone(),
1282            fs.clone(),
1283            &mut cx_b.to_async(),
1284        )
1285        .await
1286        .unwrap();
1287        let project_c = Project::remote(
1288            project_id,
1289            client_c.clone(),
1290            client_c.user_store.clone(),
1291            lang_registry.clone(),
1292            fs.clone(),
1293            &mut cx_c.to_async(),
1294        )
1295        .await
1296        .unwrap();
1297
1298        // Open and edit a buffer as both guests B and C.
1299        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1300        let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
1301        let buffer_b = worktree_b
1302            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1303            .await
1304            .unwrap();
1305        let buffer_c = worktree_c
1306            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1307            .await
1308            .unwrap();
1309        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1310        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1311
1312        // Open and edit that buffer as the host.
1313        let buffer_a = worktree_a
1314            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1315            .await
1316            .unwrap();
1317
1318        buffer_a
1319            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1320            .await;
1321        buffer_a.update(&mut cx_a, |buf, cx| {
1322            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1323        });
1324
1325        // Wait for edits to propagate
1326        buffer_a
1327            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1328            .await;
1329        buffer_b
1330            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1331            .await;
1332        buffer_c
1333            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1334            .await;
1335
1336        // Edit the buffer as the host and concurrently save as guest B.
1337        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1338        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1339        save_b.await.unwrap();
1340        assert_eq!(
1341            fs.load("/a/file1".as_ref()).await.unwrap(),
1342            "hi-a, i-am-c, i-am-b, i-am-a"
1343        );
1344        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1345        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1346        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1347
1348        // Make changes on host's file system, see those changes on the guests.
1349        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1350            .await
1351            .unwrap();
1352        fs.insert_file(Path::new("/a/file4"), "4".into())
1353            .await
1354            .unwrap();
1355
1356        worktree_b
1357            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1358            .await;
1359        worktree_c
1360            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1361            .await;
1362        worktree_b.read_with(&cx_b, |tree, _| {
1363            assert_eq!(
1364                tree.paths()
1365                    .map(|p| p.to_string_lossy())
1366                    .collect::<Vec<_>>(),
1367                &[".zed.toml", "file1", "file3", "file4"]
1368            )
1369        });
1370        worktree_c.read_with(&cx_c, |tree, _| {
1371            assert_eq!(
1372                tree.paths()
1373                    .map(|p| p.to_string_lossy())
1374                    .collect::<Vec<_>>(),
1375                &[".zed.toml", "file1", "file3", "file4"]
1376            )
1377        });
1378    }
1379
1380    #[gpui::test]
1381    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1382        cx_a.foreground().forbid_parking();
1383        let lang_registry = Arc::new(LanguageRegistry::new());
1384        let fs = Arc::new(FakeFs::new());
1385
1386        // Connect to a server as 2 clients.
1387        let mut server = TestServer::start().await;
1388        let client_a = server.create_client(&mut cx_a, "user_a").await;
1389        let client_b = server.create_client(&mut cx_b, "user_b").await;
1390
1391        // Share a project as client A
1392        fs.insert_tree(
1393            "/dir",
1394            json!({
1395                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1396                "a.txt": "a-contents",
1397            }),
1398        )
1399        .await;
1400
1401        let project_a = cx_a.update(|cx| {
1402            Project::local(
1403                client_a.clone(),
1404                client_a.user_store.clone(),
1405                lang_registry.clone(),
1406                fs.clone(),
1407                cx,
1408            )
1409        });
1410        let worktree_a = project_a
1411            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1412            .await
1413            .unwrap();
1414        worktree_a
1415            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1416            .await;
1417        let project_id = project_a
1418            .update(&mut cx_a, |project, _| project.next_remote_id())
1419            .await;
1420        project_a
1421            .update(&mut cx_a, |project, cx| project.share(cx))
1422            .await
1423            .unwrap();
1424
1425        // Join that project as client B
1426        let project_b = Project::remote(
1427            project_id,
1428            client_b.clone(),
1429            client_b.user_store.clone(),
1430            lang_registry.clone(),
1431            fs.clone(),
1432            &mut cx_b.to_async(),
1433        )
1434        .await
1435        .unwrap();
1436        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1437
1438        // Open a buffer as client B
1439        let buffer_b = worktree_b
1440            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1441            .await
1442            .unwrap();
1443        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1444
1445        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1446        buffer_b.read_with(&cx_b, |buf, _| {
1447            assert!(buf.is_dirty());
1448            assert!(!buf.has_conflict());
1449        });
1450
1451        buffer_b
1452            .update(&mut cx_b, |buf, cx| buf.save(cx))
1453            .unwrap()
1454            .await
1455            .unwrap();
1456        worktree_b
1457            .condition(&cx_b, |_, cx| {
1458                buffer_b.read(cx).file().unwrap().mtime() != mtime
1459            })
1460            .await;
1461        buffer_b.read_with(&cx_b, |buf, _| {
1462            assert!(!buf.is_dirty());
1463            assert!(!buf.has_conflict());
1464        });
1465
1466        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1467        buffer_b.read_with(&cx_b, |buf, _| {
1468            assert!(buf.is_dirty());
1469            assert!(!buf.has_conflict());
1470        });
1471    }
1472
1473    #[gpui::test]
1474    async fn test_editing_while_guest_opens_buffer(
1475        mut cx_a: TestAppContext,
1476        mut cx_b: TestAppContext,
1477    ) {
1478        cx_a.foreground().forbid_parking();
1479        let lang_registry = Arc::new(LanguageRegistry::new());
1480        let fs = Arc::new(FakeFs::new());
1481
1482        // Connect to a server as 2 clients.
1483        let mut server = TestServer::start().await;
1484        let client_a = server.create_client(&mut cx_a, "user_a").await;
1485        let client_b = server.create_client(&mut cx_b, "user_b").await;
1486
1487        // Share a project as client A
1488        fs.insert_tree(
1489            "/dir",
1490            json!({
1491                ".zed.toml": r#"collaborators = ["user_b"]"#,
1492                "a.txt": "a-contents",
1493            }),
1494        )
1495        .await;
1496        let project_a = cx_a.update(|cx| {
1497            Project::local(
1498                client_a.clone(),
1499                client_a.user_store.clone(),
1500                lang_registry.clone(),
1501                fs.clone(),
1502                cx,
1503            )
1504        });
1505        let worktree_a = project_a
1506            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1507            .await
1508            .unwrap();
1509        worktree_a
1510            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1511            .await;
1512        let project_id = project_a
1513            .update(&mut cx_a, |project, _| project.next_remote_id())
1514            .await;
1515        project_a
1516            .update(&mut cx_a, |project, cx| project.share(cx))
1517            .await
1518            .unwrap();
1519
1520        // Join that project as client B
1521        let project_b = Project::remote(
1522            project_id,
1523            client_b.clone(),
1524            client_b.user_store.clone(),
1525            lang_registry.clone(),
1526            fs.clone(),
1527            &mut cx_b.to_async(),
1528        )
1529        .await
1530        .unwrap();
1531        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1532
1533        // Open a buffer as client A
1534        let buffer_a = worktree_a
1535            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1536            .await
1537            .unwrap();
1538
1539        // Start opening the same buffer as client B
1540        let buffer_b = cx_b
1541            .background()
1542            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1543        task::yield_now().await;
1544
1545        // Edit the buffer as client A while client B is still opening it.
1546        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1547
1548        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1549        let buffer_b = buffer_b.await.unwrap();
1550        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1551    }
1552
1553    #[gpui::test]
1554    async fn test_leaving_worktree_while_opening_buffer(
1555        mut cx_a: TestAppContext,
1556        mut cx_b: TestAppContext,
1557    ) {
1558        cx_a.foreground().forbid_parking();
1559        let lang_registry = Arc::new(LanguageRegistry::new());
1560        let fs = Arc::new(FakeFs::new());
1561
1562        // Connect to a server as 2 clients.
1563        let mut server = TestServer::start().await;
1564        let client_a = server.create_client(&mut cx_a, "user_a").await;
1565        let client_b = server.create_client(&mut cx_b, "user_b").await;
1566
1567        // Share a project as client A
1568        fs.insert_tree(
1569            "/dir",
1570            json!({
1571                ".zed.toml": r#"collaborators = ["user_b"]"#,
1572                "a.txt": "a-contents",
1573            }),
1574        )
1575        .await;
1576        let project_a = cx_a.update(|cx| {
1577            Project::local(
1578                client_a.clone(),
1579                client_a.user_store.clone(),
1580                lang_registry.clone(),
1581                fs.clone(),
1582                cx,
1583            )
1584        });
1585        let worktree_a = project_a
1586            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1587            .await
1588            .unwrap();
1589        worktree_a
1590            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1591            .await;
1592        let project_id = project_a
1593            .update(&mut cx_a, |project, _| project.next_remote_id())
1594            .await;
1595        project_a
1596            .update(&mut cx_a, |project, cx| project.share(cx))
1597            .await
1598            .unwrap();
1599
1600        // Join that project as client B
1601        let project_b = Project::remote(
1602            project_id,
1603            client_b.clone(),
1604            client_b.user_store.clone(),
1605            lang_registry.clone(),
1606            fs.clone(),
1607            &mut cx_b.to_async(),
1608        )
1609        .await
1610        .unwrap();
1611        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1612
1613        // See that a guest has joined as client A.
1614        project_a
1615            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1616            .await;
1617
1618        // Begin opening a buffer as client B, but leave the project before the open completes.
1619        let buffer_b = cx_b
1620            .background()
1621            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1622        cx_b.update(|_| drop(worktree_b));
1623        drop(buffer_b);
1624
1625        // See that the guest has left.
1626        project_a
1627            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1628            .await;
1629    }
1630
1631    #[gpui::test]
1632    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1633        cx_a.foreground().forbid_parking();
1634        let lang_registry = Arc::new(LanguageRegistry::new());
1635        let fs = Arc::new(FakeFs::new());
1636
1637        // Connect to a server as 2 clients.
1638        let mut server = TestServer::start().await;
1639        let client_a = server.create_client(&mut cx_a, "user_a").await;
1640        let client_b = server.create_client(&mut cx_b, "user_b").await;
1641
1642        // Share a project as client A
1643        fs.insert_tree(
1644            "/a",
1645            json!({
1646                ".zed.toml": r#"collaborators = ["user_b"]"#,
1647                "a.txt": "a-contents",
1648                "b.txt": "b-contents",
1649            }),
1650        )
1651        .await;
1652        let project_a = cx_a.update(|cx| {
1653            Project::local(
1654                client_a.clone(),
1655                client_a.user_store.clone(),
1656                lang_registry.clone(),
1657                fs.clone(),
1658                cx,
1659            )
1660        });
1661        let worktree_a = project_a
1662            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1663            .await
1664            .unwrap();
1665        worktree_a
1666            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1667            .await;
1668        let project_id = project_a
1669            .update(&mut cx_a, |project, _| project.next_remote_id())
1670            .await;
1671        project_a
1672            .update(&mut cx_a, |project, cx| project.share(cx))
1673            .await
1674            .unwrap();
1675
1676        // Join that project as client B
1677        let _project_b = Project::remote(
1678            project_id,
1679            client_b.clone(),
1680            client_b.user_store.clone(),
1681            lang_registry.clone(),
1682            fs.clone(),
1683            &mut cx_b.to_async(),
1684        )
1685        .await
1686        .unwrap();
1687
1688        // See that a guest has joined as client A.
1689        project_a
1690            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1691            .await;
1692
1693        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1694        client_b.disconnect(&cx_b.to_async()).await.unwrap();
1695        project_a
1696            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1697            .await;
1698    }
1699
1700    #[gpui::test]
1701    async fn test_collaborating_with_diagnostics(
1702        mut cx_a: TestAppContext,
1703        mut cx_b: TestAppContext,
1704    ) {
1705        cx_a.foreground().forbid_parking();
1706        let mut lang_registry = Arc::new(LanguageRegistry::new());
1707        let fs = Arc::new(FakeFs::new());
1708
1709        // Set up a fake language server.
1710        let (language_server_config, mut fake_language_server) =
1711            LanguageServerConfig::fake(cx_a.background()).await;
1712        Arc::get_mut(&mut lang_registry)
1713            .unwrap()
1714            .add(Arc::new(Language::new(
1715                LanguageConfig {
1716                    name: "Rust".to_string(),
1717                    path_suffixes: vec!["rs".to_string()],
1718                    language_server: Some(language_server_config),
1719                    ..Default::default()
1720                },
1721                Some(tree_sitter_rust::language()),
1722            )));
1723
1724        // Connect to a server as 2 clients.
1725        let mut server = TestServer::start().await;
1726        let client_a = server.create_client(&mut cx_a, "user_a").await;
1727        let client_b = server.create_client(&mut cx_b, "user_b").await;
1728
1729        // Share a project as client A
1730        fs.insert_tree(
1731            "/a",
1732            json!({
1733                ".zed.toml": r#"collaborators = ["user_b"]"#,
1734                "a.rs": "let one = two",
1735                "other.rs": "",
1736            }),
1737        )
1738        .await;
1739        let project_a = cx_a.update(|cx| {
1740            Project::local(
1741                client_a.clone(),
1742                client_a.user_store.clone(),
1743                lang_registry.clone(),
1744                fs.clone(),
1745                cx,
1746            )
1747        });
1748        let worktree_a = project_a
1749            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1750            .await
1751            .unwrap();
1752        worktree_a
1753            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1754            .await;
1755        let project_id = project_a
1756            .update(&mut cx_a, |project, _| project.next_remote_id())
1757            .await;
1758        project_a
1759            .update(&mut cx_a, |project, cx| project.share(cx))
1760            .await
1761            .unwrap();
1762
1763        // Cause the language server to start.
1764        let _ = cx_a
1765            .background()
1766            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1767                worktree.open_buffer("other.rs", cx)
1768            }))
1769            .await
1770            .unwrap();
1771
1772        // Simulate a language server reporting errors for a file.
1773        fake_language_server
1774            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1775                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1776                version: None,
1777                diagnostics: vec![
1778                    lsp::Diagnostic {
1779                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1780                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1781                        message: "message 1".to_string(),
1782                        ..Default::default()
1783                    },
1784                    lsp::Diagnostic {
1785                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1786                        range: lsp::Range::new(
1787                            lsp::Position::new(0, 10),
1788                            lsp::Position::new(0, 13),
1789                        ),
1790                        message: "message 2".to_string(),
1791                        ..Default::default()
1792                    },
1793                ],
1794            })
1795            .await;
1796
1797        // Join the worktree as client B.
1798        let project_b = Project::remote(
1799            project_id,
1800            client_b.clone(),
1801            client_b.user_store.clone(),
1802            lang_registry.clone(),
1803            fs.clone(),
1804            &mut cx_b.to_async(),
1805        )
1806        .await
1807        .unwrap();
1808        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1809
1810        // Open the file with the errors.
1811        let buffer_b = cx_b
1812            .background()
1813            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1814            .await
1815            .unwrap();
1816
1817        buffer_b.read_with(&cx_b, |buffer, _| {
1818            assert_eq!(
1819                buffer
1820                    .snapshot()
1821                    .diagnostics_in_range::<_, Point>(0..buffer.len())
1822                    .collect::<Vec<_>>(),
1823                &[
1824                    DiagnosticEntry {
1825                        range: Point::new(0, 4)..Point::new(0, 7),
1826                        diagnostic: Diagnostic {
1827                            group_id: 0,
1828                            message: "message 1".to_string(),
1829                            severity: lsp::DiagnosticSeverity::ERROR,
1830                            is_primary: true,
1831                            ..Default::default()
1832                        }
1833                    },
1834                    DiagnosticEntry {
1835                        range: Point::new(0, 10)..Point::new(0, 13),
1836                        diagnostic: Diagnostic {
1837                            group_id: 1,
1838                            severity: lsp::DiagnosticSeverity::WARNING,
1839                            message: "message 2".to_string(),
1840                            is_primary: true,
1841                            ..Default::default()
1842                        }
1843                    }
1844                ]
1845            );
1846        });
1847    }
1848
1849    #[gpui::test]
1850    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1851        cx_a.foreground().forbid_parking();
1852
1853        // Connect to a server as 2 clients.
1854        let mut server = TestServer::start().await;
1855        let client_a = server.create_client(&mut cx_a, "user_a").await;
1856        let client_b = server.create_client(&mut cx_b, "user_b").await;
1857
1858        // Create an org that includes these 2 users.
1859        let db = &server.app_state.db;
1860        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1861        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1862            .await
1863            .unwrap();
1864        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1865            .await
1866            .unwrap();
1867
1868        // Create a channel that includes all the users.
1869        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1870        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1871            .await
1872            .unwrap();
1873        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1874            .await
1875            .unwrap();
1876        db.create_channel_message(
1877            channel_id,
1878            client_b.current_user_id(&cx_b),
1879            "hello A, it's B.",
1880            OffsetDateTime::now_utc(),
1881            1,
1882        )
1883        .await
1884        .unwrap();
1885
1886        let channels_a = cx_a
1887            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1888        channels_a
1889            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1890            .await;
1891        channels_a.read_with(&cx_a, |list, _| {
1892            assert_eq!(
1893                list.available_channels().unwrap(),
1894                &[ChannelDetails {
1895                    id: channel_id.to_proto(),
1896                    name: "test-channel".to_string()
1897                }]
1898            )
1899        });
1900        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1901            this.get_channel(channel_id.to_proto(), cx).unwrap()
1902        });
1903        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1904        channel_a
1905            .condition(&cx_a, |channel, _| {
1906                channel_messages(channel)
1907                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1908            })
1909            .await;
1910
1911        let channels_b = cx_b
1912            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1913        channels_b
1914            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1915            .await;
1916        channels_b.read_with(&cx_b, |list, _| {
1917            assert_eq!(
1918                list.available_channels().unwrap(),
1919                &[ChannelDetails {
1920                    id: channel_id.to_proto(),
1921                    name: "test-channel".to_string()
1922                }]
1923            )
1924        });
1925
1926        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1927            this.get_channel(channel_id.to_proto(), cx).unwrap()
1928        });
1929        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1930        channel_b
1931            .condition(&cx_b, |channel, _| {
1932                channel_messages(channel)
1933                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1934            })
1935            .await;
1936
1937        channel_a
1938            .update(&mut cx_a, |channel, cx| {
1939                channel
1940                    .send_message("oh, hi B.".to_string(), cx)
1941                    .unwrap()
1942                    .detach();
1943                let task = channel.send_message("sup".to_string(), cx).unwrap();
1944                assert_eq!(
1945                    channel_messages(channel),
1946                    &[
1947                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1948                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
1949                        ("user_a".to_string(), "sup".to_string(), true)
1950                    ]
1951                );
1952                task
1953            })
1954            .await
1955            .unwrap();
1956
1957        channel_b
1958            .condition(&cx_b, |channel, _| {
1959                channel_messages(channel)
1960                    == [
1961                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1962                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
1963                        ("user_a".to_string(), "sup".to_string(), false),
1964                    ]
1965            })
1966            .await;
1967
1968        assert_eq!(
1969            server
1970                .state()
1971                .await
1972                .channel(channel_id)
1973                .unwrap()
1974                .connection_ids
1975                .len(),
1976            2
1977        );
1978        cx_b.update(|_| drop(channel_b));
1979        server
1980            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1981            .await;
1982
1983        cx_a.update(|_| drop(channel_a));
1984        server
1985            .condition(|state| state.channel(channel_id).is_none())
1986            .await;
1987    }
1988
1989    #[gpui::test]
1990    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
1991        cx_a.foreground().forbid_parking();
1992
1993        let mut server = TestServer::start().await;
1994        let client_a = server.create_client(&mut cx_a, "user_a").await;
1995
1996        let db = &server.app_state.db;
1997        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1998        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1999        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2000            .await
2001            .unwrap();
2002        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2003            .await
2004            .unwrap();
2005
2006        let channels_a = cx_a
2007            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2008        channels_a
2009            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2010            .await;
2011        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2012            this.get_channel(channel_id.to_proto(), cx).unwrap()
2013        });
2014
2015        // Messages aren't allowed to be too long.
2016        channel_a
2017            .update(&mut cx_a, |channel, cx| {
2018                let long_body = "this is long.\n".repeat(1024);
2019                channel.send_message(long_body, cx).unwrap()
2020            })
2021            .await
2022            .unwrap_err();
2023
2024        // Messages aren't allowed to be blank.
2025        channel_a.update(&mut cx_a, |channel, cx| {
2026            channel.send_message(String::new(), cx).unwrap_err()
2027        });
2028
2029        // Leading and trailing whitespace are trimmed.
2030        channel_a
2031            .update(&mut cx_a, |channel, cx| {
2032                channel
2033                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
2034                    .unwrap()
2035            })
2036            .await
2037            .unwrap();
2038        assert_eq!(
2039            db.get_channel_messages(channel_id, 10, None)
2040                .await
2041                .unwrap()
2042                .iter()
2043                .map(|m| &m.body)
2044                .collect::<Vec<_>>(),
2045            &["surrounded by whitespace"]
2046        );
2047    }
2048
2049    #[gpui::test]
2050    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2051        cx_a.foreground().forbid_parking();
2052
2053        // Connect to a server as 2 clients.
2054        let mut server = TestServer::start().await;
2055        let client_a = server.create_client(&mut cx_a, "user_a").await;
2056        let client_b = server.create_client(&mut cx_b, "user_b").await;
2057        let mut status_b = client_b.status();
2058
2059        // Create an org that includes these 2 users.
2060        let db = &server.app_state.db;
2061        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2062        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2063            .await
2064            .unwrap();
2065        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2066            .await
2067            .unwrap();
2068
2069        // Create a channel that includes all the users.
2070        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2071        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2072            .await
2073            .unwrap();
2074        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2075            .await
2076            .unwrap();
2077        db.create_channel_message(
2078            channel_id,
2079            client_b.current_user_id(&cx_b),
2080            "hello A, it's B.",
2081            OffsetDateTime::now_utc(),
2082            2,
2083        )
2084        .await
2085        .unwrap();
2086
2087        let channels_a = cx_a
2088            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2089        channels_a
2090            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2091            .await;
2092
2093        channels_a.read_with(&cx_a, |list, _| {
2094            assert_eq!(
2095                list.available_channels().unwrap(),
2096                &[ChannelDetails {
2097                    id: channel_id.to_proto(),
2098                    name: "test-channel".to_string()
2099                }]
2100            )
2101        });
2102        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2103            this.get_channel(channel_id.to_proto(), cx).unwrap()
2104        });
2105        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2106        channel_a
2107            .condition(&cx_a, |channel, _| {
2108                channel_messages(channel)
2109                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2110            })
2111            .await;
2112
2113        let channels_b = cx_b
2114            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2115        channels_b
2116            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2117            .await;
2118        channels_b.read_with(&cx_b, |list, _| {
2119            assert_eq!(
2120                list.available_channels().unwrap(),
2121                &[ChannelDetails {
2122                    id: channel_id.to_proto(),
2123                    name: "test-channel".to_string()
2124                }]
2125            )
2126        });
2127
2128        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2129            this.get_channel(channel_id.to_proto(), cx).unwrap()
2130        });
2131        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2132        channel_b
2133            .condition(&cx_b, |channel, _| {
2134                channel_messages(channel)
2135                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2136            })
2137            .await;
2138
2139        // Disconnect client B, ensuring we can still access its cached channel data.
2140        server.forbid_connections();
2141        server.disconnect_client(client_b.current_user_id(&cx_b));
2142        while !matches!(
2143            status_b.recv().await,
2144            Some(client::Status::ReconnectionError { .. })
2145        ) {}
2146
2147        channels_b.read_with(&cx_b, |channels, _| {
2148            assert_eq!(
2149                channels.available_channels().unwrap(),
2150                [ChannelDetails {
2151                    id: channel_id.to_proto(),
2152                    name: "test-channel".to_string()
2153                }]
2154            )
2155        });
2156        channel_b.read_with(&cx_b, |channel, _| {
2157            assert_eq!(
2158                channel_messages(channel),
2159                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2160            )
2161        });
2162
2163        // Send a message from client B while it is disconnected.
2164        channel_b
2165            .update(&mut cx_b, |channel, cx| {
2166                let task = channel
2167                    .send_message("can you see this?".to_string(), cx)
2168                    .unwrap();
2169                assert_eq!(
2170                    channel_messages(channel),
2171                    &[
2172                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2173                        ("user_b".to_string(), "can you see this?".to_string(), true)
2174                    ]
2175                );
2176                task
2177            })
2178            .await
2179            .unwrap_err();
2180
2181        // Send a message from client A while B is disconnected.
2182        channel_a
2183            .update(&mut cx_a, |channel, cx| {
2184                channel
2185                    .send_message("oh, hi B.".to_string(), cx)
2186                    .unwrap()
2187                    .detach();
2188                let task = channel.send_message("sup".to_string(), cx).unwrap();
2189                assert_eq!(
2190                    channel_messages(channel),
2191                    &[
2192                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2193                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2194                        ("user_a".to_string(), "sup".to_string(), true)
2195                    ]
2196                );
2197                task
2198            })
2199            .await
2200            .unwrap();
2201
2202        // Give client B a chance to reconnect.
2203        server.allow_connections();
2204        cx_b.foreground().advance_clock(Duration::from_secs(10));
2205
2206        // Verify that B sees the new messages upon reconnection, as well as the message client B
2207        // sent while offline.
2208        channel_b
2209            .condition(&cx_b, |channel, _| {
2210                channel_messages(channel)
2211                    == [
2212                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2213                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2214                        ("user_a".to_string(), "sup".to_string(), false),
2215                        ("user_b".to_string(), "can you see this?".to_string(), false),
2216                    ]
2217            })
2218            .await;
2219
2220        // Ensure client A and B can communicate normally after reconnection.
2221        channel_a
2222            .update(&mut cx_a, |channel, cx| {
2223                channel.send_message("you online?".to_string(), cx).unwrap()
2224            })
2225            .await
2226            .unwrap();
2227        channel_b
2228            .condition(&cx_b, |channel, _| {
2229                channel_messages(channel)
2230                    == [
2231                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2232                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2233                        ("user_a".to_string(), "sup".to_string(), false),
2234                        ("user_b".to_string(), "can you see this?".to_string(), false),
2235                        ("user_a".to_string(), "you online?".to_string(), false),
2236                    ]
2237            })
2238            .await;
2239
2240        channel_b
2241            .update(&mut cx_b, |channel, cx| {
2242                channel.send_message("yep".to_string(), cx).unwrap()
2243            })
2244            .await
2245            .unwrap();
2246        channel_a
2247            .condition(&cx_a, |channel, _| {
2248                channel_messages(channel)
2249                    == [
2250                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2251                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2252                        ("user_a".to_string(), "sup".to_string(), false),
2253                        ("user_b".to_string(), "can you see this?".to_string(), false),
2254                        ("user_a".to_string(), "you online?".to_string(), false),
2255                        ("user_b".to_string(), "yep".to_string(), false),
2256                    ]
2257            })
2258            .await;
2259    }
2260
2261    #[gpui::test]
2262    async fn test_contacts(
2263        mut cx_a: TestAppContext,
2264        mut cx_b: TestAppContext,
2265        mut cx_c: TestAppContext,
2266    ) {
2267        cx_a.foreground().forbid_parking();
2268        let lang_registry = Arc::new(LanguageRegistry::new());
2269        let fs = Arc::new(FakeFs::new());
2270
2271        // Connect to a server as 3 clients.
2272        let mut server = TestServer::start().await;
2273        let client_a = server.create_client(&mut cx_a, "user_a").await;
2274        let client_b = server.create_client(&mut cx_b, "user_b").await;
2275        let client_c = server.create_client(&mut cx_c, "user_c").await;
2276
2277        // Share a worktree as client A.
2278        fs.insert_tree(
2279            "/a",
2280            json!({
2281                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2282            }),
2283        )
2284        .await;
2285
2286        let project_a = cx_a.update(|cx| {
2287            Project::local(
2288                client_a.clone(),
2289                client_a.user_store.clone(),
2290                lang_registry.clone(),
2291                fs.clone(),
2292                cx,
2293            )
2294        });
2295        let worktree_a = project_a
2296            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
2297            .await
2298            .unwrap();
2299        worktree_a
2300            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2301            .await;
2302
2303        client_a
2304            .user_store
2305            .condition(&cx_a, |user_store, _| {
2306                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2307            })
2308            .await;
2309        client_b
2310            .user_store
2311            .condition(&cx_b, |user_store, _| {
2312                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2313            })
2314            .await;
2315        client_c
2316            .user_store
2317            .condition(&cx_c, |user_store, _| {
2318                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2319            })
2320            .await;
2321
2322        let project_id = project_a
2323            .update(&mut cx_a, |project, _| project.next_remote_id())
2324            .await;
2325        project_a
2326            .update(&mut cx_a, |project, cx| project.share(cx))
2327            .await
2328            .unwrap();
2329
2330        let _project_b = Project::remote(
2331            project_id,
2332            client_b.clone(),
2333            client_b.user_store.clone(),
2334            lang_registry.clone(),
2335            fs.clone(),
2336            &mut cx_b.to_async(),
2337        )
2338        .await
2339        .unwrap();
2340
2341        client_a
2342            .user_store
2343            .condition(&cx_a, |user_store, _| {
2344                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2345            })
2346            .await;
2347        client_b
2348            .user_store
2349            .condition(&cx_b, |user_store, _| {
2350                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2351            })
2352            .await;
2353        client_c
2354            .user_store
2355            .condition(&cx_c, |user_store, _| {
2356                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2357            })
2358            .await;
2359
2360        project_a
2361            .condition(&cx_a, |project, _| {
2362                project.collaborators().contains_key(&client_b.peer_id)
2363            })
2364            .await;
2365
2366        cx_a.update(move |_| drop(project_a));
2367        client_a
2368            .user_store
2369            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2370            .await;
2371        client_b
2372            .user_store
2373            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2374            .await;
2375        client_c
2376            .user_store
2377            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2378            .await;
2379
2380        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2381            user_store
2382                .contacts()
2383                .iter()
2384                .map(|contact| {
2385                    let worktrees = contact
2386                        .projects
2387                        .iter()
2388                        .map(|p| {
2389                            (
2390                                p.worktree_root_names[0].as_str(),
2391                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
2392                            )
2393                        })
2394                        .collect();
2395                    (contact.user.github_login.as_str(), worktrees)
2396                })
2397                .collect()
2398        }
2399    }
2400
2401    struct TestServer {
2402        peer: Arc<Peer>,
2403        app_state: Arc<AppState>,
2404        server: Arc<Server>,
2405        notifications: mpsc::Receiver<()>,
2406        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
2407        forbid_connections: Arc<AtomicBool>,
2408        _test_db: TestDb,
2409    }
2410
2411    impl TestServer {
2412        async fn start() -> Self {
2413            let test_db = TestDb::new();
2414            let app_state = Self::build_app_state(&test_db).await;
2415            let peer = Peer::new();
2416            let notifications = mpsc::channel(128);
2417            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2418            Self {
2419                peer,
2420                app_state,
2421                server,
2422                notifications: notifications.1,
2423                connection_killers: Default::default(),
2424                forbid_connections: Default::default(),
2425                _test_db: test_db,
2426            }
2427        }
2428
2429        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2430            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2431            let client_name = name.to_string();
2432            let mut client = Client::new();
2433            let server = self.server.clone();
2434            let connection_killers = self.connection_killers.clone();
2435            let forbid_connections = self.forbid_connections.clone();
2436            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2437
2438            Arc::get_mut(&mut client)
2439                .unwrap()
2440                .override_authenticate(move |cx| {
2441                    cx.spawn(|_| async move {
2442                        let access_token = "the-token".to_string();
2443                        Ok(Credentials {
2444                            user_id: user_id.0 as u64,
2445                            access_token,
2446                        })
2447                    })
2448                })
2449                .override_establish_connection(move |credentials, cx| {
2450                    assert_eq!(credentials.user_id, user_id.0 as u64);
2451                    assert_eq!(credentials.access_token, "the-token");
2452
2453                    let server = server.clone();
2454                    let connection_killers = connection_killers.clone();
2455                    let forbid_connections = forbid_connections.clone();
2456                    let client_name = client_name.clone();
2457                    let connection_id_tx = connection_id_tx.clone();
2458                    cx.spawn(move |cx| async move {
2459                        if forbid_connections.load(SeqCst) {
2460                            Err(EstablishConnectionError::other(anyhow!(
2461                                "server is forbidding connections"
2462                            )))
2463                        } else {
2464                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2465                            connection_killers.lock().insert(user_id, kill_conn);
2466                            cx.background()
2467                                .spawn(server.handle_connection(
2468                                    server_conn,
2469                                    client_name,
2470                                    user_id,
2471                                    Some(connection_id_tx),
2472                                ))
2473                                .detach();
2474                            Ok(client_conn)
2475                        }
2476                    })
2477                });
2478
2479            let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2480            client
2481                .authenticate_and_connect(&cx.to_async())
2482                .await
2483                .unwrap();
2484
2485            let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2486            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2487            let mut authed_user =
2488                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2489            while authed_user.recv().await.unwrap().is_none() {}
2490
2491            TestClient {
2492                client,
2493                peer_id,
2494                user_store,
2495            }
2496        }
2497
2498        fn disconnect_client(&self, user_id: UserId) {
2499            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2500                let _ = kill_conn.try_send(Some(()));
2501            }
2502        }
2503
2504        fn forbid_connections(&self) {
2505            self.forbid_connections.store(true, SeqCst);
2506        }
2507
2508        fn allow_connections(&self) {
2509            self.forbid_connections.store(false, SeqCst);
2510        }
2511
2512        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2513            let mut config = Config::default();
2514            config.session_secret = "a".repeat(32);
2515            config.database_url = test_db.url.clone();
2516            let github_client = github::AppClient::test();
2517            Arc::new(AppState {
2518                db: test_db.db().clone(),
2519                handlebars: Default::default(),
2520                auth_client: auth::build_client("", ""),
2521                repo_client: github::RepoClient::test(&github_client),
2522                github_client,
2523                config,
2524            })
2525        }
2526
2527        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2528            self.server.store.read()
2529        }
2530
2531        async fn condition<F>(&mut self, mut predicate: F)
2532        where
2533            F: FnMut(&Store) -> bool,
2534        {
2535            async_std::future::timeout(Duration::from_millis(500), async {
2536                while !(predicate)(&*self.server.store.read()) {
2537                    self.notifications.recv().await;
2538                }
2539            })
2540            .await
2541            .expect("condition timed out");
2542        }
2543    }
2544
2545    impl Drop for TestServer {
2546        fn drop(&mut self) {
2547            task::block_on(self.peer.reset());
2548        }
2549    }
2550
2551    struct TestClient {
2552        client: Arc<Client>,
2553        pub peer_id: PeerId,
2554        pub user_store: ModelHandle<UserStore>,
2555    }
2556
2557    impl Deref for TestClient {
2558        type Target = Arc<Client>;
2559
2560        fn deref(&self) -> &Self::Target {
2561            &self.client
2562        }
2563    }
2564
2565    impl TestClient {
2566        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2567            UserId::from_proto(
2568                self.user_store
2569                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2570            )
2571        }
2572    }
2573
2574    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2575        channel
2576            .messages()
2577            .cursor::<()>()
2578            .map(|m| {
2579                (
2580                    m.sender.github_login.clone(),
2581                    m.body.clone(),
2582                    m.is_pending(),
2583                )
2584            })
2585            .collect()
2586    }
2587
2588    struct EmptyView;
2589
2590    impl gpui::Entity for EmptyView {
2591        type Event = ();
2592    }
2593
2594    impl gpui::View for EmptyView {
2595        fn ui_name() -> &'static str {
2596            "empty view"
2597        }
2598
2599        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2600            gpui::Element::boxed(gpui::elements::Empty)
2601        }
2602    }
2603}