rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::open_buffer)
  75            .add_handler(Server::close_buffer)
  76            .add_handler(Server::update_buffer)
  77            .add_handler(Server::buffer_saved)
  78            .add_handler(Server::save_buffer)
  79            .add_handler(Server::get_channels)
  80            .add_handler(Server::get_users)
  81            .add_handler(Server::join_channel)
  82            .add_handler(Server::leave_channel)
  83            .add_handler(Server::send_channel_message)
  84            .add_handler(Server::get_channel_messages);
  85
  86        Arc::new(server)
  87    }
  88
  89    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  90    where
  91        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  92        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  93        M: EnvelopedMessage,
  94    {
  95        let prev_handler = self.handlers.insert(
  96            TypeId::of::<M>(),
  97            Box::new(move |server, envelope| {
  98                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
  99                (handler)(server, *envelope).boxed()
 100            }),
 101        );
 102        if prev_handler.is_some() {
 103            panic!("registered a handler for the same message twice");
 104        }
 105        self
 106    }
 107
 108    pub fn handle_connection(
 109        self: &Arc<Self>,
 110        connection: Connection,
 111        addr: String,
 112        user_id: UserId,
 113        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 114    ) -> impl Future<Output = ()> {
 115        let mut this = self.clone();
 116        async move {
 117            let (connection_id, handle_io, mut incoming_rx) =
 118                this.peer.add_connection(connection).await;
 119
 120            if let Some(send_connection_id) = send_connection_id.as_mut() {
 121                let _ = send_connection_id.send(connection_id).await;
 122            }
 123
 124            this.state_mut().add_connection(connection_id, user_id);
 125            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 126                log::error!("error updating contacts for {:?}: {}", user_id, err);
 127            }
 128
 129            let handle_io = handle_io.fuse();
 130            futures::pin_mut!(handle_io);
 131            loop {
 132                let next_message = incoming_rx.recv().fuse();
 133                futures::pin_mut!(next_message);
 134                futures::select_biased! {
 135                    message = next_message => {
 136                        if let Some(message) = message {
 137                            let start_time = Instant::now();
 138                            log::info!("RPC message received: {}", message.payload_type_name());
 139                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 140                                if let Err(err) = (handler)(this.clone(), message).await {
 141                                    log::error!("error handling message: {:?}", err);
 142                                } else {
 143                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 144                                }
 145
 146                                if let Some(mut notifications) = this.notifications.clone() {
 147                                    let _ = notifications.send(()).await;
 148                                }
 149                            } else {
 150                                log::warn!("unhandled message: {}", message.payload_type_name());
 151                            }
 152                        } else {
 153                            log::info!("rpc connection closed {:?}", addr);
 154                            break;
 155                        }
 156                    }
 157                    handle_io = handle_io => {
 158                        if let Err(err) = handle_io {
 159                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 160                        }
 161                        break;
 162                    }
 163                }
 164            }
 165
 166            if let Err(err) = this.sign_out(connection_id).await {
 167                log::error!("error signing out connection {:?} - {:?}", addr, err);
 168            }
 169        }
 170    }
 171
 172    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 173        self.peer.disconnect(connection_id).await;
 174        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 175
 176        for (project_id, project) in removed_connection.hosted_projects {
 177            if let Some(share) = project.share {
 178                broadcast(
 179                    connection_id,
 180                    share.guests.keys().copied().collect(),
 181                    |conn_id| {
 182                        self.peer
 183                            .send(conn_id, proto::UnshareProject { project_id })
 184                    },
 185                )
 186                .await?;
 187            }
 188        }
 189
 190        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 191            broadcast(connection_id, peer_ids, |conn_id| {
 192                self.peer.send(
 193                    conn_id,
 194                    proto::RemoveProjectCollaborator {
 195                        project_id,
 196                        peer_id: connection_id.0,
 197                    },
 198                )
 199            })
 200            .await?;
 201        }
 202
 203        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 204            .await?;
 205
 206        Ok(())
 207    }
 208
 209    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 210        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 211        Ok(())
 212    }
 213
 214    async fn register_project(
 215        mut self: Arc<Server>,
 216        request: TypedEnvelope<proto::RegisterProject>,
 217    ) -> tide::Result<()> {
 218        let project_id = {
 219            let mut state = self.state_mut();
 220            let user_id = state.user_id_for_connection(request.sender_id)?;
 221            state.register_project(request.sender_id, user_id)
 222        };
 223        self.peer
 224            .respond(
 225                request.receipt(),
 226                proto::RegisterProjectResponse { project_id },
 227            )
 228            .await?;
 229        Ok(())
 230    }
 231
 232    async fn unregister_project(
 233        mut self: Arc<Server>,
 234        request: TypedEnvelope<proto::UnregisterProject>,
 235    ) -> tide::Result<()> {
 236        self.state_mut()
 237            .unregister_project(request.payload.project_id, request.sender_id);
 238        Ok(())
 239    }
 240
 241    async fn share_project(
 242        mut self: Arc<Server>,
 243        request: TypedEnvelope<proto::ShareProject>,
 244    ) -> tide::Result<()> {
 245        self.state_mut()
 246            .share_project(request.payload.project_id, request.sender_id);
 247        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 248        Ok(())
 249    }
 250
 251    async fn unshare_project(
 252        mut self: Arc<Server>,
 253        request: TypedEnvelope<proto::UnshareProject>,
 254    ) -> tide::Result<()> {
 255        let project_id = request.payload.project_id;
 256        let project = self
 257            .state_mut()
 258            .unshare_project(project_id, request.sender_id)?;
 259
 260        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 261            self.peer
 262                .send(conn_id, proto::UnshareProject { project_id })
 263        })
 264        .await?;
 265        self.update_contacts_for_users(&project.authorized_user_ids)
 266            .await?;
 267
 268        Ok(())
 269    }
 270
 271    async fn join_project(
 272        mut self: Arc<Server>,
 273        request: TypedEnvelope<proto::JoinProject>,
 274    ) -> tide::Result<()> {
 275        let project_id = request.payload.project_id;
 276
 277        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 278        let response_data = self
 279            .state_mut()
 280            .join_project(request.sender_id, user_id, project_id)
 281            .and_then(|joined| {
 282                let share = joined.project.share()?;
 283                let peer_count = share.guests.len();
 284                let mut collaborators = Vec::with_capacity(peer_count);
 285                collaborators.push(proto::Collaborator {
 286                    peer_id: joined.project.host_connection_id.0,
 287                    replica_id: 0,
 288                    user_id: joined.project.host_user_id.to_proto(),
 289                });
 290                let worktrees = joined
 291                    .project
 292                    .worktrees
 293                    .iter()
 294                    .filter_map(|(id, worktree)| {
 295                        worktree.share.as_ref().map(|share| proto::Worktree {
 296                            id: *id,
 297                            root_name: worktree.root_name.clone(),
 298                            entries: share.entries.values().cloned().collect(),
 299                        })
 300                    })
 301                    .collect();
 302                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 303                    if *peer_conn_id != request.sender_id {
 304                        collaborators.push(proto::Collaborator {
 305                            peer_id: peer_conn_id.0,
 306                            replica_id: *peer_replica_id as u32,
 307                            user_id: peer_user_id.to_proto(),
 308                        });
 309                    }
 310                }
 311                let response = proto::JoinProjectResponse {
 312                    worktrees,
 313                    replica_id: joined.replica_id as u32,
 314                    collaborators,
 315                };
 316                let connection_ids = joined.project.connection_ids();
 317                let contact_user_ids = joined.project.authorized_user_ids();
 318                Ok((response, connection_ids, contact_user_ids))
 319            });
 320
 321        match response_data {
 322            Ok((response, connection_ids, contact_user_ids)) => {
 323                broadcast(request.sender_id, connection_ids, |conn_id| {
 324                    self.peer.send(
 325                        conn_id,
 326                        proto::AddProjectCollaborator {
 327                            project_id: project_id,
 328                            collaborator: Some(proto::Collaborator {
 329                                peer_id: request.sender_id.0,
 330                                replica_id: response.replica_id,
 331                                user_id: user_id.to_proto(),
 332                            }),
 333                        },
 334                    )
 335                })
 336                .await?;
 337                self.peer.respond(request.receipt(), response).await?;
 338                self.update_contacts_for_users(&contact_user_ids).await?;
 339            }
 340            Err(error) => {
 341                self.peer
 342                    .respond_with_error(
 343                        request.receipt(),
 344                        proto::Error {
 345                            message: error.to_string(),
 346                        },
 347                    )
 348                    .await?;
 349            }
 350        }
 351
 352        Ok(())
 353    }
 354
 355    async fn leave_project(
 356        mut self: Arc<Server>,
 357        request: TypedEnvelope<proto::LeaveProject>,
 358    ) -> tide::Result<()> {
 359        let sender_id = request.sender_id;
 360        let project_id = request.payload.project_id;
 361        let worktree = self.state_mut().leave_project(sender_id, project_id);
 362        if let Some(worktree) = worktree {
 363            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 364                self.peer.send(
 365                    conn_id,
 366                    proto::RemoveProjectCollaborator {
 367                        project_id,
 368                        peer_id: sender_id.0,
 369                    },
 370                )
 371            })
 372            .await?;
 373            self.update_contacts_for_users(&worktree.authorized_user_ids)
 374                .await?;
 375        }
 376        Ok(())
 377    }
 378
 379    async fn register_worktree(
 380        mut self: Arc<Server>,
 381        request: TypedEnvelope<proto::RegisterWorktree>,
 382    ) -> tide::Result<()> {
 383        let receipt = request.receipt();
 384        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 385
 386        let mut contact_user_ids = HashSet::default();
 387        contact_user_ids.insert(host_user_id);
 388        for github_login in request.payload.authorized_logins {
 389            match self.app_state.db.create_user(&github_login, false).await {
 390                Ok(contact_user_id) => {
 391                    contact_user_ids.insert(contact_user_id);
 392                }
 393                Err(err) => {
 394                    let message = err.to_string();
 395                    self.peer
 396                        .respond_with_error(receipt, proto::Error { message })
 397                        .await?;
 398                    return Ok(());
 399                }
 400            }
 401        }
 402
 403        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 404        let ok = self.state_mut().register_worktree(
 405            request.payload.project_id,
 406            request.payload.worktree_id,
 407            Worktree {
 408                authorized_user_ids: contact_user_ids.clone(),
 409                root_name: request.payload.root_name,
 410                share: None,
 411            },
 412        );
 413
 414        if ok {
 415            self.peer.respond(receipt, proto::Ack {}).await?;
 416            self.update_contacts_for_users(&contact_user_ids).await?;
 417        } else {
 418            self.peer
 419                .respond_with_error(
 420                    receipt,
 421                    proto::Error {
 422                        message: NO_SUCH_PROJECT.to_string(),
 423                    },
 424                )
 425                .await?;
 426        }
 427
 428        Ok(())
 429    }
 430
 431    async fn unregister_worktree(
 432        mut self: Arc<Server>,
 433        request: TypedEnvelope<proto::UnregisterWorktree>,
 434    ) -> tide::Result<()> {
 435        let project_id = request.payload.project_id;
 436        let worktree_id = request.payload.worktree_id;
 437        let (worktree, guest_connection_ids) =
 438            self.state_mut()
 439                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 440
 441        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 442            self.peer.send(
 443                conn_id,
 444                proto::UnregisterWorktree {
 445                    project_id,
 446                    worktree_id,
 447                },
 448            )
 449        })
 450        .await?;
 451        self.update_contacts_for_users(&worktree.authorized_user_ids)
 452            .await?;
 453        Ok(())
 454    }
 455
 456    async fn share_worktree(
 457        mut self: Arc<Server>,
 458        mut request: TypedEnvelope<proto::ShareWorktree>,
 459    ) -> tide::Result<()> {
 460        let worktree = request
 461            .payload
 462            .worktree
 463            .as_mut()
 464            .ok_or_else(|| anyhow!("missing worktree"))?;
 465        let entries = mem::take(&mut worktree.entries)
 466            .into_iter()
 467            .map(|entry| (entry.id, entry))
 468            .collect();
 469
 470        let contact_user_ids = self.state_mut().share_worktree(
 471            request.payload.project_id,
 472            worktree.id,
 473            request.sender_id,
 474            entries,
 475        );
 476        if let Some(contact_user_ids) = contact_user_ids {
 477            self.peer.respond(request.receipt(), proto::Ack {}).await?;
 478            self.update_contacts_for_users(&contact_user_ids).await?;
 479        } else {
 480            self.peer
 481                .respond_with_error(
 482                    request.receipt(),
 483                    proto::Error {
 484                        message: "no such worktree".to_string(),
 485                    },
 486                )
 487                .await?;
 488        }
 489        Ok(())
 490    }
 491
 492    async fn update_worktree(
 493        mut self: Arc<Server>,
 494        request: TypedEnvelope<proto::UpdateWorktree>,
 495    ) -> tide::Result<()> {
 496        let connection_ids = self
 497            .state_mut()
 498            .update_worktree(
 499                request.sender_id,
 500                request.payload.project_id,
 501                request.payload.worktree_id,
 502                &request.payload.removed_entries,
 503                &request.payload.updated_entries,
 504            )
 505            .ok_or_else(|| anyhow!("no such worktree"))?;
 506
 507        broadcast(request.sender_id, connection_ids, |connection_id| {
 508            self.peer
 509                .forward_send(request.sender_id, connection_id, request.payload.clone())
 510        })
 511        .await?;
 512
 513        Ok(())
 514    }
 515
 516    async fn open_buffer(
 517        self: Arc<Server>,
 518        request: TypedEnvelope<proto::OpenBuffer>,
 519    ) -> tide::Result<()> {
 520        let receipt = request.receipt();
 521        let host_connection_id = self
 522            .state()
 523            .read_project(request.payload.project_id, request.sender_id)
 524            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 525            .host_connection_id;
 526        let response = self
 527            .peer
 528            .forward_request(request.sender_id, host_connection_id, request.payload)
 529            .await?;
 530        self.peer.respond(receipt, response).await?;
 531        Ok(())
 532    }
 533
 534    async fn close_buffer(
 535        self: Arc<Server>,
 536        request: TypedEnvelope<proto::CloseBuffer>,
 537    ) -> tide::Result<()> {
 538        let host_connection_id = self
 539            .state()
 540            .read_project(request.payload.project_id, request.sender_id)
 541            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 542            .host_connection_id;
 543        self.peer
 544            .forward_send(request.sender_id, host_connection_id, request.payload)
 545            .await?;
 546        Ok(())
 547    }
 548
 549    async fn save_buffer(
 550        self: Arc<Server>,
 551        request: TypedEnvelope<proto::SaveBuffer>,
 552    ) -> tide::Result<()> {
 553        let host;
 554        let guests;
 555        {
 556            let state = self.state();
 557            let project = state
 558                .read_project(request.payload.project_id, request.sender_id)
 559                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 560            host = project.host_connection_id;
 561            guests = project.guest_connection_ids()
 562        }
 563
 564        let sender = request.sender_id;
 565        let receipt = request.receipt();
 566        let response = self
 567            .peer
 568            .forward_request(sender, host, request.payload.clone())
 569            .await?;
 570
 571        broadcast(host, guests, |conn_id| {
 572            let response = response.clone();
 573            let peer = &self.peer;
 574            async move {
 575                if conn_id == sender {
 576                    peer.respond(receipt, response).await
 577                } else {
 578                    peer.forward_send(host, conn_id, response).await
 579                }
 580            }
 581        })
 582        .await?;
 583
 584        Ok(())
 585    }
 586
 587    async fn update_buffer(
 588        self: Arc<Server>,
 589        request: TypedEnvelope<proto::UpdateBuffer>,
 590    ) -> tide::Result<()> {
 591        let receiver_ids = self
 592            .state()
 593            .project_connection_ids(request.payload.project_id, request.sender_id)
 594            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 595        broadcast(request.sender_id, receiver_ids, |connection_id| {
 596            self.peer
 597                .forward_send(request.sender_id, connection_id, request.payload.clone())
 598        })
 599        .await?;
 600        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 601        Ok(())
 602    }
 603
 604    async fn buffer_saved(
 605        self: Arc<Server>,
 606        request: TypedEnvelope<proto::BufferSaved>,
 607    ) -> tide::Result<()> {
 608        let receiver_ids = self
 609            .state()
 610            .project_connection_ids(request.payload.project_id, request.sender_id)
 611            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 612        broadcast(request.sender_id, receiver_ids, |connection_id| {
 613            self.peer
 614                .forward_send(request.sender_id, connection_id, request.payload.clone())
 615        })
 616        .await?;
 617        Ok(())
 618    }
 619
 620    async fn get_channels(
 621        self: Arc<Server>,
 622        request: TypedEnvelope<proto::GetChannels>,
 623    ) -> tide::Result<()> {
 624        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 625        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 626        self.peer
 627            .respond(
 628                request.receipt(),
 629                proto::GetChannelsResponse {
 630                    channels: channels
 631                        .into_iter()
 632                        .map(|chan| proto::Channel {
 633                            id: chan.id.to_proto(),
 634                            name: chan.name,
 635                        })
 636                        .collect(),
 637                },
 638            )
 639            .await?;
 640        Ok(())
 641    }
 642
 643    async fn get_users(
 644        self: Arc<Server>,
 645        request: TypedEnvelope<proto::GetUsers>,
 646    ) -> tide::Result<()> {
 647        let receipt = request.receipt();
 648        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 649        let users = self
 650            .app_state
 651            .db
 652            .get_users_by_ids(user_ids)
 653            .await?
 654            .into_iter()
 655            .map(|user| proto::User {
 656                id: user.id.to_proto(),
 657                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 658                github_login: user.github_login,
 659            })
 660            .collect();
 661        self.peer
 662            .respond(receipt, proto::GetUsersResponse { users })
 663            .await?;
 664        Ok(())
 665    }
 666
 667    async fn update_contacts_for_users<'a>(
 668        self: &Arc<Server>,
 669        user_ids: impl IntoIterator<Item = &'a UserId>,
 670    ) -> tide::Result<()> {
 671        let mut send_futures = Vec::new();
 672
 673        {
 674            let state = self.state();
 675            for user_id in user_ids {
 676                let contacts = state.contacts_for_user(*user_id);
 677                for connection_id in state.connection_ids_for_user(*user_id) {
 678                    send_futures.push(self.peer.send(
 679                        connection_id,
 680                        proto::UpdateContacts {
 681                            contacts: contacts.clone(),
 682                        },
 683                    ));
 684                }
 685            }
 686        }
 687        futures::future::try_join_all(send_futures).await?;
 688
 689        Ok(())
 690    }
 691
 692    async fn join_channel(
 693        mut self: Arc<Self>,
 694        request: TypedEnvelope<proto::JoinChannel>,
 695    ) -> tide::Result<()> {
 696        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 697        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 698        if !self
 699            .app_state
 700            .db
 701            .can_user_access_channel(user_id, channel_id)
 702            .await?
 703        {
 704            Err(anyhow!("access denied"))?;
 705        }
 706
 707        self.state_mut().join_channel(request.sender_id, channel_id);
 708        let messages = self
 709            .app_state
 710            .db
 711            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 712            .await?
 713            .into_iter()
 714            .map(|msg| proto::ChannelMessage {
 715                id: msg.id.to_proto(),
 716                body: msg.body,
 717                timestamp: msg.sent_at.unix_timestamp() as u64,
 718                sender_id: msg.sender_id.to_proto(),
 719                nonce: Some(msg.nonce.as_u128().into()),
 720            })
 721            .collect::<Vec<_>>();
 722        self.peer
 723            .respond(
 724                request.receipt(),
 725                proto::JoinChannelResponse {
 726                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 727                    messages,
 728                },
 729            )
 730            .await?;
 731        Ok(())
 732    }
 733
 734    async fn leave_channel(
 735        mut self: Arc<Self>,
 736        request: TypedEnvelope<proto::LeaveChannel>,
 737    ) -> tide::Result<()> {
 738        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 739        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 740        if !self
 741            .app_state
 742            .db
 743            .can_user_access_channel(user_id, channel_id)
 744            .await?
 745        {
 746            Err(anyhow!("access denied"))?;
 747        }
 748
 749        self.state_mut()
 750            .leave_channel(request.sender_id, channel_id);
 751
 752        Ok(())
 753    }
 754
 755    async fn send_channel_message(
 756        self: Arc<Self>,
 757        request: TypedEnvelope<proto::SendChannelMessage>,
 758    ) -> tide::Result<()> {
 759        let receipt = request.receipt();
 760        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 761        let user_id;
 762        let connection_ids;
 763        {
 764            let state = self.state();
 765            user_id = state.user_id_for_connection(request.sender_id)?;
 766            if let Some(ids) = state.channel_connection_ids(channel_id) {
 767                connection_ids = ids;
 768            } else {
 769                return Ok(());
 770            }
 771        }
 772
 773        // Validate the message body.
 774        let body = request.payload.body.trim().to_string();
 775        if body.len() > MAX_MESSAGE_LEN {
 776            self.peer
 777                .respond_with_error(
 778                    receipt,
 779                    proto::Error {
 780                        message: "message is too long".to_string(),
 781                    },
 782                )
 783                .await?;
 784            return Ok(());
 785        }
 786        if body.is_empty() {
 787            self.peer
 788                .respond_with_error(
 789                    receipt,
 790                    proto::Error {
 791                        message: "message can't be blank".to_string(),
 792                    },
 793                )
 794                .await?;
 795            return Ok(());
 796        }
 797
 798        let timestamp = OffsetDateTime::now_utc();
 799        let nonce = if let Some(nonce) = request.payload.nonce {
 800            nonce
 801        } else {
 802            self.peer
 803                .respond_with_error(
 804                    receipt,
 805                    proto::Error {
 806                        message: "nonce can't be blank".to_string(),
 807                    },
 808                )
 809                .await?;
 810            return Ok(());
 811        };
 812
 813        let message_id = self
 814            .app_state
 815            .db
 816            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 817            .await?
 818            .to_proto();
 819        let message = proto::ChannelMessage {
 820            sender_id: user_id.to_proto(),
 821            id: message_id,
 822            body,
 823            timestamp: timestamp.unix_timestamp() as u64,
 824            nonce: Some(nonce),
 825        };
 826        broadcast(request.sender_id, connection_ids, |conn_id| {
 827            self.peer.send(
 828                conn_id,
 829                proto::ChannelMessageSent {
 830                    channel_id: channel_id.to_proto(),
 831                    message: Some(message.clone()),
 832                },
 833            )
 834        })
 835        .await?;
 836        self.peer
 837            .respond(
 838                receipt,
 839                proto::SendChannelMessageResponse {
 840                    message: Some(message),
 841                },
 842            )
 843            .await?;
 844        Ok(())
 845    }
 846
 847    async fn get_channel_messages(
 848        self: Arc<Self>,
 849        request: TypedEnvelope<proto::GetChannelMessages>,
 850    ) -> tide::Result<()> {
 851        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 852        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 853        if !self
 854            .app_state
 855            .db
 856            .can_user_access_channel(user_id, channel_id)
 857            .await?
 858        {
 859            Err(anyhow!("access denied"))?;
 860        }
 861
 862        let messages = self
 863            .app_state
 864            .db
 865            .get_channel_messages(
 866                channel_id,
 867                MESSAGE_COUNT_PER_PAGE,
 868                Some(MessageId::from_proto(request.payload.before_message_id)),
 869            )
 870            .await?
 871            .into_iter()
 872            .map(|msg| proto::ChannelMessage {
 873                id: msg.id.to_proto(),
 874                body: msg.body,
 875                timestamp: msg.sent_at.unix_timestamp() as u64,
 876                sender_id: msg.sender_id.to_proto(),
 877                nonce: Some(msg.nonce.as_u128().into()),
 878            })
 879            .collect::<Vec<_>>();
 880        self.peer
 881            .respond(
 882                request.receipt(),
 883                proto::GetChannelMessagesResponse {
 884                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 885                    messages,
 886                },
 887            )
 888            .await?;
 889        Ok(())
 890    }
 891
 892    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 893        self.store.read()
 894    }
 895
 896    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 897        self.store.write()
 898    }
 899}
 900
 901pub async fn broadcast<F, T>(
 902    sender_id: ConnectionId,
 903    receiver_ids: Vec<ConnectionId>,
 904    mut f: F,
 905) -> anyhow::Result<()>
 906where
 907    F: FnMut(ConnectionId) -> T,
 908    T: Future<Output = anyhow::Result<()>>,
 909{
 910    let futures = receiver_ids
 911        .into_iter()
 912        .filter(|id| *id != sender_id)
 913        .map(|id| f(id));
 914    futures::future::try_join_all(futures).await?;
 915    Ok(())
 916}
 917
 918pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 919    let server = Server::new(app.state().clone(), rpc.clone(), None);
 920    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 921        let server = server.clone();
 922        async move {
 923            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 924
 925            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 926            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 927            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 928            let client_protocol_version: Option<u32> = request
 929                .header("X-Zed-Protocol-Version")
 930                .and_then(|v| v.as_str().parse().ok());
 931
 932            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 933                return Ok(Response::new(StatusCode::UpgradeRequired));
 934            }
 935
 936            let header = match request.header("Sec-Websocket-Key") {
 937                Some(h) => h.as_str(),
 938                None => return Err(anyhow!("expected sec-websocket-key"))?,
 939            };
 940
 941            let user_id = process_auth_header(&request).await?;
 942
 943            let mut response = Response::new(StatusCode::SwitchingProtocols);
 944            response.insert_header(UPGRADE, "websocket");
 945            response.insert_header(CONNECTION, "Upgrade");
 946            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 947            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 948            response.insert_header("Sec-Websocket-Version", "13");
 949
 950            let http_res: &mut tide::http::Response = response.as_mut();
 951            let upgrade_receiver = http_res.recv_upgrade().await;
 952            let addr = request.remote().unwrap_or("unknown").to_string();
 953            task::spawn(async move {
 954                if let Some(stream) = upgrade_receiver.await {
 955                    server
 956                        .handle_connection(
 957                            Connection::new(
 958                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 959                            ),
 960                            addr,
 961                            user_id,
 962                            None,
 963                        )
 964                        .await;
 965                }
 966            });
 967
 968            Ok(response)
 969        }
 970    });
 971}
 972
 973fn header_contains_ignore_case<T>(
 974    request: &tide::Request<T>,
 975    header_name: HeaderName,
 976    value: &str,
 977) -> bool {
 978    request
 979        .header(header_name)
 980        .map(|h| {
 981            h.as_str()
 982                .split(',')
 983                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 984        })
 985        .unwrap_or(false)
 986}
 987
 988#[cfg(test)]
 989mod tests {
 990    use super::*;
 991    use crate::{
 992        auth,
 993        db::{tests::TestDb, UserId},
 994        github, AppState, Config,
 995    };
 996    use ::rpc::Peer;
 997    use async_std::task;
 998    use gpui::{ModelHandle, TestAppContext};
 999    use parking_lot::Mutex;
1000    use postage::{mpsc, watch};
1001    use rpc::PeerId;
1002    use serde_json::json;
1003    use sqlx::types::time::OffsetDateTime;
1004    use std::{
1005        ops::Deref,
1006        path::Path,
1007        sync::{
1008            atomic::{AtomicBool, Ordering::SeqCst},
1009            Arc,
1010        },
1011        time::Duration,
1012    };
1013    use zed::{
1014        client::{
1015            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1016            EstablishConnectionError, UserStore,
1017        },
1018        editor::{Editor, EditorSettings, Input, MultiBuffer},
1019        fs::{FakeFs, Fs as _},
1020        language::{
1021            tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1022            LanguageRegistry, LanguageServerConfig, Point,
1023        },
1024        lsp,
1025        project::Project,
1026    };
1027
    // End-to-end sharing test with two clients: A shares a project rooted at "/a",
    // B joins it remotely, both open "b.txt", an edit typed by B is observed by A,
    // and dropping B's project removes B from A's collaborator list.
    #[gpui::test]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        // Wait for the local worktree scan to finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // B must see A as a collaborator; remember B's replica id for the check on A's side.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A sees B listed with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = worktree_b
            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // The host's worktree is asserted to report "b.txt" as open before A opens it itself.
        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
        });
        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        worktree_a
            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1161
    // Verifies that when host A unshares a project, guest B's remote copy of that
    // project eventually reports itself as read-only.
    #[gpui::test]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_b.update(zed::contacts_panel::init);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        // Wait for the local worktree scan to finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as the guest so B holds remote state when the unshare happens.
        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
        worktree_b
            .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
            .await
            .unwrap();

        // Unshare as the host, then wait for the guest's project to become read-only.
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
    }
