rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
/// Type-erased handler for one kind of incoming RPC message.
///
/// A handler receives the shared server plus the envelope as a
/// `dyn AnyTypedEnvelope` and returns a boxed future that resolves to
/// `tide::Result<()>`. Handlers must be `Send + Sync`.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
  35
/// RPC server state shared (behind an `Arc`) by every connection.
pub struct Server {
    /// Peer used to send, respond to and forward protocol messages.
    peer: Arc<Peer>,
    /// In-memory store behind a read/write lock.
    /// NOTE(review): presumably what `state()` / `state_mut()` lock — confirm.
    store: RwLock<Store>,
    /// Application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Message handlers keyed by the `TypeId` of the payload type.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent `()` after each handled message.
    notifications: Option<mpsc::Sender<()>>,
}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::open_buffer)
  75            .add_handler(Server::close_buffer)
  76            .add_handler(Server::update_buffer)
  77            .add_handler(Server::buffer_saved)
  78            .add_handler(Server::save_buffer)
  79            .add_handler(Server::get_channels)
  80            .add_handler(Server::get_users)
  81            .add_handler(Server::join_channel)
  82            .add_handler(Server::leave_channel)
  83            .add_handler(Server::send_channel_message)
  84            .add_handler(Server::get_channel_messages);
  85
  86        Arc::new(server)
  87    }
  88
  89    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  90    where
  91        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  92        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  93        M: EnvelopedMessage,
  94    {
  95        let prev_handler = self.handlers.insert(
  96            TypeId::of::<M>(),
  97            Box::new(move |server, envelope| {
  98                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
  99                (handler)(server, *envelope).boxed()
 100            }),
 101        );
 102        if prev_handler.is_some() {
 103            panic!("registered a handler for the same message twice");
 104        }
 105        self
 106    }
 107
 108    pub fn handle_connection(
 109        self: &Arc<Self>,
 110        connection: Connection,
 111        addr: String,
 112        user_id: UserId,
 113        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 114    ) -> impl Future<Output = ()> {
 115        let mut this = self.clone();
 116        async move {
 117            let (connection_id, handle_io, mut incoming_rx) =
 118                this.peer.add_connection(connection).await;
 119
 120            if let Some(send_connection_id) = send_connection_id.as_mut() {
 121                let _ = send_connection_id.send(connection_id).await;
 122            }
 123
 124            this.state_mut().add_connection(connection_id, user_id);
 125            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 126                log::error!("error updating contacts for {:?}: {}", user_id, err);
 127            }
 128
 129            let handle_io = handle_io.fuse();
 130            futures::pin_mut!(handle_io);
 131            loop {
 132                let next_message = incoming_rx.recv().fuse();
 133                futures::pin_mut!(next_message);
 134                futures::select_biased! {
 135                    message = next_message => {
 136                        if let Some(message) = message {
 137                            let start_time = Instant::now();
 138                            log::info!("RPC message received: {}", message.payload_type_name());
 139                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 140                                if let Err(err) = (handler)(this.clone(), message).await {
 141                                    log::error!("error handling message: {:?}", err);
 142                                } else {
 143                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 144                                }
 145
 146                                if let Some(mut notifications) = this.notifications.clone() {
 147                                    let _ = notifications.send(()).await;
 148                                }
 149                            } else {
 150                                log::warn!("unhandled message: {}", message.payload_type_name());
 151                            }
 152                        } else {
 153                            log::info!("rpc connection closed {:?}", addr);
 154                            break;
 155                        }
 156                    }
 157                    handle_io = handle_io => {
 158                        if let Err(err) = handle_io {
 159                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 160                        }
 161                        break;
 162                    }
 163                }
 164            }
 165
 166            if let Err(err) = this.sign_out(connection_id).await {
 167                log::error!("error signing out connection {:?} - {:?}", addr, err);
 168            }
 169        }
 170    }
 171
 172    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 173        self.peer.disconnect(connection_id).await;
 174        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 175
 176        for (project_id, project) in removed_connection.hosted_projects {
 177            if let Some(share) = project.share {
 178                broadcast(
 179                    connection_id,
 180                    share.guests.keys().copied().collect(),
 181                    |conn_id| {
 182                        self.peer
 183                            .send(conn_id, proto::UnshareProject { project_id })
 184                    },
 185                )
 186                .await?;
 187            }
 188        }
 189
 190        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 191            broadcast(connection_id, peer_ids, |conn_id| {
 192                self.peer.send(
 193                    conn_id,
 194                    proto::RemoveProjectCollaborator {
 195                        project_id,
 196                        peer_id: connection_id.0,
 197                    },
 198                )
 199            })
 200            .await?;
 201        }
 202
 203        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 204            .await?;
 205
 206        Ok(())
 207    }
 208
 209    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 210        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 211        Ok(())
 212    }
 213
 214    async fn register_project(
 215        mut self: Arc<Server>,
 216        request: TypedEnvelope<proto::RegisterProject>,
 217    ) -> tide::Result<()> {
 218        let project_id = {
 219            let mut state = self.state_mut();
 220            let user_id = state.user_id_for_connection(request.sender_id)?;
 221            state.register_project(request.sender_id, user_id)
 222        };
 223        self.peer
 224            .respond(
 225                request.receipt(),
 226                proto::RegisterProjectResponse { project_id },
 227            )
 228            .await?;
 229        Ok(())
 230    }
 231
 232    async fn unregister_project(
 233        mut self: Arc<Server>,
 234        request: TypedEnvelope<proto::UnregisterProject>,
 235    ) -> tide::Result<()> {
 236        let project = self
 237            .state_mut()
 238            .unregister_project(request.payload.project_id, request.sender_id)
 239            .ok_or_else(|| anyhow!("no such project"))?;
 240        self.update_contacts_for_users(project.authorized_user_ids().iter())
 241            .await?;
 242        Ok(())
 243    }
 244
 245    async fn share_project(
 246        mut self: Arc<Server>,
 247        request: TypedEnvelope<proto::ShareProject>,
 248    ) -> tide::Result<()> {
 249        self.state_mut()
 250            .share_project(request.payload.project_id, request.sender_id);
 251        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 252        Ok(())
 253    }
 254
 255    async fn unshare_project(
 256        mut self: Arc<Server>,
 257        request: TypedEnvelope<proto::UnshareProject>,
 258    ) -> tide::Result<()> {
 259        let project_id = request.payload.project_id;
 260        let project = self
 261            .state_mut()
 262            .unshare_project(project_id, request.sender_id)?;
 263
 264        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 265            self.peer
 266                .send(conn_id, proto::UnshareProject { project_id })
 267        })
 268        .await?;
 269        self.update_contacts_for_users(&project.authorized_user_ids)
 270            .await?;
 271
 272        Ok(())
 273    }
 274
 275    async fn join_project(
 276        mut self: Arc<Server>,
 277        request: TypedEnvelope<proto::JoinProject>,
 278    ) -> tide::Result<()> {
 279        let project_id = request.payload.project_id;
 280
 281        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 282        let response_data = self
 283            .state_mut()
 284            .join_project(request.sender_id, user_id, project_id)
 285            .and_then(|joined| {
 286                let share = joined.project.share()?;
 287                let peer_count = share.guests.len();
 288                let mut collaborators = Vec::with_capacity(peer_count);
 289                collaborators.push(proto::Collaborator {
 290                    peer_id: joined.project.host_connection_id.0,
 291                    replica_id: 0,
 292                    user_id: joined.project.host_user_id.to_proto(),
 293                });
 294                let worktrees = joined
 295                    .project
 296                    .worktrees
 297                    .iter()
 298                    .filter_map(|(id, worktree)| {
 299                        worktree.share.as_ref().map(|share| proto::Worktree {
 300                            id: *id,
 301                            root_name: worktree.root_name.clone(),
 302                            entries: share.entries.values().cloned().collect(),
 303                        })
 304                    })
 305                    .collect();
 306                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 307                    if *peer_conn_id != request.sender_id {
 308                        collaborators.push(proto::Collaborator {
 309                            peer_id: peer_conn_id.0,
 310                            replica_id: *peer_replica_id as u32,
 311                            user_id: peer_user_id.to_proto(),
 312                        });
 313                    }
 314                }
 315                let response = proto::JoinProjectResponse {
 316                    worktrees,
 317                    replica_id: joined.replica_id as u32,
 318                    collaborators,
 319                };
 320                let connection_ids = joined.project.connection_ids();
 321                let contact_user_ids = joined.project.authorized_user_ids();
 322                Ok((response, connection_ids, contact_user_ids))
 323            });
 324
 325        match response_data {
 326            Ok((response, connection_ids, contact_user_ids)) => {
 327                broadcast(request.sender_id, connection_ids, |conn_id| {
 328                    self.peer.send(
 329                        conn_id,
 330                        proto::AddProjectCollaborator {
 331                            project_id: project_id,
 332                            collaborator: Some(proto::Collaborator {
 333                                peer_id: request.sender_id.0,
 334                                replica_id: response.replica_id,
 335                                user_id: user_id.to_proto(),
 336                            }),
 337                        },
 338                    )
 339                })
 340                .await?;
 341                self.peer.respond(request.receipt(), response).await?;
 342                self.update_contacts_for_users(&contact_user_ids).await?;
 343            }
 344            Err(error) => {
 345                self.peer
 346                    .respond_with_error(
 347                        request.receipt(),
 348                        proto::Error {
 349                            message: error.to_string(),
 350                        },
 351                    )
 352                    .await?;
 353            }
 354        }
 355
 356        Ok(())
 357    }
 358
 359    async fn leave_project(
 360        mut self: Arc<Server>,
 361        request: TypedEnvelope<proto::LeaveProject>,
 362    ) -> tide::Result<()> {
 363        let sender_id = request.sender_id;
 364        let project_id = request.payload.project_id;
 365        let worktree = self.state_mut().leave_project(sender_id, project_id);
 366        if let Some(worktree) = worktree {
 367            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 368                self.peer.send(
 369                    conn_id,
 370                    proto::RemoveProjectCollaborator {
 371                        project_id,
 372                        peer_id: sender_id.0,
 373                    },
 374                )
 375            })
 376            .await?;
 377            self.update_contacts_for_users(&worktree.authorized_user_ids)
 378                .await?;
 379        }
 380        Ok(())
 381    }
 382
 383    async fn register_worktree(
 384        mut self: Arc<Server>,
 385        request: TypedEnvelope<proto::RegisterWorktree>,
 386    ) -> tide::Result<()> {
 387        let receipt = request.receipt();
 388        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 389
 390        let mut contact_user_ids = HashSet::default();
 391        contact_user_ids.insert(host_user_id);
 392        for github_login in request.payload.authorized_logins {
 393            match self.app_state.db.create_user(&github_login, false).await {
 394                Ok(contact_user_id) => {
 395                    contact_user_ids.insert(contact_user_id);
 396                }
 397                Err(err) => {
 398                    let message = err.to_string();
 399                    self.peer
 400                        .respond_with_error(receipt, proto::Error { message })
 401                        .await?;
 402                    return Ok(());
 403                }
 404            }
 405        }
 406
 407        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 408        let ok = self.state_mut().register_worktree(
 409            request.payload.project_id,
 410            request.payload.worktree_id,
 411            Worktree {
 412                authorized_user_ids: contact_user_ids.clone(),
 413                root_name: request.payload.root_name,
 414                share: None,
 415            },
 416        );
 417
 418        if ok {
 419            self.peer.respond(receipt, proto::Ack {}).await?;
 420            self.update_contacts_for_users(&contact_user_ids).await?;
 421        } else {
 422            self.peer
 423                .respond_with_error(
 424                    receipt,
 425                    proto::Error {
 426                        message: NO_SUCH_PROJECT.to_string(),
 427                    },
 428                )
 429                .await?;
 430        }
 431
 432        Ok(())
 433    }
 434
 435    async fn unregister_worktree(
 436        mut self: Arc<Server>,
 437        request: TypedEnvelope<proto::UnregisterWorktree>,
 438    ) -> tide::Result<()> {
 439        let project_id = request.payload.project_id;
 440        let worktree_id = request.payload.worktree_id;
 441        let (worktree, guest_connection_ids) =
 442            self.state_mut()
 443                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 444
 445        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 446            self.peer.send(
 447                conn_id,
 448                proto::UnregisterWorktree {
 449                    project_id,
 450                    worktree_id,
 451                },
 452            )
 453        })
 454        .await?;
 455        self.update_contacts_for_users(&worktree.authorized_user_ids)
 456            .await?;
 457        Ok(())
 458    }
 459
 460    async fn share_worktree(
 461        mut self: Arc<Server>,
 462        mut request: TypedEnvelope<proto::ShareWorktree>,
 463    ) -> tide::Result<()> {
 464        let worktree = request
 465            .payload
 466            .worktree
 467            .as_mut()
 468            .ok_or_else(|| anyhow!("missing worktree"))?;
 469        let entries = mem::take(&mut worktree.entries)
 470            .into_iter()
 471            .map(|entry| (entry.id, entry))
 472            .collect();
 473
 474        let contact_user_ids = self.state_mut().share_worktree(
 475            request.payload.project_id,
 476            worktree.id,
 477            request.sender_id,
 478            entries,
 479        );
 480        if let Some(contact_user_ids) = contact_user_ids {
 481            self.peer.respond(request.receipt(), proto::Ack {}).await?;
 482            self.update_contacts_for_users(&contact_user_ids).await?;
 483        } else {
 484            self.peer
 485                .respond_with_error(
 486                    request.receipt(),
 487                    proto::Error {
 488                        message: "no such worktree".to_string(),
 489                    },
 490                )
 491                .await?;
 492        }
 493        Ok(())
 494    }
 495
 496    async fn update_worktree(
 497        mut self: Arc<Server>,
 498        request: TypedEnvelope<proto::UpdateWorktree>,
 499    ) -> tide::Result<()> {
 500        let connection_ids = self
 501            .state_mut()
 502            .update_worktree(
 503                request.sender_id,
 504                request.payload.project_id,
 505                request.payload.worktree_id,
 506                &request.payload.removed_entries,
 507                &request.payload.updated_entries,
 508            )
 509            .ok_or_else(|| anyhow!("no such worktree"))?;
 510
 511        broadcast(request.sender_id, connection_ids, |connection_id| {
 512            self.peer
 513                .forward_send(request.sender_id, connection_id, request.payload.clone())
 514        })
 515        .await?;
 516
 517        Ok(())
 518    }
 519
 520    async fn open_buffer(
 521        self: Arc<Server>,
 522        request: TypedEnvelope<proto::OpenBuffer>,
 523    ) -> tide::Result<()> {
 524        let receipt = request.receipt();
 525        let host_connection_id = self
 526            .state()
 527            .read_project(request.payload.project_id, request.sender_id)
 528            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 529            .host_connection_id;
 530        let response = self
 531            .peer
 532            .forward_request(request.sender_id, host_connection_id, request.payload)
 533            .await?;
 534        self.peer.respond(receipt, response).await?;
 535        Ok(())
 536    }
 537
 538    async fn close_buffer(
 539        self: Arc<Server>,
 540        request: TypedEnvelope<proto::CloseBuffer>,
 541    ) -> tide::Result<()> {
 542        let host_connection_id = self
 543            .state()
 544            .read_project(request.payload.project_id, request.sender_id)
 545            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 546            .host_connection_id;
 547        self.peer
 548            .forward_send(request.sender_id, host_connection_id, request.payload)
 549            .await?;
 550        Ok(())
 551    }
 552
 553    async fn save_buffer(
 554        self: Arc<Server>,
 555        request: TypedEnvelope<proto::SaveBuffer>,
 556    ) -> tide::Result<()> {
 557        let host;
 558        let guests;
 559        {
 560            let state = self.state();
 561            let project = state
 562                .read_project(request.payload.project_id, request.sender_id)
 563                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 564            host = project.host_connection_id;
 565            guests = project.guest_connection_ids()
 566        }
 567
 568        let sender = request.sender_id;
 569        let receipt = request.receipt();
 570        let response = self
 571            .peer
 572            .forward_request(sender, host, request.payload.clone())
 573            .await?;
 574
 575        broadcast(host, guests, |conn_id| {
 576            let response = response.clone();
 577            let peer = &self.peer;
 578            async move {
 579                if conn_id == sender {
 580                    peer.respond(receipt, response).await
 581                } else {
 582                    peer.forward_send(host, conn_id, response).await
 583                }
 584            }
 585        })
 586        .await?;
 587
 588        Ok(())
 589    }
 590
 591    async fn update_buffer(
 592        self: Arc<Server>,
 593        request: TypedEnvelope<proto::UpdateBuffer>,
 594    ) -> tide::Result<()> {
 595        let receiver_ids = self
 596            .state()
 597            .project_connection_ids(request.payload.project_id, request.sender_id)
 598            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 599        broadcast(request.sender_id, receiver_ids, |connection_id| {
 600            self.peer
 601                .forward_send(request.sender_id, connection_id, request.payload.clone())
 602        })
 603        .await?;
 604        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 605        Ok(())
 606    }
 607
 608    async fn buffer_saved(
 609        self: Arc<Server>,
 610        request: TypedEnvelope<proto::BufferSaved>,
 611    ) -> tide::Result<()> {
 612        let receiver_ids = self
 613            .state()
 614            .project_connection_ids(request.payload.project_id, request.sender_id)
 615            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 616        broadcast(request.sender_id, receiver_ids, |connection_id| {
 617            self.peer
 618                .forward_send(request.sender_id, connection_id, request.payload.clone())
 619        })
 620        .await?;
 621        Ok(())
 622    }
 623
 624    async fn get_channels(
 625        self: Arc<Server>,
 626        request: TypedEnvelope<proto::GetChannels>,
 627    ) -> tide::Result<()> {
 628        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 629        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 630        self.peer
 631            .respond(
 632                request.receipt(),
 633                proto::GetChannelsResponse {
 634                    channels: channels
 635                        .into_iter()
 636                        .map(|chan| proto::Channel {
 637                            id: chan.id.to_proto(),
 638                            name: chan.name,
 639                        })
 640                        .collect(),
 641                },
 642            )
 643            .await?;
 644        Ok(())
 645    }
 646
 647    async fn get_users(
 648        self: Arc<Server>,
 649        request: TypedEnvelope<proto::GetUsers>,
 650    ) -> tide::Result<()> {
 651        let receipt = request.receipt();
 652        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 653        let users = self
 654            .app_state
 655            .db
 656            .get_users_by_ids(user_ids)
 657            .await?
 658            .into_iter()
 659            .map(|user| proto::User {
 660                id: user.id.to_proto(),
 661                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 662                github_login: user.github_login,
 663            })
 664            .collect();
 665        self.peer
 666            .respond(receipt, proto::GetUsersResponse { users })
 667            .await?;
 668        Ok(())
 669    }
 670
 671    async fn update_contacts_for_users<'a>(
 672        self: &Arc<Server>,
 673        user_ids: impl IntoIterator<Item = &'a UserId>,
 674    ) -> tide::Result<()> {
 675        let mut send_futures = Vec::new();
 676
 677        {
 678            let state = self.state();
 679            for user_id in user_ids {
 680                let contacts = state.contacts_for_user(*user_id);
 681                for connection_id in state.connection_ids_for_user(*user_id) {
 682                    send_futures.push(self.peer.send(
 683                        connection_id,
 684                        proto::UpdateContacts {
 685                            contacts: contacts.clone(),
 686                        },
 687                    ));
 688                }
 689            }
 690        }
 691        futures::future::try_join_all(send_futures).await?;
 692
 693        Ok(())
 694    }
 695
 696    async fn join_channel(
 697        mut self: Arc<Self>,
 698        request: TypedEnvelope<proto::JoinChannel>,
 699    ) -> tide::Result<()> {
 700        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 701        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 702        if !self
 703            .app_state
 704            .db
 705            .can_user_access_channel(user_id, channel_id)
 706            .await?
 707        {
 708            Err(anyhow!("access denied"))?;
 709        }
 710
 711        self.state_mut().join_channel(request.sender_id, channel_id);
 712        let messages = self
 713            .app_state
 714            .db
 715            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 716            .await?
 717            .into_iter()
 718            .map(|msg| proto::ChannelMessage {
 719                id: msg.id.to_proto(),
 720                body: msg.body,
 721                timestamp: msg.sent_at.unix_timestamp() as u64,
 722                sender_id: msg.sender_id.to_proto(),
 723                nonce: Some(msg.nonce.as_u128().into()),
 724            })
 725            .collect::<Vec<_>>();
 726        self.peer
 727            .respond(
 728                request.receipt(),
 729                proto::JoinChannelResponse {
 730                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 731                    messages,
 732                },
 733            )
 734            .await?;
 735        Ok(())
 736    }
 737
 738    async fn leave_channel(
 739        mut self: Arc<Self>,
 740        request: TypedEnvelope<proto::LeaveChannel>,
 741    ) -> tide::Result<()> {
 742        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 743        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 744        if !self
 745            .app_state
 746            .db
 747            .can_user_access_channel(user_id, channel_id)
 748            .await?
 749        {
 750            Err(anyhow!("access denied"))?;
 751        }
 752
 753        self.state_mut()
 754            .leave_channel(request.sender_id, channel_id);
 755
 756        Ok(())
 757    }
 758
 759    async fn send_channel_message(
 760        self: Arc<Self>,
 761        request: TypedEnvelope<proto::SendChannelMessage>,
 762    ) -> tide::Result<()> {
 763        let receipt = request.receipt();
 764        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 765        let user_id;
 766        let connection_ids;
 767        {
 768            let state = self.state();
 769            user_id = state.user_id_for_connection(request.sender_id)?;
 770            if let Some(ids) = state.channel_connection_ids(channel_id) {
 771                connection_ids = ids;
 772            } else {
 773                return Ok(());
 774            }
 775        }
 776
 777        // Validate the message body.
 778        let body = request.payload.body.trim().to_string();
 779        if body.len() > MAX_MESSAGE_LEN {
 780            self.peer
 781                .respond_with_error(
 782                    receipt,
 783                    proto::Error {
 784                        message: "message is too long".to_string(),
 785                    },
 786                )
 787                .await?;
 788            return Ok(());
 789        }
 790        if body.is_empty() {
 791            self.peer
 792                .respond_with_error(
 793                    receipt,
 794                    proto::Error {
 795                        message: "message can't be blank".to_string(),
 796                    },
 797                )
 798                .await?;
 799            return Ok(());
 800        }
 801
 802        let timestamp = OffsetDateTime::now_utc();
 803        let nonce = if let Some(nonce) = request.payload.nonce {
 804            nonce
 805        } else {
 806            self.peer
 807                .respond_with_error(
 808                    receipt,
 809                    proto::Error {
 810                        message: "nonce can't be blank".to_string(),
 811                    },
 812                )
 813                .await?;
 814            return Ok(());
 815        };
 816
 817        let message_id = self
 818            .app_state
 819            .db
 820            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 821            .await?
 822            .to_proto();
 823        let message = proto::ChannelMessage {
 824            sender_id: user_id.to_proto(),
 825            id: message_id,
 826            body,
 827            timestamp: timestamp.unix_timestamp() as u64,
 828            nonce: Some(nonce),
 829        };
 830        broadcast(request.sender_id, connection_ids, |conn_id| {
 831            self.peer.send(
 832                conn_id,
 833                proto::ChannelMessageSent {
 834                    channel_id: channel_id.to_proto(),
 835                    message: Some(message.clone()),
 836                },
 837            )
 838        })
 839        .await?;
 840        self.peer
 841            .respond(
 842                receipt,
 843                proto::SendChannelMessageResponse {
 844                    message: Some(message),
 845                },
 846            )
 847            .await?;
 848        Ok(())
 849    }
 850
 851    async fn get_channel_messages(
 852        self: Arc<Self>,
 853        request: TypedEnvelope<proto::GetChannelMessages>,
 854    ) -> tide::Result<()> {
 855        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 856        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 857        if !self
 858            .app_state
 859            .db
 860            .can_user_access_channel(user_id, channel_id)
 861            .await?
 862        {
 863            Err(anyhow!("access denied"))?;
 864        }
 865
 866        let messages = self
 867            .app_state
 868            .db
 869            .get_channel_messages(
 870                channel_id,
 871                MESSAGE_COUNT_PER_PAGE,
 872                Some(MessageId::from_proto(request.payload.before_message_id)),
 873            )
 874            .await?
 875            .into_iter()
 876            .map(|msg| proto::ChannelMessage {
 877                id: msg.id.to_proto(),
 878                body: msg.body,
 879                timestamp: msg.sent_at.unix_timestamp() as u64,
 880                sender_id: msg.sender_id.to_proto(),
 881                nonce: Some(msg.nonce.as_u128().into()),
 882            })
 883            .collect::<Vec<_>>();
 884        self.peer
 885            .respond(
 886                request.receipt(),
 887                proto::GetChannelMessagesResponse {
 888                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 889                    messages,
 890                },
 891            )
 892            .await?;
 893        Ok(())
 894    }
 895
 896    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 897        self.store.read()
 898    }
 899
 900    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 901        self.store.write()
 902    }
 903}
 904
 905pub async fn broadcast<F, T>(
 906    sender_id: ConnectionId,
 907    receiver_ids: Vec<ConnectionId>,
 908    mut f: F,
 909) -> anyhow::Result<()>
 910where
 911    F: FnMut(ConnectionId) -> T,
 912    T: Future<Output = anyhow::Result<()>>,
 913{
 914    let futures = receiver_ids
 915        .into_iter()
 916        .filter(|id| *id != sender_id)
 917        .map(|id| f(id));
 918    futures::future::try_join_all(futures).await?;
 919    Ok(())
 920}
 921
 922pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 923    let server = Server::new(app.state().clone(), rpc.clone(), None);
 924    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 925        let server = server.clone();
 926        async move {
 927            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 928
 929            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 930            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 931            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 932            let client_protocol_version: Option<u32> = request
 933                .header("X-Zed-Protocol-Version")
 934                .and_then(|v| v.as_str().parse().ok());
 935
 936            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 937                return Ok(Response::new(StatusCode::UpgradeRequired));
 938            }
 939
 940            let header = match request.header("Sec-Websocket-Key") {
 941                Some(h) => h.as_str(),
 942                None => return Err(anyhow!("expected sec-websocket-key"))?,
 943            };
 944
 945            let user_id = process_auth_header(&request).await?;
 946
 947            let mut response = Response::new(StatusCode::SwitchingProtocols);
 948            response.insert_header(UPGRADE, "websocket");
 949            response.insert_header(CONNECTION, "Upgrade");
 950            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 951            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 952            response.insert_header("Sec-Websocket-Version", "13");
 953
 954            let http_res: &mut tide::http::Response = response.as_mut();
 955            let upgrade_receiver = http_res.recv_upgrade().await;
 956            let addr = request.remote().unwrap_or("unknown").to_string();
 957            task::spawn(async move {
 958                if let Some(stream) = upgrade_receiver.await {
 959                    server
 960                        .handle_connection(
 961                            Connection::new(
 962                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 963                            ),
 964                            addr,
 965                            user_id,
 966                            None,
 967                        )
 968                        .await;
 969                }
 970            });
 971
 972            Ok(response)
 973        }
 974    });
 975}
 976
 977fn header_contains_ignore_case<T>(
 978    request: &tide::Request<T>,
 979    header_name: HeaderName,
 980    value: &str,
 981) -> bool {
 982    request
 983        .header(header_name)
 984        .map(|h| {
 985            h.as_str()
 986                .split(',')
 987                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 988        })
 989        .unwrap_or(false)
 990}
 991
 992#[cfg(test)]
 993mod tests {
 994    use super::*;
 995    use crate::{
 996        auth,
 997        db::{tests::TestDb, UserId},
 998        github, AppState, Config,
 999    };
1000    use ::rpc::Peer;
1001    use async_std::task;
1002    use gpui::{ModelHandle, TestAppContext};
1003    use parking_lot::Mutex;
1004    use postage::{mpsc, watch};
1005    use rpc::PeerId;
1006    use serde_json::json;
1007    use sqlx::types::time::OffsetDateTime;
1008    use std::{
1009        ops::Deref,
1010        path::Path,
1011        sync::{
1012            atomic::{AtomicBool, Ordering::SeqCst},
1013            Arc,
1014        },
1015        time::Duration,
1016    };
1017    use zed::{
1018        client::{
1019            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1020            EstablishConnectionError, UserStore,
1021        },
1022        editor::{Editor, EditorSettings, Input, MultiBuffer},
1023        fs::{FakeFs, Fs as _},
1024        language::{
1025            tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1026            LanguageRegistry, LanguageServerConfig, Point,
1027        },
1028        lsp,
1029        project::Project,
1030    };
1031
1032    #[gpui::test]
1033    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1034        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1035        let lang_registry = Arc::new(LanguageRegistry::new());
1036        let fs = Arc::new(FakeFs::new());
1037        cx_a.foreground().forbid_parking();
1038
1039        // Connect to a server as 2 clients.
1040        let mut server = TestServer::start().await;
1041        let client_a = server.create_client(&mut cx_a, "user_a").await;
1042        let client_b = server.create_client(&mut cx_b, "user_b").await;
1043
1044        // Share a project as client A
1045        fs.insert_tree(
1046            "/a",
1047            json!({
1048                ".zed.toml": r#"collaborators = ["user_b"]"#,
1049                "a.txt": "a-contents",
1050                "b.txt": "b-contents",
1051            }),
1052        )
1053        .await;
1054        let project_a = cx_a.update(|cx| {
1055            Project::local(
1056                client_a.clone(),
1057                client_a.user_store.clone(),
1058                lang_registry.clone(),
1059                fs.clone(),
1060                cx,
1061            )
1062        });
1063        let worktree_a = project_a
1064            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1065            .await
1066            .unwrap();
1067        worktree_a
1068            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1069            .await;
1070        let project_id = project_a
1071            .update(&mut cx_a, |project, _| project.next_remote_id())
1072            .await;
1073        project_a
1074            .update(&mut cx_a, |project, cx| project.share(cx))
1075            .await
1076            .unwrap();
1077
1078        // Join that project as client B
1079        let project_b = Project::remote(
1080            project_id,
1081            client_b.clone(),
1082            client_b.user_store.clone(),
1083            lang_registry.clone(),
1084            fs.clone(),
1085            &mut cx_b.to_async(),
1086        )
1087        .await
1088        .unwrap();
1089        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1090
1091        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1092            assert_eq!(
1093                project
1094                    .collaborators()
1095                    .get(&client_a.peer_id)
1096                    .unwrap()
1097                    .user
1098                    .github_login,
1099                "user_a"
1100            );
1101            project.replica_id()
1102        });
1103        project_a
1104            .condition(&cx_a, |tree, _| {
1105                tree.collaborators()
1106                    .get(&client_b.peer_id)
1107                    .map_or(false, |collaborator| {
1108                        collaborator.replica_id == replica_id_b
1109                            && collaborator.user.github_login == "user_b"
1110                    })
1111            })
1112            .await;
1113
1114        // Open the same file as client B and client A.
1115        let buffer_b = worktree_b
1116            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1117            .await
1118            .unwrap();
1119        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1120        buffer_b.read_with(&cx_b, |buf, cx| {
1121            assert_eq!(buf.read(cx).text(), "b-contents")
1122        });
1123        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1124        let buffer_a = worktree_a
1125            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1126            .await
1127            .unwrap();
1128
1129        let editor_b = cx_b.add_view(window_b, |cx| {
1130            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1131        });
1132        // TODO
1133        // // Create a selection set as client B and see that selection set as client A.
1134        // buffer_a
1135        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1136        //     .await;
1137
1138        // Edit the buffer as client B and see that edit as client A.
1139        editor_b.update(&mut cx_b, |editor, cx| {
1140            editor.handle_input(&Input("ok, ".into()), cx)
1141        });
1142        buffer_a
1143            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1144            .await;
1145
1146        // TODO
1147        // // Remove the selection set as client B, see those selections disappear as client A.
1148        cx_b.update(move |_| drop(editor_b));
1149        // buffer_a
1150        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1151        //     .await;
1152
1153        // Close the buffer as client A, see that the buffer is closed.
1154        cx_a.update(move |_| drop(buffer_a));
1155        worktree_a
1156            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1157            .await;
1158
1159        // Dropping the client B's project removes client B from client A's collaborators.
1160        cx_b.update(move |_| drop(project_b));
1161        project_a
1162            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1163            .await;
1164    }
1165
1166    #[gpui::test]
1167    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1168        let lang_registry = Arc::new(LanguageRegistry::new());
1169        let fs = Arc::new(FakeFs::new());
1170        cx_a.foreground().forbid_parking();
1171
1172        // Connect to a server as 2 clients.
1173        let mut server = TestServer::start().await;
1174        let client_a = server.create_client(&mut cx_a, "user_a").await;
1175        let client_b = server.create_client(&mut cx_b, "user_b").await;
1176
1177        // Share a project as client A
1178        fs.insert_tree(
1179            "/a",
1180            json!({
1181                ".zed.toml": r#"collaborators = ["user_b"]"#,
1182                "a.txt": "a-contents",
1183                "b.txt": "b-contents",
1184            }),
1185        )
1186        .await;
1187        let project_a = cx_a.update(|cx| {
1188            Project::local(
1189                client_a.clone(),
1190                client_a.user_store.clone(),
1191                lang_registry.clone(),
1192                fs.clone(),
1193                cx,
1194            )
1195        });
1196        let worktree_a = project_a
1197            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1198            .await
1199            .unwrap();
1200        worktree_a
1201            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1202            .await;
1203        let project_id = project_a
1204            .update(&mut cx_a, |project, _| project.next_remote_id())
1205            .await;
1206        project_a
1207            .update(&mut cx_a, |project, cx| project.share(cx))
1208            .await
1209            .unwrap();
1210
1211        // Join that project as client B
1212        let project_b = Project::remote(
1213            project_id,
1214            client_b.clone(),
1215            client_b.user_store.clone(),
1216            lang_registry.clone(),
1217            fs.clone(),
1218            &mut cx_b.to_async(),
1219        )
1220        .await
1221        .unwrap();
1222
1223        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1224        worktree_b
1225            .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
1226            .await
1227            .unwrap();
1228
1229        project_a
1230            .update(&mut cx_a, |project, cx| project.unshare(cx))
1231            .await
1232            .unwrap();
1233        project_b
1234            .condition(&mut cx_b, |project, _| project.is_read_only())
1235            .await;
1236    }
1237
1238    #[gpui::test]
1239    async fn test_propagate_saves_and_fs_changes(
1240        mut cx_a: TestAppContext,
1241        mut cx_b: TestAppContext,
1242        mut cx_c: TestAppContext,
1243    ) {
1244        let lang_registry = Arc::new(LanguageRegistry::new());
1245        let fs = Arc::new(FakeFs::new());
1246        cx_a.foreground().forbid_parking();
1247
1248        // Connect to a server as 3 clients.
1249        let mut server = TestServer::start().await;
1250        let client_a = server.create_client(&mut cx_a, "user_a").await;
1251        let client_b = server.create_client(&mut cx_b, "user_b").await;
1252        let client_c = server.create_client(&mut cx_c, "user_c").await;
1253
1254        // Share a worktree as client A.
1255        fs.insert_tree(
1256            "/a",
1257            json!({
1258                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1259                "file1": "",
1260                "file2": ""
1261            }),
1262        )
1263        .await;
1264        let project_a = cx_a.update(|cx| {
1265            Project::local(
1266                client_a.clone(),
1267                client_a.user_store.clone(),
1268                lang_registry.clone(),
1269                fs.clone(),
1270                cx,
1271            )
1272        });
1273        let worktree_a = project_a
1274            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1275            .await
1276            .unwrap();
1277        worktree_a
1278            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1279            .await;
1280        let project_id = project_a
1281            .update(&mut cx_a, |project, _| project.next_remote_id())
1282            .await;
1283        project_a
1284            .update(&mut cx_a, |project, cx| project.share(cx))
1285            .await
1286            .unwrap();
1287
1288        // Join that worktree as clients B and C.
1289        let project_b = Project::remote(
1290            project_id,
1291            client_b.clone(),
1292            client_b.user_store.clone(),
1293            lang_registry.clone(),
1294            fs.clone(),
1295            &mut cx_b.to_async(),
1296        )
1297        .await
1298        .unwrap();
1299        let project_c = Project::remote(
1300            project_id,
1301            client_c.clone(),
1302            client_c.user_store.clone(),
1303            lang_registry.clone(),
1304            fs.clone(),
1305            &mut cx_c.to_async(),
1306        )
1307        .await
1308        .unwrap();
1309
1310        // Open and edit a buffer as both guests B and C.
1311        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1312        let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
1313        let buffer_b = worktree_b
1314            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1315            .await
1316            .unwrap();
1317        let buffer_c = worktree_c
1318            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1319            .await
1320            .unwrap();
1321        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1322        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1323
1324        // Open and edit that buffer as the host.
1325        let buffer_a = worktree_a
1326            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1327            .await
1328            .unwrap();
1329
1330        buffer_a
1331            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1332            .await;
1333        buffer_a.update(&mut cx_a, |buf, cx| {
1334            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1335        });
1336
1337        // Wait for edits to propagate
1338        buffer_a
1339            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1340            .await;
1341        buffer_b
1342            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1343            .await;
1344        buffer_c
1345            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1346            .await;
1347
1348        // Edit the buffer as the host and concurrently save as guest B.
1349        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1350        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1351        save_b.await.unwrap();
1352        assert_eq!(
1353            fs.load("/a/file1".as_ref()).await.unwrap(),
1354            "hi-a, i-am-c, i-am-b, i-am-a"
1355        );
1356        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1357        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1358        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1359
1360        // Make changes on host's file system, see those changes on the guests.
1361        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1362            .await
1363            .unwrap();
1364        fs.insert_file(Path::new("/a/file4"), "4".into())
1365            .await
1366            .unwrap();
1367
1368        worktree_b
1369            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1370            .await;
1371        worktree_c
1372            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1373            .await;
1374        worktree_b.read_with(&cx_b, |tree, _| {
1375            assert_eq!(
1376                tree.paths()
1377                    .map(|p| p.to_string_lossy())
1378                    .collect::<Vec<_>>(),
1379                &[".zed.toml", "file1", "file3", "file4"]
1380            )
1381        });
1382        worktree_c.read_with(&cx_c, |tree, _| {
1383            assert_eq!(
1384                tree.paths()
1385                    .map(|p| p.to_string_lossy())
1386                    .collect::<Vec<_>>(),
1387                &[".zed.toml", "file1", "file3", "file4"]
1388            )
1389        });
1390    }
1391
1392    #[gpui::test]
1393    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1394        cx_a.foreground().forbid_parking();
1395        let lang_registry = Arc::new(LanguageRegistry::new());
1396        let fs = Arc::new(FakeFs::new());
1397
1398        // Connect to a server as 2 clients.
1399        let mut server = TestServer::start().await;
1400        let client_a = server.create_client(&mut cx_a, "user_a").await;
1401        let client_b = server.create_client(&mut cx_b, "user_b").await;
1402
1403        // Share a project as client A
1404        fs.insert_tree(
1405            "/dir",
1406            json!({
1407                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1408                "a.txt": "a-contents",
1409            }),
1410        )
1411        .await;
1412
1413        let project_a = cx_a.update(|cx| {
1414            Project::local(
1415                client_a.clone(),
1416                client_a.user_store.clone(),
1417                lang_registry.clone(),
1418                fs.clone(),
1419                cx,
1420            )
1421        });
1422        let worktree_a = project_a
1423            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1424            .await
1425            .unwrap();
1426        worktree_a
1427            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1428            .await;
1429        let project_id = project_a
1430            .update(&mut cx_a, |project, _| project.next_remote_id())
1431            .await;
1432        project_a
1433            .update(&mut cx_a, |project, cx| project.share(cx))
1434            .await
1435            .unwrap();
1436
1437        // Join that project as client B
1438        let project_b = Project::remote(
1439            project_id,
1440            client_b.clone(),
1441            client_b.user_store.clone(),
1442            lang_registry.clone(),
1443            fs.clone(),
1444            &mut cx_b.to_async(),
1445        )
1446        .await
1447        .unwrap();
1448        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1449
1450        // Open a buffer as client B
1451        let buffer_b = worktree_b
1452            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1453            .await
1454            .unwrap();
1455        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1456
1457        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1458        buffer_b.read_with(&cx_b, |buf, _| {
1459            assert!(buf.is_dirty());
1460            assert!(!buf.has_conflict());
1461        });
1462
1463        buffer_b
1464            .update(&mut cx_b, |buf, cx| buf.save(cx))
1465            .unwrap()
1466            .await
1467            .unwrap();
1468        worktree_b
1469            .condition(&cx_b, |_, cx| {
1470                buffer_b.read(cx).file().unwrap().mtime() != mtime
1471            })
1472            .await;
1473        buffer_b.read_with(&cx_b, |buf, _| {
1474            assert!(!buf.is_dirty());
1475            assert!(!buf.has_conflict());
1476        });
1477
1478        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1479        buffer_b.read_with(&cx_b, |buf, _| {
1480            assert!(buf.is_dirty());
1481            assert!(!buf.has_conflict());
1482        });
1483    }
1484
1485    #[gpui::test]
1486    async fn test_editing_while_guest_opens_buffer(
1487        mut cx_a: TestAppContext,
1488        mut cx_b: TestAppContext,
1489    ) {
1490        cx_a.foreground().forbid_parking();
1491        let lang_registry = Arc::new(LanguageRegistry::new());
1492        let fs = Arc::new(FakeFs::new());
1493
1494        // Connect to a server as 2 clients.
1495        let mut server = TestServer::start().await;
1496        let client_a = server.create_client(&mut cx_a, "user_a").await;
1497        let client_b = server.create_client(&mut cx_b, "user_b").await;
1498
1499        // Share a project as client A
1500        fs.insert_tree(
1501            "/dir",
1502            json!({
1503                ".zed.toml": r#"collaborators = ["user_b"]"#,
1504                "a.txt": "a-contents",
1505            }),
1506        )
1507        .await;
1508        let project_a = cx_a.update(|cx| {
1509            Project::local(
1510                client_a.clone(),
1511                client_a.user_store.clone(),
1512                lang_registry.clone(),
1513                fs.clone(),
1514                cx,
1515            )
1516        });
1517        let worktree_a = project_a
1518            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1519            .await
1520            .unwrap();
1521        worktree_a
1522            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1523            .await;
1524        let project_id = project_a
1525            .update(&mut cx_a, |project, _| project.next_remote_id())
1526            .await;
1527        project_a
1528            .update(&mut cx_a, |project, cx| project.share(cx))
1529            .await
1530            .unwrap();
1531
1532        // Join that project as client B
1533        let project_b = Project::remote(
1534            project_id,
1535            client_b.clone(),
1536            client_b.user_store.clone(),
1537            lang_registry.clone(),
1538            fs.clone(),
1539            &mut cx_b.to_async(),
1540        )
1541        .await
1542        .unwrap();
1543        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1544
1545        // Open a buffer as client A
1546        let buffer_a = worktree_a
1547            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1548            .await
1549            .unwrap();
1550
1551        // Start opening the same buffer as client B
1552        let buffer_b = cx_b
1553            .background()
1554            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1555        task::yield_now().await;
1556
1557        // Edit the buffer as client A while client B is still opening it.
1558        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1559
1560        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1561        let buffer_b = buffer_b.await.unwrap();
1562        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1563    }
1564
1565    #[gpui::test]
1566    async fn test_leaving_worktree_while_opening_buffer(
1567        mut cx_a: TestAppContext,
1568        mut cx_b: TestAppContext,
1569    ) {
1570        cx_a.foreground().forbid_parking();
1571        let lang_registry = Arc::new(LanguageRegistry::new());
1572        let fs = Arc::new(FakeFs::new());
1573
1574        // Connect to a server as 2 clients.
1575        let mut server = TestServer::start().await;
1576        let client_a = server.create_client(&mut cx_a, "user_a").await;
1577        let client_b = server.create_client(&mut cx_b, "user_b").await;
1578
1579        // Share a project as client A
1580        fs.insert_tree(
1581            "/dir",
1582            json!({
1583                ".zed.toml": r#"collaborators = ["user_b"]"#,
1584                "a.txt": "a-contents",
1585            }),
1586        )
1587        .await;
1588        let project_a = cx_a.update(|cx| {
1589            Project::local(
1590                client_a.clone(),
1591                client_a.user_store.clone(),
1592                lang_registry.clone(),
1593                fs.clone(),
1594                cx,
1595            )
1596        });
1597        let worktree_a = project_a
1598            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1599            .await
1600            .unwrap();
1601        worktree_a
1602            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1603            .await;
1604        let project_id = project_a
1605            .update(&mut cx_a, |project, _| project.next_remote_id())
1606            .await;
1607        project_a
1608            .update(&mut cx_a, |project, cx| project.share(cx))
1609            .await
1610            .unwrap();
1611
1612        // Join that project as client B
1613        let project_b = Project::remote(
1614            project_id,
1615            client_b.clone(),
1616            client_b.user_store.clone(),
1617            lang_registry.clone(),
1618            fs.clone(),
1619            &mut cx_b.to_async(),
1620        )
1621        .await
1622        .unwrap();
1623        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1624
1625        // See that a guest has joined as client A.
1626        project_a
1627            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1628            .await;
1629
1630        // Begin opening a buffer as client B, but leave the project before the open completes.
1631        let buffer_b = cx_b
1632            .background()
1633            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1634        cx_b.update(|_| drop(project_b));
1635        drop(buffer_b);
1636
1637        // See that the guest has left.
1638        project_a
1639            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1640            .await;
1641    }
1642
1643    #[gpui::test]
1644    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1645        cx_a.foreground().forbid_parking();
1646        let lang_registry = Arc::new(LanguageRegistry::new());
1647        let fs = Arc::new(FakeFs::new());
1648
1649        // Connect to a server as 2 clients.
1650        let mut server = TestServer::start().await;
1651        let client_a = server.create_client(&mut cx_a, "user_a").await;
1652        let client_b = server.create_client(&mut cx_b, "user_b").await;
1653
1654        // Share a project as client A
1655        fs.insert_tree(
1656            "/a",
1657            json!({
1658                ".zed.toml": r#"collaborators = ["user_b"]"#,
1659                "a.txt": "a-contents",
1660                "b.txt": "b-contents",
1661            }),
1662        )
1663        .await;
1664        let project_a = cx_a.update(|cx| {
1665            Project::local(
1666                client_a.clone(),
1667                client_a.user_store.clone(),
1668                lang_registry.clone(),
1669                fs.clone(),
1670                cx,
1671            )
1672        });
1673        let worktree_a = project_a
1674            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1675            .await
1676            .unwrap();
1677        worktree_a
1678            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1679            .await;
1680        let project_id = project_a
1681            .update(&mut cx_a, |project, _| project.next_remote_id())
1682            .await;
1683        project_a
1684            .update(&mut cx_a, |project, cx| project.share(cx))
1685            .await
1686            .unwrap();
1687
1688        // Join that project as client B
1689        let _project_b = Project::remote(
1690            project_id,
1691            client_b.clone(),
1692            client_b.user_store.clone(),
1693            lang_registry.clone(),
1694            fs.clone(),
1695            &mut cx_b.to_async(),
1696        )
1697        .await
1698        .unwrap();
1699
1700        // See that a guest has joined as client A.
1701        project_a
1702            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1703            .await;
1704
1705        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1706        client_b.disconnect(&cx_b.to_async()).await.unwrap();
1707        project_a
1708            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1709            .await;
1710    }
1711
1712    #[gpui::test]
1713    async fn test_collaborating_with_diagnostics(
1714        mut cx_a: TestAppContext,
1715        mut cx_b: TestAppContext,
1716    ) {
1717        cx_a.foreground().forbid_parking();
1718        let mut lang_registry = Arc::new(LanguageRegistry::new());
1719        let fs = Arc::new(FakeFs::new());
1720
1721        // Set up a fake language server.
1722        let (language_server_config, mut fake_language_server) =
1723            LanguageServerConfig::fake(cx_a.background()).await;
1724        Arc::get_mut(&mut lang_registry)
1725            .unwrap()
1726            .add(Arc::new(Language::new(
1727                LanguageConfig {
1728                    name: "Rust".to_string(),
1729                    path_suffixes: vec!["rs".to_string()],
1730                    language_server: Some(language_server_config),
1731                    ..Default::default()
1732                },
1733                Some(tree_sitter_rust::language()),
1734            )));
1735
1736        // Connect to a server as 2 clients.
1737        let mut server = TestServer::start().await;
1738        let client_a = server.create_client(&mut cx_a, "user_a").await;
1739        let client_b = server.create_client(&mut cx_b, "user_b").await;
1740
1741        // Share a project as client A
1742        fs.insert_tree(
1743            "/a",
1744            json!({
1745                ".zed.toml": r#"collaborators = ["user_b"]"#,
1746                "a.rs": "let one = two",
1747                "other.rs": "",
1748            }),
1749        )
1750        .await;
1751        let project_a = cx_a.update(|cx| {
1752            Project::local(
1753                client_a.clone(),
1754                client_a.user_store.clone(),
1755                lang_registry.clone(),
1756                fs.clone(),
1757                cx,
1758            )
1759        });
1760        let worktree_a = project_a
1761            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1762            .await
1763            .unwrap();
1764        worktree_a
1765            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1766            .await;
1767        let project_id = project_a
1768            .update(&mut cx_a, |project, _| project.next_remote_id())
1769            .await;
1770        project_a
1771            .update(&mut cx_a, |project, cx| project.share(cx))
1772            .await
1773            .unwrap();
1774
1775        // Cause the language server to start.
1776        let _ = cx_a
1777            .background()
1778            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1779                worktree.open_buffer("other.rs", cx)
1780            }))
1781            .await
1782            .unwrap();
1783
1784        // Simulate a language server reporting errors for a file.
1785        fake_language_server
1786            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1787                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1788                version: None,
1789                diagnostics: vec![
1790                    lsp::Diagnostic {
1791                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1792                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1793                        message: "message 1".to_string(),
1794                        ..Default::default()
1795                    },
1796                    lsp::Diagnostic {
1797                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1798                        range: lsp::Range::new(
1799                            lsp::Position::new(0, 10),
1800                            lsp::Position::new(0, 13),
1801                        ),
1802                        message: "message 2".to_string(),
1803                        ..Default::default()
1804                    },
1805                ],
1806            })
1807            .await;
1808
1809        // Join the worktree as client B.
1810        let project_b = Project::remote(
1811            project_id,
1812            client_b.clone(),
1813            client_b.user_store.clone(),
1814            lang_registry.clone(),
1815            fs.clone(),
1816            &mut cx_b.to_async(),
1817        )
1818        .await
1819        .unwrap();
1820        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1821
1822        // Open the file with the errors.
1823        let buffer_b = cx_b
1824            .background()
1825            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1826            .await
1827            .unwrap();
1828
1829        buffer_b.read_with(&cx_b, |buffer, _| {
1830            assert_eq!(
1831                buffer
1832                    .snapshot()
1833                    .diagnostics_in_range::<_, Point>(0..buffer.len())
1834                    .map(|(_, entry)| entry)
1835                    .collect::<Vec<_>>(),
1836                &[
1837                    DiagnosticEntry {
1838                        range: Point::new(0, 4)..Point::new(0, 7),
1839                        diagnostic: Diagnostic {
1840                            group_id: 0,
1841                            message: "message 1".to_string(),
1842                            severity: lsp::DiagnosticSeverity::ERROR,
1843                            is_primary: true,
1844                            ..Default::default()
1845                        }
1846                    },
1847                    DiagnosticEntry {
1848                        range: Point::new(0, 10)..Point::new(0, 13),
1849                        diagnostic: Diagnostic {
1850                            group_id: 1,
1851                            severity: lsp::DiagnosticSeverity::WARNING,
1852                            message: "message 2".to_string(),
1853                            is_primary: true,
1854                            ..Default::default()
1855                        }
1856                    }
1857                ]
1858            );
1859        });
1860    }
1861
1862    #[gpui::test]
1863    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1864        cx_a.foreground().forbid_parking();
1865
1866        // Connect to a server as 2 clients.
1867        let mut server = TestServer::start().await;
1868        let client_a = server.create_client(&mut cx_a, "user_a").await;
1869        let client_b = server.create_client(&mut cx_b, "user_b").await;
1870
1871        // Create an org that includes these 2 users.
1872        let db = &server.app_state.db;
1873        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1874        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1875            .await
1876            .unwrap();
1877        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1878            .await
1879            .unwrap();
1880
1881        // Create a channel that includes all the users.
1882        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1883        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1884            .await
1885            .unwrap();
1886        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1887            .await
1888            .unwrap();
1889        db.create_channel_message(
1890            channel_id,
1891            client_b.current_user_id(&cx_b),
1892            "hello A, it's B.",
1893            OffsetDateTime::now_utc(),
1894            1,
1895        )
1896        .await
1897        .unwrap();
1898
1899        let channels_a = cx_a
1900            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1901        channels_a
1902            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1903            .await;
1904        channels_a.read_with(&cx_a, |list, _| {
1905            assert_eq!(
1906                list.available_channels().unwrap(),
1907                &[ChannelDetails {
1908                    id: channel_id.to_proto(),
1909                    name: "test-channel".to_string()
1910                }]
1911            )
1912        });
1913        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1914            this.get_channel(channel_id.to_proto(), cx).unwrap()
1915        });
1916        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1917        channel_a
1918            .condition(&cx_a, |channel, _| {
1919                channel_messages(channel)
1920                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1921            })
1922            .await;
1923
1924        let channels_b = cx_b
1925            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1926        channels_b
1927            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1928            .await;
1929        channels_b.read_with(&cx_b, |list, _| {
1930            assert_eq!(
1931                list.available_channels().unwrap(),
1932                &[ChannelDetails {
1933                    id: channel_id.to_proto(),
1934                    name: "test-channel".to_string()
1935                }]
1936            )
1937        });
1938
1939        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1940            this.get_channel(channel_id.to_proto(), cx).unwrap()
1941        });
1942        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1943        channel_b
1944            .condition(&cx_b, |channel, _| {
1945                channel_messages(channel)
1946                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1947            })
1948            .await;
1949
1950        channel_a
1951            .update(&mut cx_a, |channel, cx| {
1952                channel
1953                    .send_message("oh, hi B.".to_string(), cx)
1954                    .unwrap()
1955                    .detach();
1956                let task = channel.send_message("sup".to_string(), cx).unwrap();
1957                assert_eq!(
1958                    channel_messages(channel),
1959                    &[
1960                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1961                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
1962                        ("user_a".to_string(), "sup".to_string(), true)
1963                    ]
1964                );
1965                task
1966            })
1967            .await
1968            .unwrap();
1969
1970        channel_b
1971            .condition(&cx_b, |channel, _| {
1972                channel_messages(channel)
1973                    == [
1974                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1975                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
1976                        ("user_a".to_string(), "sup".to_string(), false),
1977                    ]
1978            })
1979            .await;
1980
1981        assert_eq!(
1982            server
1983                .state()
1984                .await
1985                .channel(channel_id)
1986                .unwrap()
1987                .connection_ids
1988                .len(),
1989            2
1990        );
1991        cx_b.update(|_| drop(channel_b));
1992        server
1993            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1994            .await;
1995
1996        cx_a.update(|_| drop(channel_a));
1997        server
1998            .condition(|state| state.channel(channel_id).is_none())
1999            .await;
2000    }
2001
2002    #[gpui::test]
2003    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2004        cx_a.foreground().forbid_parking();
2005
2006        let mut server = TestServer::start().await;
2007        let client_a = server.create_client(&mut cx_a, "user_a").await;
2008
2009        let db = &server.app_state.db;
2010        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2011        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2012        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2013            .await
2014            .unwrap();
2015        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2016            .await
2017            .unwrap();
2018
2019        let channels_a = cx_a
2020            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2021        channels_a
2022            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2023            .await;
2024        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2025            this.get_channel(channel_id.to_proto(), cx).unwrap()
2026        });
2027
2028        // Messages aren't allowed to be too long.
2029        channel_a
2030            .update(&mut cx_a, |channel, cx| {
2031                let long_body = "this is long.\n".repeat(1024);
2032                channel.send_message(long_body, cx).unwrap()
2033            })
2034            .await
2035            .unwrap_err();
2036
2037        // Messages aren't allowed to be blank.
2038        channel_a.update(&mut cx_a, |channel, cx| {
2039            channel.send_message(String::new(), cx).unwrap_err()
2040        });
2041
2042        // Leading and trailing whitespace are trimmed.
2043        channel_a
2044            .update(&mut cx_a, |channel, cx| {
2045                channel
2046                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
2047                    .unwrap()
2048            })
2049            .await
2050            .unwrap();
2051        assert_eq!(
2052            db.get_channel_messages(channel_id, 10, None)
2053                .await
2054                .unwrap()
2055                .iter()
2056                .map(|m| &m.body)
2057                .collect::<Vec<_>>(),
2058            &["surrounded by whitespace"]
2059        );
2060    }
2061
2062    #[gpui::test]
2063    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2064        cx_a.foreground().forbid_parking();
2065
2066        // Connect to a server as 2 clients.
2067        let mut server = TestServer::start().await;
2068        let client_a = server.create_client(&mut cx_a, "user_a").await;
2069        let client_b = server.create_client(&mut cx_b, "user_b").await;
2070        let mut status_b = client_b.status();
2071
2072        // Create an org that includes these 2 users.
2073        let db = &server.app_state.db;
2074        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2075        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2076            .await
2077            .unwrap();
2078        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2079            .await
2080            .unwrap();
2081
2082        // Create a channel that includes all the users.
2083        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2084        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2085            .await
2086            .unwrap();
2087        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2088            .await
2089            .unwrap();
2090        db.create_channel_message(
2091            channel_id,
2092            client_b.current_user_id(&cx_b),
2093            "hello A, it's B.",
2094            OffsetDateTime::now_utc(),
2095            2,
2096        )
2097        .await
2098        .unwrap();
2099
2100        let channels_a = cx_a
2101            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2102        channels_a
2103            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2104            .await;
2105
2106        channels_a.read_with(&cx_a, |list, _| {
2107            assert_eq!(
2108                list.available_channels().unwrap(),
2109                &[ChannelDetails {
2110                    id: channel_id.to_proto(),
2111                    name: "test-channel".to_string()
2112                }]
2113            )
2114        });
2115        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2116            this.get_channel(channel_id.to_proto(), cx).unwrap()
2117        });
2118        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2119        channel_a
2120            .condition(&cx_a, |channel, _| {
2121                channel_messages(channel)
2122                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2123            })
2124            .await;
2125
2126        let channels_b = cx_b
2127            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2128        channels_b
2129            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2130            .await;
2131        channels_b.read_with(&cx_b, |list, _| {
2132            assert_eq!(
2133                list.available_channels().unwrap(),
2134                &[ChannelDetails {
2135                    id: channel_id.to_proto(),
2136                    name: "test-channel".to_string()
2137                }]
2138            )
2139        });
2140
2141        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2142            this.get_channel(channel_id.to_proto(), cx).unwrap()
2143        });
2144        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2145        channel_b
2146            .condition(&cx_b, |channel, _| {
2147                channel_messages(channel)
2148                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2149            })
2150            .await;
2151
2152        // Disconnect client B, ensuring we can still access its cached channel data.
2153        server.forbid_connections();
2154        server.disconnect_client(client_b.current_user_id(&cx_b));
2155        while !matches!(
2156            status_b.recv().await,
2157            Some(client::Status::ReconnectionError { .. })
2158        ) {}
2159
2160        channels_b.read_with(&cx_b, |channels, _| {
2161            assert_eq!(
2162                channels.available_channels().unwrap(),
2163                [ChannelDetails {
2164                    id: channel_id.to_proto(),
2165                    name: "test-channel".to_string()
2166                }]
2167            )
2168        });
2169        channel_b.read_with(&cx_b, |channel, _| {
2170            assert_eq!(
2171                channel_messages(channel),
2172                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2173            )
2174        });
2175
2176        // Send a message from client B while it is disconnected.
2177        channel_b
2178            .update(&mut cx_b, |channel, cx| {
2179                let task = channel
2180                    .send_message("can you see this?".to_string(), cx)
2181                    .unwrap();
2182                assert_eq!(
2183                    channel_messages(channel),
2184                    &[
2185                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2186                        ("user_b".to_string(), "can you see this?".to_string(), true)
2187                    ]
2188                );
2189                task
2190            })
2191            .await
2192            .unwrap_err();
2193
2194        // Send a message from client A while B is disconnected.
2195        channel_a
2196            .update(&mut cx_a, |channel, cx| {
2197                channel
2198                    .send_message("oh, hi B.".to_string(), cx)
2199                    .unwrap()
2200                    .detach();
2201                let task = channel.send_message("sup".to_string(), cx).unwrap();
2202                assert_eq!(
2203                    channel_messages(channel),
2204                    &[
2205                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2206                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2207                        ("user_a".to_string(), "sup".to_string(), true)
2208                    ]
2209                );
2210                task
2211            })
2212            .await
2213            .unwrap();
2214
2215        // Give client B a chance to reconnect.
2216        server.allow_connections();
2217        cx_b.foreground().advance_clock(Duration::from_secs(10));
2218
2219        // Verify that B sees the new messages upon reconnection, as well as the message client B
2220        // sent while offline.
2221        channel_b
2222            .condition(&cx_b, |channel, _| {
2223                channel_messages(channel)
2224                    == [
2225                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2226                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2227                        ("user_a".to_string(), "sup".to_string(), false),
2228                        ("user_b".to_string(), "can you see this?".to_string(), false),
2229                    ]
2230            })
2231            .await;
2232
2233        // Ensure client A and B can communicate normally after reconnection.
2234        channel_a
2235            .update(&mut cx_a, |channel, cx| {
2236                channel.send_message("you online?".to_string(), cx).unwrap()
2237            })
2238            .await
2239            .unwrap();
2240        channel_b
2241            .condition(&cx_b, |channel, _| {
2242                channel_messages(channel)
2243                    == [
2244                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2245                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2246                        ("user_a".to_string(), "sup".to_string(), false),
2247                        ("user_b".to_string(), "can you see this?".to_string(), false),
2248                        ("user_a".to_string(), "you online?".to_string(), false),
2249                    ]
2250            })
2251            .await;
2252
2253        channel_b
2254            .update(&mut cx_b, |channel, cx| {
2255                channel.send_message("yep".to_string(), cx).unwrap()
2256            })
2257            .await
2258            .unwrap();
2259        channel_a
2260            .condition(&cx_a, |channel, _| {
2261                channel_messages(channel)
2262                    == [
2263                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2264                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2265                        ("user_a".to_string(), "sup".to_string(), false),
2266                        ("user_b".to_string(), "can you see this?".to_string(), false),
2267                        ("user_a".to_string(), "you online?".to_string(), false),
2268                        ("user_b".to_string(), "yep".to_string(), false),
2269                    ]
2270            })
2271            .await;
2272    }
2273
2274    #[gpui::test]
2275    async fn test_contacts(
2276        mut cx_a: TestAppContext,
2277        mut cx_b: TestAppContext,
2278        mut cx_c: TestAppContext,
2279    ) {
2280        cx_a.foreground().forbid_parking();
2281        let lang_registry = Arc::new(LanguageRegistry::new());
2282        let fs = Arc::new(FakeFs::new());
2283
2284        // Connect to a server as 3 clients.
2285        let mut server = TestServer::start().await;
2286        let client_a = server.create_client(&mut cx_a, "user_a").await;
2287        let client_b = server.create_client(&mut cx_b, "user_b").await;
2288        let client_c = server.create_client(&mut cx_c, "user_c").await;
2289
2290        // Share a worktree as client A.
2291        fs.insert_tree(
2292            "/a",
2293            json!({
2294                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2295            }),
2296        )
2297        .await;
2298
2299        let project_a = cx_a.update(|cx| {
2300            Project::local(
2301                client_a.clone(),
2302                client_a.user_store.clone(),
2303                lang_registry.clone(),
2304                fs.clone(),
2305                cx,
2306            )
2307        });
2308        let worktree_a = project_a
2309            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
2310            .await
2311            .unwrap();
2312        worktree_a
2313            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2314            .await;
2315
2316        client_a
2317            .user_store
2318            .condition(&cx_a, |user_store, _| {
2319                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2320            })
2321            .await;
2322        client_b
2323            .user_store
2324            .condition(&cx_b, |user_store, _| {
2325                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2326            })
2327            .await;
2328        client_c
2329            .user_store
2330            .condition(&cx_c, |user_store, _| {
2331                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2332            })
2333            .await;
2334
2335        let project_id = project_a
2336            .update(&mut cx_a, |project, _| project.next_remote_id())
2337            .await;
2338        project_a
2339            .update(&mut cx_a, |project, cx| project.share(cx))
2340            .await
2341            .unwrap();
2342
2343        let _project_b = Project::remote(
2344            project_id,
2345            client_b.clone(),
2346            client_b.user_store.clone(),
2347            lang_registry.clone(),
2348            fs.clone(),
2349            &mut cx_b.to_async(),
2350        )
2351        .await
2352        .unwrap();
2353
2354        client_a
2355            .user_store
2356            .condition(&cx_a, |user_store, _| {
2357                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2358            })
2359            .await;
2360        client_b
2361            .user_store
2362            .condition(&cx_b, |user_store, _| {
2363                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2364            })
2365            .await;
2366        client_c
2367            .user_store
2368            .condition(&cx_c, |user_store, _| {
2369                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2370            })
2371            .await;
2372
2373        project_a
2374            .condition(&cx_a, |project, _| {
2375                project.collaborators().contains_key(&client_b.peer_id)
2376            })
2377            .await;
2378
2379        cx_a.update(move |_| drop(project_a));
2380        client_a
2381            .user_store
2382            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2383            .await;
2384        client_b
2385            .user_store
2386            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2387            .await;
2388        client_c
2389            .user_store
2390            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2391            .await;
2392
2393        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2394            user_store
2395                .contacts()
2396                .iter()
2397                .map(|contact| {
2398                    let worktrees = contact
2399                        .projects
2400                        .iter()
2401                        .map(|p| {
2402                            (
2403                                p.worktree_root_names[0].as_str(),
2404                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
2405                            )
2406                        })
2407                        .collect();
2408                    (contact.user.github_login.as_str(), worktrees)
2409                })
2410                .collect()
2411        }
2412    }
2413
    /// In-process server fixture used by the tests in this module. It bundles
    /// a real `Server` with the handles the tests need to observe and disturb it.
    struct TestServer {
        /// The RPC peer; a clone of it is handed to `Server::new` in `start`.
        peer: Arc<Peer>,
        /// Application state built from the test database; also shared with the server.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Receiving half of the channel whose sender is passed to `Server::new`.
        notifications: mpsc::Receiver<()>,
        /// Per-user handles returned by `Connection::in_memory`, recorded each time
        /// a client establishes a connection.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// While set, attempts by test clients to establish a connection fail with
        /// "server is forbidding connections".
        forbid_connections: Arc<AtomicBool>,
        /// The database created in `start`; never read through this field --
        /// presumably held so the database outlives the server (TODO confirm).
        _test_db: TestDb,
    }