1234
    // Three-client test: host A shares a worktree, guests B and C join and edit the
    // same buffer, and the test checks that edits converge on all replicas, that a
    // save issued by a guest writes the host's file and clears the dirty flag on all
    // replicas, and that file-system changes on the host show up in the guests'
    // worktrees.
    #[gpui::test]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        // Wait for the local worktree scan to finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();

        // Open and edit a buffer as both guests B and C.
        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
        let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
        let buffer_b = worktree_b
            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        let buffer_c = worktree_c
            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();

        // Wait for both guests' edits to reach the host before the host appends its own text.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The saved file is expected to include the host's concurrent "hi-a, " edit.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on the guests.
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // After the rename and the insert, each guest should end up with four files.
        worktree_b
            .condition(&cx_b, |tree, _| tree.file_count() == 4)
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| tree.file_count() == 4)
            .await;
        worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1", "file3", "file4"]
            )
        });
        worktree_c.read_with(&cx_c, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1", "file3", "file4"]
            )
        });
    }
1388
    // Checks the guest's dirty/conflict flags around a save: an edit makes the buffer
    // dirty without a conflict, a save followed by the observed mtime change clears
    // the dirty flag, and a further edit makes it dirty again, still without a
    // conflict.
    #[gpui::test]
    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        // NOTE(review): the collaborator list names "user_c", but this test only creates
        // clients for user_a and user_b.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
            .await
            .unwrap();
        // Wait for the local worktree scan to finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // Open a buffer as client B
        let buffer_b = worktree_b
            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
            .await
            .unwrap();
        // Remember the file's mtime so the post-save update can be detected below.
        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());

        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        buffer_b
            .update(&mut cx_b, |buf, cx| buf.save(cx))
            .unwrap()
            .await
            .unwrap();
        // Wait until the guest's view of the file reports a different mtime than before the save.
        worktree_b
            .condition(&cx_b, |_, cx| {
                buffer_b.read(cx).file().unwrap().mtime() != mtime
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Editing again after the save must mark the buffer dirty without a conflict.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1481
    // Checks that an edit made by the host while a guest is still opening the same
    // buffer is not lost: once the guest's open completes, its buffer text must
    // eventually equal the host's.
    #[gpui::test]
    async fn test_editing_while_guest_opens_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
            .await
            .unwrap();
        // Wait for the local worktree scan to finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // Open a buffer as client A
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B
        let buffer_b = cx_b
            .background()
            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
        // NOTE(review): presumably yields so the spawned open can begin before the host edits — confirm.
        task::yield_now().await;

        // Edit the buffer as client A while client B is still opening it.
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));

        // The guest's buffer must converge on the host's text, including the concurrent edit.
        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1561
1562    #[gpui::test]
1563    async fn test_leaving_worktree_while_opening_buffer(
1564        mut cx_a: TestAppContext,
1565        mut cx_b: TestAppContext,
1566    ) {
1567        cx_a.foreground().forbid_parking();
1568        let lang_registry = Arc::new(LanguageRegistry::new());
1569        let fs = Arc::new(FakeFs::new());
1570
1571        // Connect to a server as 2 clients.
1572        let mut server = TestServer::start().await;
1573        let client_a = server.create_client(&mut cx_a, "user_a").await;
1574        let client_b = server.create_client(&mut cx_b, "user_b").await;
1575
1576        // Share a project as client A
1577        fs.insert_tree(
1578            "/dir",
1579            json!({
1580                ".zed.toml": r#"collaborators = ["user_b"]"#,
1581                "a.txt": "a-contents",
1582            }),
1583        )
1584        .await;
1585        let project_a = cx_a.update(|cx| {
1586            Project::local(
1587                client_a.clone(),
1588                client_a.user_store.clone(),
1589                lang_registry.clone(),
1590                fs.clone(),
1591                cx,
1592            )
1593        });
1594        let worktree_a = project_a
1595            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1596            .await
1597            .unwrap();
1598        worktree_a
1599            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1600            .await;
1601        let project_id = project_a
1602            .update(&mut cx_a, |project, _| project.next_remote_id())
1603            .await;
1604        project_a
1605            .update(&mut cx_a, |project, cx| project.share(cx))
1606            .await
1607            .unwrap();
1608
1609        // Join that project as client B
1610        let project_b = Project::remote(
1611            project_id,
1612            client_b.clone(),
1613            client_b.user_store.clone(),
1614            lang_registry.clone(),
1615            fs.clone(),
1616            &mut cx_b.to_async(),
1617        )
1618        .await
1619        .unwrap();
1620        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1621
1622        // See that a guest has joined as client A.
1623        project_a
1624            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1625            .await;
1626
1627        // Begin opening a buffer as client B, but leave the project before the open completes.
1628        let buffer_b = cx_b
1629            .background()
1630            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1631        cx_b.update(|_| drop(worktree_b));
1632        drop(buffer_b);
1633
1634        // See that the guest has left.
1635        project_a
1636            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1637            .await;
1638    }
1639
    // Checks that when a guest's client disconnects from the server, the host sees
    // that guest disappear from the shared project's collaborator list.
    #[gpui::test]
    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        // Wait for the local worktree scan to finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        // The handle is bound (not discarded) so the remote project stays alive for the
        // rest of the test; the guest then leaves only through the disconnect below.
        let _project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // See that a guest has joined as client A.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
            .await;

        // Drop client B's connection and ensure client A observes client B leaving the worktree.
        client_b.disconnect(&cx_b.to_async()).await.unwrap();
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
            .await;
    }