2423
2424    impl TestServer {
2425        async fn start() -> Self {
2426            let test_db = TestDb::new();
2427            let app_state = Self::build_app_state(&test_db).await;
2428            let peer = Peer::new();
2429            let notifications = mpsc::channel(128);
2430            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2431            Self {
2432                peer,
2433                app_state,
2434                server,
2435                notifications: notifications.1,
2436                connection_killers: Default::default(),
2437                forbid_connections: Default::default(),
2438                _test_db: test_db,
2439            }
2440        }
2441
2442        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2443            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2444            let client_name = name.to_string();
2445            let mut client = Client::new();
2446            let server = self.server.clone();
2447            let connection_killers = self.connection_killers.clone();
2448            let forbid_connections = self.forbid_connections.clone();
2449            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2450
2451            Arc::get_mut(&mut client)
2452                .unwrap()
2453                .override_authenticate(move |cx| {
2454                    cx.spawn(|_| async move {
2455                        let access_token = "the-token".to_string();
2456                        Ok(Credentials {
2457                            user_id: user_id.0 as u64,
2458                            access_token,
2459                        })
2460                    })
2461                })
2462                .override_establish_connection(move |credentials, cx| {
2463                    assert_eq!(credentials.user_id, user_id.0 as u64);
2464                    assert_eq!(credentials.access_token, "the-token");
2465
2466                    let server = server.clone();
2467                    let connection_killers = connection_killers.clone();
2468                    let forbid_connections = forbid_connections.clone();
2469                    let client_name = client_name.clone();
2470                    let connection_id_tx = connection_id_tx.clone();
2471                    cx.spawn(move |cx| async move {
2472                        if forbid_connections.load(SeqCst) {
2473                            Err(EstablishConnectionError::other(anyhow!(
2474                                "server is forbidding connections"
2475                            )))
2476                        } else {
2477                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2478                            connection_killers.lock().insert(user_id, kill_conn);
2479                            cx.background()
2480                                .spawn(server.handle_connection(
2481                                    server_conn,
2482                                    client_name,
2483                                    user_id,
2484                                    Some(connection_id_tx),
2485                                ))
2486                                .detach();
2487                            Ok(client_conn)
2488                        }
2489                    })
2490                });
2491
2492            let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2493            client
2494                .authenticate_and_connect(&cx.to_async())
2495                .await
2496                .unwrap();
2497
2498            let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2499            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2500            let mut authed_user =
2501                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2502            while authed_user.recv().await.unwrap().is_none() {}
2503
2504            TestClient {
2505                client,
2506                peer_id,
2507                user_store,
2508            }
2509        }
2510
2511        fn disconnect_client(&self, user_id: UserId) {
2512            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2513                let _ = kill_conn.try_send(Some(()));
2514            }
2515        }
2516
2517        fn forbid_connections(&self) {
2518            self.forbid_connections.store(true, SeqCst);
2519        }
2520
2521        fn allow_connections(&self) {
2522            self.forbid_connections.store(false, SeqCst);
2523        }
2524
2525        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2526            let mut config = Config::default();
2527            config.session_secret = "a".repeat(32);
2528            config.database_url = test_db.url.clone();
2529            let github_client = github::AppClient::test();
2530            Arc::new(AppState {
2531                db: test_db.db().clone(),
2532                handlebars: Default::default(),
2533                auth_client: auth::build_client("", ""),
2534                repo_client: github::RepoClient::test(&github_client),
2535                github_client,
2536                config,
2537            })
2538        }
2539
2540        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2541            self.server.store.read()
2542        }
2543
2544        async fn condition<F>(&mut self, mut predicate: F)
2545        where
2546            F: FnMut(&Store) -> bool,
2547        {
2548            async_std::future::timeout(Duration::from_millis(500), async {
2549                while !(predicate)(&*self.server.store.read()) {
2550                    self.notifications.recv().await;
2551                }
2552            })
2553            .await
2554            .expect("condition timed out");
2555        }
2556    }
2557
2558    impl Drop for TestServer {
2559        fn drop(&mut self) {
2560            task::block_on(self.peer.reset());
2561        }
2562    }
2563
2564    struct TestClient {
2565        client: Arc<Client>,
2566        pub peer_id: PeerId,
2567        pub user_store: ModelHandle<UserStore>,
2568    }
2569
2570    impl Deref for TestClient {
2571        type Target = Arc<Client>;
2572
2573        fn deref(&self) -> &Self::Target {
2574            &self.client
2575        }
2576    }
2577
2578    impl TestClient {
2579        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2580            UserId::from_proto(
2581                self.user_store
2582                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2583            )
2584        }
2585    }
2586
2587    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2588        channel
2589            .messages()
2590            .cursor::<()>()
2591            .map(|m| {
2592                (
2593                    m.sender.github_login.clone(),
2594                    m.body.clone(),
2595                    m.is_pending(),
2596                )
2597            })
2598            .collect()
2599    }
2600
2601    struct EmptyView;
2602
2603    impl gpui::Entity for EmptyView {
2604        type Event = ();
2605    }
2606
2607    impl gpui::View for EmptyView {
2608        fn ui_name() -> &'static str {
2609            "empty view"
2610        }
2611
2612        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2613            gpui::Element::boxed(gpui::elements::Empty)
2614        }
2615    }
2616}