1708
1709    #[gpui::test]
        // Checks that diagnostics published by the host's language server are visible to a guest:
        // a fake language server reports two diagnostics for "a.rs" on client A, then client B
        // joins the project, opens "a.rs" and must see both entries.
1710    async fn test_collaborating_with_diagnostics(
1711        mut cx_a: TestAppContext,
1712        mut cx_b: TestAppContext,
1713    ) {
1714        cx_a.foreground().forbid_parking();
1715        let mut lang_registry = Arc::new(LanguageRegistry::new());
1716        let fs = Arc::new(FakeFs::new());
1717
1718        // Set up a fake language server.
1719        let (language_server_config, mut fake_language_server) =
1720            LanguageServerConfig::fake(cx_a.background()).await;
            // `Arc::get_mut` only succeeds while the registry has no other owners, which holds here
            // because the registry is not cloned until the projects are created below.
1721        Arc::get_mut(&mut lang_registry)
1722            .unwrap()
1723            .add(Arc::new(Language::new(
1724                LanguageConfig {
1725                    name: "Rust".to_string(),
1726                    path_suffixes: vec!["rs".to_string()],
1727                    language_server: Some(language_server_config),
1728                    ..Default::default()
1729                },
1730                Some(tree_sitter_rust::language()),
1731            )));
1732
1733        // Connect to a server as 2 clients.
1734        let mut server = TestServer::start().await;
1735        let client_a = server.create_client(&mut cx_a, "user_a").await;
1736        let client_b = server.create_client(&mut cx_b, "user_b").await;
1737
1738        // Share a project as client A
1739        fs.insert_tree(
1740            "/a",
1741            json!({
1742                ".zed.toml": r#"collaborators = ["user_b"]"#,
1743                "a.rs": "let one = two",
1744                "other.rs": "",
1745            }),
1746        )
1747        .await;
1748        let project_a = cx_a.update(|cx| {
1749            Project::local(
1750                client_a.clone(),
1751                client_a.user_store.clone(),
1752                lang_registry.clone(),
1753                fs.clone(),
1754                cx,
1755            )
1756        });
1757        let worktree_a = project_a
1758            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1759            .await
1760            .unwrap();
1761        worktree_a
1762            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1763            .await;
            // Wait for the project's remote id; client B uses it below to join.
1764        let project_id = project_a
1765            .update(&mut cx_a, |project, _| project.next_remote_id())
1766            .await;
1767        project_a
1768            .update(&mut cx_a, |project, cx| project.share(cx))
1769            .await
1770            .unwrap();
1771
1772        // Cause the language server to start.
            // A different file ("other.rs") is opened here; the buffer itself is discarded.
1773        let _ = cx_a
1774            .background()
1775            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1776                worktree.open_buffer("other.rs", cx)
1777            }))
1778            .await
1779            .unwrap();
1780
1781        // Simulate a language server reporting errors for a file.
            // Two diagnostics on line 0 of "/a/a.rs": an error at columns 4..7 and a warning at
            // columns 10..13.
1782        fake_language_server
1783            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1784                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1785                version: None,
1786                diagnostics: vec![
1787                    lsp::Diagnostic {
1788                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1789                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1790                        message: "message 1".to_string(),
1791                        ..Default::default()
1792                    },
1793                    lsp::Diagnostic {
1794                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1795                        range: lsp::Range::new(
1796                            lsp::Position::new(0, 10),
1797                            lsp::Position::new(0, 13),
1798                        ),
1799                        message: "message 2".to_string(),
1800                        ..Default::default()
1801                    },
1802                ],
1803            })
1804            .await;
1805
1806        // Join the project as client B.
1807        let project_b = Project::remote(
1808            project_id,
1809            client_b.clone(),
1810            client_b.user_store.clone(),
1811            lang_registry.clone(),
1812            fs.clone(),
1813            &mut cx_b.to_async(),
1814        )
1815        .await
1816        .unwrap();
1817        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1818
1819        // Open the file with the errors.
1820        let buffer_b = cx_b
1821            .background()
1822            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1823            .await
1824            .unwrap();
1825
            // The guest's buffer must expose both diagnostics over its whole length, with the same
            // ranges, messages and severities that were published, each as the primary entry of its
            // own group (group ids 0 and 1).
1826        buffer_b.read_with(&cx_b, |buffer, _| {
1827            assert_eq!(
1828                buffer
1829                    .snapshot()
1830                    .diagnostics_in_range::<_, Point>(0..buffer.len())
1831                    .collect::<Vec<_>>(),
1832                &[
1833                    DiagnosticEntry {
1834                        range: Point::new(0, 4)..Point::new(0, 7),
1835                        diagnostic: Diagnostic {
1836                            group_id: 0,
1837                            message: "message 1".to_string(),
1838                            severity: lsp::DiagnosticSeverity::ERROR,
1839                            is_primary: true,
1840                            ..Default::default()
1841                        }
1842                    },
1843                    DiagnosticEntry {
1844                        range: Point::new(0, 10)..Point::new(0, 13),
1845                        diagnostic: Diagnostic {
1846                            group_id: 1,
1847                            severity: lsp::DiagnosticSeverity::WARNING,
1848                            message: "message 2".to_string(),
1849                            is_primary: true,
1850                            ..Default::default()
1851                        }
1852                    }
1853                ]
1854            );
1855        });
1856    }
1857
1858    #[gpui::test]
        // End-to-end chat scenario with two clients in one channel: both load a message that was
        // seeded in the database, client A sends two messages that client B then receives, and the
        // server's per-channel connection tracking shrinks as each client drops its channel handle.
1859    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1860        cx_a.foreground().forbid_parking();
1861
1862        // Connect to a server as 2 clients.
1863        let mut server = TestServer::start().await;
1864        let client_a = server.create_client(&mut cx_a, "user_a").await;
1865        let client_b = server.create_client(&mut cx_b, "user_b").await;
1866
1867        // Create an org that includes these 2 users.
1868        let db = &server.app_state.db;
1869        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1870        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1871            .await
1872            .unwrap();
1873        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1874            .await
1875            .unwrap();
1876
1877        // Create a channel that includes all the users.
1878        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1879        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1880            .await
1881            .unwrap();
1882        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1883            .await
1884            .unwrap();
            // Seed the channel with one message from user_b, written directly to the database.
1885        db.create_channel_message(
1886            channel_id,
1887            client_b.current_user_id(&cx_b),
1888            "hello A, it's B.",
1889            OffsetDateTime::now_utc(),
1890            1,
1891        )
1892        .await
1893        .unwrap();
1894
            // Client A: wait for the channel list to load and check it contains only the test channel.
1895        let channels_a = cx_a
1896            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1897        channels_a
1898            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1899            .await;
1900        channels_a.read_with(&cx_a, |list, _| {
1901            assert_eq!(
1902                list.available_channels().unwrap(),
1903                &[ChannelDetails {
1904                    id: channel_id.to_proto(),
1905                    name: "test-channel".to_string()
1906                }]
1907            )
1908        });
1909        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1910            this.get_channel(channel_id.to_proto(), cx).unwrap()
1911        });
            // A freshly opened channel starts out empty; the seeded message shows up asynchronously.
            // NOTE(review): the boolean in each tuple looks like a "still pending" flag — confirm
            // against the `channel_messages` helper.
1912        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1913        channel_a
1914            .condition(&cx_a, |channel, _| {
1915                channel_messages(channel)
1916                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1917            })
1918            .await;
1919
            // Client B: same checks as for client A.
1920        let channels_b = cx_b
1921            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1922        channels_b
1923            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1924            .await;
1925        channels_b.read_with(&cx_b, |list, _| {
1926            assert_eq!(
1927                list.available_channels().unwrap(),
1928                &[ChannelDetails {
1929                    id: channel_id.to_proto(),
1930                    name: "test-channel".to_string()
1931                }]
1932            )
1933        });
1934
1935        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1936            this.get_channel(channel_id.to_proto(), cx).unwrap()
1937        });
1938        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1939        channel_b
1940            .condition(&cx_b, |channel, _| {
1941                channel_messages(channel)
1942                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1943            })
1944            .await;
1945
            // Client A sends two messages back to back. The first send is detached; only the second
            // task is awaited. Both messages must appear in A's local list immediately, flagged
            // `true`, before either send has been awaited.
1946        channel_a
1947            .update(&mut cx_a, |channel, cx| {
1948                channel
1949                    .send_message("oh, hi B.".to_string(), cx)
1950                    .unwrap()
1951                    .detach();
1952                let task = channel.send_message("sup".to_string(), cx).unwrap();
1953                assert_eq!(
1954                    channel_messages(channel),
1955                    &[
1956                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1957                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
1958                        ("user_a".to_string(), "sup".to_string(), true)
1959                    ]
1960                );
1961                task
1962            })
1963            .await
1964            .unwrap();
1965
            // Client B eventually sees both of A's messages, in order, flagged `false`.
1966        channel_b
1967            .condition(&cx_b, |channel, _| {
1968                channel_messages(channel)
1969                    == [
1970                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1971                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
1972                        ("user_a".to_string(), "sup".to_string(), false),
1973                    ]
1974            })
1975            .await;
1976
            // While both clients hold the channel, the server tracks two connections for it.
1977        assert_eq!(
1978            server
1979                .state()
1980                .await
1981                .channel(channel_id)
1982                .unwrap()
1983                .connection_ids
1984                .len(),
1985            2
1986        );
            // Dropping B's channel handle must leave a single connection on the channel.
1987        cx_b.update(|_| drop(channel_b));
1988        server
1989            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1990            .await;
1991
            // Dropping the last handle must remove the channel from the server's state entirely.
1992        cx_a.update(|_| drop(channel_a));
1993        server
1994            .condition(|state| state.channel(channel_id).is_none())
1995            .await;
1996    }
1997
1998    #[gpui::test]
        // Checks chat message validation with a single client: over-long bodies are rejected, blank
        // bodies are rejected, and surrounding whitespace is trimmed before the message is stored.
1999    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2000        cx_a.foreground().forbid_parking();
2001
2002        let mut server = TestServer::start().await;
2003        let client_a = server.create_client(&mut cx_a, "user_a").await;
2004
            // Create an org and a channel, and make user_a a member of both.
2005        let db = &server.app_state.db;
2006        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2007        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2008        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2009            .await
2010            .unwrap();
2011        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2012            .await
2013            .unwrap();
2014
            // Wait for the channel list to load, then open the test channel.
2015        let channels_a = cx_a
2016            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2017        channels_a
2018            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2019            .await;
2020        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2021            this.get_channel(channel_id.to_proto(), cx).unwrap()
2022        });
2023
2024        // Messages aren't allowed to be too long.
            // The body is a 14-byte line repeated 1024 times. `send_message` accepts it locally
            // (`unwrap`), but the returned task must resolve to an error (`unwrap_err`).
            // NOTE(review): presumably the rejection comes from the server's MAX_MESSAGE_LEN check —
            // confirm.
2025        channel_a
2026            .update(&mut cx_a, |channel, cx| {
2027                let long_body = "this is long.\n".repeat(1024);
2028                channel.send_message(long_body, cx).unwrap()
2029            })
2030            .await
2031            .unwrap_err();
2032
2033        // Messages aren't allowed to be blank.
            // Here `send_message` itself returns the error, so there is no task to await.
2034        channel_a.update(&mut cx_a, |channel, cx| {
2035            channel.send_message(String::new(), cx).unwrap_err()
2036        });
2037
2038        // Leading and trailing whitespace are trimmed.
2039        channel_a
2040            .update(&mut cx_a, |channel, cx| {
2041                channel
2042                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
2043                    .unwrap()
2044            })
2045            .await
2046            .unwrap();
            // Read the channel's messages back from the database: only the trimmed message may be
            // present, which also shows that the two rejected messages were never stored.
2047        assert_eq!(
2048            db.get_channel_messages(channel_id, 10, None)
2049                .await
2050                .unwrap()
2051                .iter()
2052                .map(|m| &m.body)
2053                .collect::<Vec<_>>(),
2054            &["surrounded by whitespace"]
2055        );
2056    }
2057
2058    #[gpui::test]
        // Checks chat behavior across a disconnect/reconnect of client B: cached channel data stays
        // readable while offline, a message B sent while offline is delivered after reconnecting,
        // messages A sent in the meantime are received, and both directions work afterwards.
2059    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2060        cx_a.foreground().forbid_parking();
2061
2062        // Connect to a server as 2 clients.
2063        let mut server = TestServer::start().await;
2064        let client_a = server.create_client(&mut cx_a, "user_a").await;
2065        let client_b = server.create_client(&mut cx_b, "user_b").await;
            // Stream of client B's connection status; used below to wait for a reconnection failure.
2066        let mut status_b = client_b.status();
2067
2068        // Create an org that includes these 2 users.
2069        let db = &server.app_state.db;
2070        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2071        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2072            .await
2073            .unwrap();
2074        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2075            .await
2076            .unwrap();
2077
2078        // Create a channel that includes all the users.
2079        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2080        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2081            .await
2082            .unwrap();
2083        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2084            .await
2085            .unwrap();
            // Seed the channel with one message from user_b, written directly to the database.
2086        db.create_channel_message(
2087            channel_id,
2088            client_b.current_user_id(&cx_b),
2089            "hello A, it's B.",
2090            OffsetDateTime::now_utc(),
2091            2,
2092        )
2093        .await
2094        .unwrap();
2095
            // Client A: load the channel list, open the channel and wait for the seeded message.
2096        let channels_a = cx_a
2097            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2098        channels_a
2099            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2100            .await;
2101
2102        channels_a.read_with(&cx_a, |list, _| {
2103            assert_eq!(
2104                list.available_channels().unwrap(),
2105                &[ChannelDetails {
2106                    id: channel_id.to_proto(),
2107                    name: "test-channel".to_string()
2108                }]
2109            )
2110        });
2111        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2112            this.get_channel(channel_id.to_proto(), cx).unwrap()
2113        });
2114        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2115        channel_a
2116            .condition(&cx_a, |channel, _| {
2117                channel_messages(channel)
2118                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2119            })
2120            .await;
2121
            // Client B: same setup and checks as for client A.
2122        let channels_b = cx_b
2123            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2124        channels_b
2125            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2126            .await;
2127        channels_b.read_with(&cx_b, |list, _| {
2128            assert_eq!(
2129                list.available_channels().unwrap(),
2130                &[ChannelDetails {
2131                    id: channel_id.to_proto(),
2132                    name: "test-channel".to_string()
2133                }]
2134            )
2135        });
2136
2137        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2138            this.get_channel(channel_id.to_proto(), cx).unwrap()
2139        });
2140        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2141        channel_b
2142            .condition(&cx_b, |channel, _| {
2143                channel_messages(channel)
2144                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2145            })
2146            .await;
2147
2148        // Disconnect client B, ensuring we can still access its cached channel data.
            // Connections are forbidden before the disconnect so that B cannot reconnect; the loop
            // then consumes status updates until B reports a `ReconnectionError`.
2149        server.forbid_connections();
2150        server.disconnect_client(client_b.current_user_id(&cx_b));
2151        while !matches!(
2152            status_b.recv().await,
2153            Some(client::Status::ReconnectionError { .. })
2154        ) {}
2155
2156        channels_b.read_with(&cx_b, |channels, _| {
2157            assert_eq!(
2158                channels.available_channels().unwrap(),
2159                [ChannelDetails {
2160                    id: channel_id.to_proto(),
2161                    name: "test-channel".to_string()
2162                }]
2163            )
2164        });
2165        channel_b.read_with(&cx_b, |channel, _| {
2166            assert_eq!(
2167                channel_messages(channel),
2168                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2169            )
2170        });
2171
2172        // Send a message from client B while it is disconnected.
            // The message appears in B's local list right away (flagged `true`), but the send task
            // itself must fail (`unwrap_err`).
2173        channel_b
2174            .update(&mut cx_b, |channel, cx| {
2175                let task = channel
2176                    .send_message("can you see this?".to_string(), cx)
2177                    .unwrap();
2178                assert_eq!(
2179                    channel_messages(channel),
2180                    &[
2181                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2182                        ("user_b".to_string(), "can you see this?".to_string(), true)
2183                    ]
2184                );
2185                task
2186            })
2187            .await
2188            .unwrap_err();
2189
2190        // Send a message from client A while B is disconnected.
            // The first send is detached; only the second task is awaited, and it must succeed.
2191        channel_a
2192            .update(&mut cx_a, |channel, cx| {
2193                channel
2194                    .send_message("oh, hi B.".to_string(), cx)
2195                    .unwrap()
2196                    .detach();
2197                let task = channel.send_message("sup".to_string(), cx).unwrap();
2198                assert_eq!(
2199                    channel_messages(channel),
2200                    &[
2201                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2202                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2203                        ("user_a".to_string(), "sup".to_string(), true)
2204                    ]
2205                );
2206                task
2207            })
2208            .await
2209            .unwrap();
2210
2211        // Give client B a chance to reconnect.
            // NOTE(review): advancing the clock by 10s presumably fires B's pending reconnection
            // timer — confirm against the client's reconnection backoff.
2212        server.allow_connections();
2213        cx_b.foreground().advance_clock(Duration::from_secs(10));
2214
2215        // Verify that B sees the new messages upon reconnection, as well as the message client B
2216        // sent while offline.
            // B's offline message is expected after A's two messages, and is no longer flagged `true`.
2217        channel_b
2218            .condition(&cx_b, |channel, _| {
2219                channel_messages(channel)
2220                    == [
2221                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2222                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2223                        ("user_a".to_string(), "sup".to_string(), false),
2224                        ("user_b".to_string(), "can you see this?".to_string(), false),
2225                    ]
2226            })
2227            .await;
2228
2229        // Ensure client A and B can communicate normally after reconnection.
2230        channel_a
2231            .update(&mut cx_a, |channel, cx| {
2232                channel.send_message("you online?".to_string(), cx).unwrap()
2233            })
2234            .await
2235            .unwrap();
2236        channel_b
2237            .condition(&cx_b, |channel, _| {
2238                channel_messages(channel)
2239                    == [
2240                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2241                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2242                        ("user_a".to_string(), "sup".to_string(), false),
2243                        ("user_b".to_string(), "can you see this?".to_string(), false),
2244                        ("user_a".to_string(), "you online?".to_string(), false),
2245                    ]
2246            })
2247            .await;
2248
            // And in the other direction: B sends, A receives the full history including B's reply.
2249        channel_b
2250            .update(&mut cx_b, |channel, cx| {
2251                channel.send_message("yep".to_string(), cx).unwrap()
2252            })
2253            .await
2254            .unwrap();
2255        channel_a
2256            .condition(&cx_a, |channel, _| {
2257                channel_messages(channel)
2258                    == [
2259                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2260                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2261                        ("user_a".to_string(), "sup".to_string(), false),
2262                        ("user_b".to_string(), "can you see this?".to_string(), false),
2263                        ("user_a".to_string(), "you online?".to_string(), false),
2264                        ("user_b".to_string(), "yep".to_string(), false),
2265                    ]
2266            })
2267            .await;
2268    }
2269
2270    #[gpui::test]
        // Checks that all three clients see the same contacts list as client A's project goes
        // through its lifecycle: created with a worktree (no guests), joined by client B (one
        // guest), and finally dropped (no contacts).
2271    async fn test_contacts(
2272        mut cx_a: TestAppContext,
2273        mut cx_b: TestAppContext,
2274        mut cx_c: TestAppContext,
2275    ) {
2276        cx_a.foreground().forbid_parking();
2277        let lang_registry = Arc::new(LanguageRegistry::new());
2278        let fs = Arc::new(FakeFs::new());
2279
2280        // Connect to a server as 3 clients.
2281        let mut server = TestServer::start().await;
2282        let client_a = server.create_client(&mut cx_a, "user_a").await;
2283        let client_b = server.create_client(&mut cx_b, "user_b").await;
2284        let client_c = server.create_client(&mut cx_c, "user_c").await;
2285
2286        // Set up a worktree for client A whose `.zed.toml` lists user_b and user_c as collaborators.
2287        fs.insert_tree(
2288            "/a",
2289            json!({
2290                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2291            }),
2292        )
2293        .await;
2294
2295        let project_a = cx_a.update(|cx| {
2296            Project::local(
2297                client_a.clone(),
2298                client_a.user_store.clone(),
2299                lang_registry.clone(),
2300                fs.clone(),
2301                cx,
2302            )
2303        });
2304        let worktree_a = project_a
2305            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
2306            .await
2307            .unwrap();
2308        worktree_a
2309            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2310            .await;
2311
            // Before the project is shared, every client already sees user_a with one project whose
            // worktree root is "a" and which has no guests.
2312        client_a
2313            .user_store
2314            .condition(&cx_a, |user_store, _| {
2315                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2316            })
2317            .await;
2318        client_b
2319            .user_store
2320            .condition(&cx_b, |user_store, _| {
2321                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2322            })
2323            .await;
2324        client_c
2325            .user_store
2326            .condition(&cx_c, |user_store, _| {
2327                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2328            })
2329            .await;
2330
            // Share the project; client B needs its remote id to join.
2331        let project_id = project_a
2332            .update(&mut cx_a, |project, _| project.next_remote_id())
2333            .await;
2334        project_a
2335            .update(&mut cx_a, |project, cx| project.share(cx))
2336            .await
2337            .unwrap();
2338
            // Join as client B. The handle is kept in a named binding so the remote project stays
            // alive until the end of the test.
2339        let _project_b = Project::remote(
2340            project_id,
2341            client_b.clone(),
2342            client_b.user_store.clone(),
2343            lang_registry.clone(),
2344            fs.clone(),
2345            &mut cx_b.to_async(),
2346        )
2347        .await
2348        .unwrap();
2349
            // Every client, including the uninvolved client C, now sees user_b as a guest.
2350        client_a
2351            .user_store
2352            .condition(&cx_a, |user_store, _| {
2353                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2354            })
2355            .await;
2356        client_b
2357            .user_store
2358            .condition(&cx_b, |user_store, _| {
2359                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2360            })
2361            .await;
2362        client_c
2363            .user_store
2364            .condition(&cx_c, |user_store, _| {
2365                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2366            })
2367            .await;
2368
            // Wait until the host's project itself lists client B as a collaborator before dropping it.
2369        project_a
2370            .condition(&cx_a, |project, _| {
2371                project.collaborators().contains_key(&client_b.peer_id)
2372            })
2373            .await;
2374
            // Dropping the host's project must clear the contacts list on every client.
2375        cx_a.update(move |_| drop(project_a));
2376        client_a
2377            .user_store
2378            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2379            .await;
2380        client_b
2381            .user_store
2382            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2383            .await;
2384        client_c
2385            .user_store
2386            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2387            .await;
2388
            // Flattens the user store's contacts into plain tuples that are easy to compare:
            // `(contact login, [(first worktree root name of a project, [guest logins])])`.
            // Indexing `worktree_root_names[0]` panics for a project with no worktree root names.
2389        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2390            user_store
2391                .contacts()
2392                .iter()
2393                .map(|contact| {
2394                    let worktrees = contact
2395                        .projects
2396                        .iter()
2397                        .map(|p| {
2398                            (
2399                                p.worktree_root_names[0].as_str(),
2400                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
2401                            )
2402                        })
2403                        .collect();
2404                    (contact.user.github_login.as_str(), worktrees)
2405                })
2406                .collect()
2407        }
2408    }
2409
2410    struct TestServer {
            // Test harness bundling a `Server` with the peer, app state and test database it runs
            // against, plus the switches tests use to kill or refuse client connections.

            // Peer passed to `Server::new` when the harness is started.
2411        peer: Arc<Peer>,
            // Application state shared with the server; tests reach the database via `app_state.db`.
2412        app_state: Arc<AppState>,
            // The server under test.
2413        server: Arc<Server>,
            // Receiving half of the channel whose sender is handed to `Server::new`.
2414        notifications: mpsc::Receiver<()>,
            // Per-user handle for killing that user's in-memory connection; an entry is inserted each
            // time a test client establishes a connection.
2415        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
            // While `true`, connection attempts by test clients fail with an error.
2416        forbid_connections: Arc<AtomicBool>,
            // NOTE(review): never read; presumably held only to keep the test database alive for as
            // long as the harness — confirm against `TestDb`.
2417        _test_db: TestDb,
2418    }
2419
2420    impl TestServer {
2421        async fn start() -> Self {
                // Builds a fresh harness: a new test database, app state derived from it, a new peer,
                // and a `Server` wired to all of them. No clients are connected yet.
2422            let test_db = TestDb::new();
2423            let app_state = Self::build_app_state(&test_db).await;
2424            let peer = Peer::new();
                // Channel with capacity 128: the server gets the sender, the harness keeps the receiver.
2425            let notifications = mpsc::channel(128);
2426            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2427            Self {
2428                peer,
2429                app_state,
2430                server,
2431                notifications: notifications.1,
                    // Start with no registered connection killers and with connections allowed.
2432                connection_killers: Default::default(),
2433                forbid_connections: Default::default(),
                    // Move the database handle into the harness so it lives as long as `Self`.
2434                _test_db: test_db,
2435            }
2436        }
2437
2438        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2439            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2440            let client_name = name.to_string();
2441            let mut client = Client::new();
2442            let server = self.server.clone();
2443            let connection_killers = self.connection_killers.clone();
2444            let forbid_connections = self.forbid_connections.clone();
2445            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2446
2447            Arc::get_mut(&mut client)
2448                .unwrap()
2449                .override_authenticate(move |cx| {
2450                    cx.spawn(|_| async move {
2451                        let access_token = "the-token".to_string();
2452                        Ok(Credentials {
2453                            user_id: user_id.0 as u64,
2454                            access_token,
2455                        })
2456                    })
2457                })
2458                .override_establish_connection(move |credentials, cx| {
2459                    assert_eq!(credentials.user_id, user_id.0 as u64);
2460                    assert_eq!(credentials.access_token, "the-token");
2461
2462                    let server = server.clone();
2463                    let connection_killers = connection_killers.clone();
2464                    let forbid_connections = forbid_connections.clone();
2465                    let client_name = client_name.clone();
2466                    let connection_id_tx = connection_id_tx.clone();
2467                    cx.spawn(move |cx| async move {
2468                        if forbid_connections.load(SeqCst) {
2469                            Err(EstablishConnectionError::other(anyhow!(
2470                                "server is forbidding connections"
2471                            )))
2472                        } else {
2473                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2474                            connection_killers.lock().insert(user_id, kill_conn);
2475                            cx.background()
2476                                .spawn(server.handle_connection(
2477                                    server_conn,
2478                                    client_name,
2479                                    user_id,
2480                                    Some(connection_id_tx),
2481                                ))
2482                                .detach();
2483                            Ok(client_conn)
2484                        }
2485                    })
2486                });
2487
2488            let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2489            client
2490                .authenticate_and_connect(&cx.to_async())
2491                .await
2492                .unwrap();
2493
2494            let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2495            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2496            let mut authed_user =
2497                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2498            while authed_user.recv().await.unwrap().is_none() {}
2499
2500            TestClient {
2501                client,
2502                peer_id,
2503                user_store,
2504            }
2505        }
2506
2507        fn disconnect_client(&self, user_id: UserId) {
2508            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2509                let _ = kill_conn.try_send(Some(()));
2510            }
2511        }
2512
2513        fn forbid_connections(&self) {
2514            self.forbid_connections.store(true, SeqCst);
2515        }
2516
2517        fn allow_connections(&self) {
2518            self.forbid_connections.store(false, SeqCst);
2519        }
2520
2521        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2522            let mut config = Config::default();
2523            config.session_secret = "a".repeat(32);
2524            config.database_url = test_db.url.clone();
2525            let github_client = github::AppClient::test();
2526            Arc::new(AppState {
2527                db: test_db.db().clone(),
2528                handlebars: Default::default(),
2529                auth_client: auth::build_client("", ""),
2530                repo_client: github::RepoClient::test(&github_client),
2531                github_client,
2532                config,
2533            })
2534        }
2535
2536        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2537            self.server.store.read()
2538        }
2539
2540        async fn condition<F>(&mut self, mut predicate: F)
2541        where
2542            F: FnMut(&Store) -> bool,
2543        {
2544            async_std::future::timeout(Duration::from_millis(500), async {
2545                while !(predicate)(&*self.server.store.read()) {
2546                    self.notifications.recv().await;
2547                }
2548            })
2549            .await
2550            .expect("condition timed out");
2551        }
2552    }
2553
2554    impl Drop for TestServer {
2555        fn drop(&mut self) {
2556            task::block_on(self.peer.reset());
2557        }
2558    }
2559
    /// A connected client as produced by `TestServer::create_client`.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        /// User store model created on top of `client`.
        pub user_store: ModelHandle<UserStore>,
    }
2565
2566    impl Deref for TestClient {
2567        type Target = Arc<Client>;
2568
2569        fn deref(&self) -> &Self::Target {
2570            &self.client
2571        }
2572    }
2573
2574    impl TestClient {
2575        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2576            UserId::from_proto(
2577                self.user_store
2578                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2579            )
2580        }
2581    }
2582
2583    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2584        channel
2585            .messages()
2586            .cursor::<()>()
2587            .map(|m| {
2588                (
2589                    m.sender.github_login.clone(),
2590                    m.body.clone(),
2591                    m.is_pending(),
2592                )
2593            })
2594            .collect()
2595    }
2596
    /// Stateless view whose `render` produces an empty element.
    struct EmptyView;
2598
    impl gpui::Entity for EmptyView {
        // The event type is the unit type.
        type Event = ();
    }
2602
2603    impl gpui::View for EmptyView {
2604        fn ui_name() -> &'static str {
2605            "empty view"
2606        }
2607
2608        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2609            gpui::Element::boxed(gpui::elements::Empty)
2610        }
2611    }
2612}