rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::open_buffer)
  75            .add_handler(Server::close_buffer)
  76            .add_handler(Server::update_buffer)
  77            .add_handler(Server::buffer_saved)
  78            .add_handler(Server::save_buffer)
  79            .add_handler(Server::get_channels)
  80            .add_handler(Server::get_users)
  81            .add_handler(Server::join_channel)
  82            .add_handler(Server::leave_channel)
  83            .add_handler(Server::send_channel_message)
  84            .add_handler(Server::get_channel_messages);
  85
  86        Arc::new(server)
  87    }
  88
  89    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  90    where
  91        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  92        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  93        M: EnvelopedMessage,
  94    {
  95        let prev_handler = self.handlers.insert(
  96            TypeId::of::<M>(),
  97            Box::new(move |server, envelope| {
  98                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
  99                (handler)(server, *envelope).boxed()
 100            }),
 101        );
 102        if prev_handler.is_some() {
 103            panic!("registered a handler for the same message twice");
 104        }
 105        self
 106    }
 107
 108    pub fn handle_connection(
 109        self: &Arc<Self>,
 110        connection: Connection,
 111        addr: String,
 112        user_id: UserId,
 113        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 114    ) -> impl Future<Output = ()> {
 115        let mut this = self.clone();
 116        async move {
 117            let (connection_id, handle_io, mut incoming_rx) =
 118                this.peer.add_connection(connection).await;
 119
 120            if let Some(send_connection_id) = send_connection_id.as_mut() {
 121                let _ = send_connection_id.send(connection_id).await;
 122            }
 123
 124            this.state_mut().add_connection(connection_id, user_id);
 125            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 126                log::error!("error updating contacts for {:?}: {}", user_id, err);
 127            }
 128
 129            let handle_io = handle_io.fuse();
 130            futures::pin_mut!(handle_io);
 131            loop {
 132                let next_message = incoming_rx.recv().fuse();
 133                futures::pin_mut!(next_message);
 134                futures::select_biased! {
 135                    message = next_message => {
 136                        if let Some(message) = message {
 137                            let start_time = Instant::now();
 138                            log::info!("RPC message received: {}", message.payload_type_name());
 139                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 140                                if let Err(err) = (handler)(this.clone(), message).await {
 141                                    log::error!("error handling message: {:?}", err);
 142                                } else {
 143                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 144                                }
 145
 146                                if let Some(mut notifications) = this.notifications.clone() {
 147                                    let _ = notifications.send(()).await;
 148                                }
 149                            } else {
 150                                log::warn!("unhandled message: {}", message.payload_type_name());
 151                            }
 152                        } else {
 153                            log::info!("rpc connection closed {:?}", addr);
 154                            break;
 155                        }
 156                    }
 157                    handle_io = handle_io => {
 158                        if let Err(err) = handle_io {
 159                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 160                        }
 161                        break;
 162                    }
 163                }
 164            }
 165
 166            if let Err(err) = this.sign_out(connection_id).await {
 167                log::error!("error signing out connection {:?} - {:?}", addr, err);
 168            }
 169        }
 170    }
 171
 172    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 173        self.peer.disconnect(connection_id).await;
 174        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 175
 176        for (project_id, project) in removed_connection.hosted_projects {
 177            if let Some(share) = project.share {
 178                broadcast(
 179                    connection_id,
 180                    share.guests.keys().copied().collect(),
 181                    |conn_id| {
 182                        self.peer
 183                            .send(conn_id, proto::UnshareProject { project_id })
 184                    },
 185                )
 186                .await?;
 187            }
 188        }
 189
 190        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 191            broadcast(connection_id, peer_ids, |conn_id| {
 192                self.peer.send(
 193                    conn_id,
 194                    proto::RemoveProjectCollaborator {
 195                        project_id,
 196                        peer_id: connection_id.0,
 197                    },
 198                )
 199            })
 200            .await?;
 201        }
 202
 203        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 204            .await?;
 205
 206        Ok(())
 207    }
 208
 209    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 210        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 211        Ok(())
 212    }
 213
 214    async fn register_project(
 215        mut self: Arc<Server>,
 216        request: TypedEnvelope<proto::RegisterProject>,
 217    ) -> tide::Result<()> {
 218        let project_id = {
 219            let mut state = self.state_mut();
 220            let user_id = state.user_id_for_connection(request.sender_id)?;
 221            state.register_project(request.sender_id, user_id)
 222        };
 223        self.peer
 224            .respond(
 225                request.receipt(),
 226                proto::RegisterProjectResponse { project_id },
 227            )
 228            .await?;
 229        Ok(())
 230    }
 231
 232    async fn unregister_project(
 233        mut self: Arc<Server>,
 234        request: TypedEnvelope<proto::UnregisterProject>,
 235    ) -> tide::Result<()> {
 236        let project = self
 237            .state_mut()
 238            .unregister_project(request.payload.project_id, request.sender_id)
 239            .ok_or_else(|| anyhow!("no such project"))?;
 240        self.update_contacts_for_users(project.authorized_user_ids().iter())
 241            .await?;
 242        Ok(())
 243    }
 244
 245    async fn share_project(
 246        mut self: Arc<Server>,
 247        request: TypedEnvelope<proto::ShareProject>,
 248    ) -> tide::Result<()> {
 249        self.state_mut()
 250            .share_project(request.payload.project_id, request.sender_id);
 251        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 252        Ok(())
 253    }
 254
 255    async fn unshare_project(
 256        mut self: Arc<Server>,
 257        request: TypedEnvelope<proto::UnshareProject>,
 258    ) -> tide::Result<()> {
 259        let project_id = request.payload.project_id;
 260        let project = self
 261            .state_mut()
 262            .unshare_project(project_id, request.sender_id)?;
 263
 264        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 265            self.peer
 266                .send(conn_id, proto::UnshareProject { project_id })
 267        })
 268        .await?;
 269        self.update_contacts_for_users(&project.authorized_user_ids)
 270            .await?;
 271
 272        Ok(())
 273    }
 274
 275    async fn join_project(
 276        mut self: Arc<Server>,
 277        request: TypedEnvelope<proto::JoinProject>,
 278    ) -> tide::Result<()> {
 279        let project_id = request.payload.project_id;
 280
 281        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 282        let response_data = self
 283            .state_mut()
 284            .join_project(request.sender_id, user_id, project_id)
 285            .and_then(|joined| {
 286                let share = joined.project.share()?;
 287                let peer_count = share.guests.len();
 288                let mut collaborators = Vec::with_capacity(peer_count);
 289                collaborators.push(proto::Collaborator {
 290                    peer_id: joined.project.host_connection_id.0,
 291                    replica_id: 0,
 292                    user_id: joined.project.host_user_id.to_proto(),
 293                });
 294                let worktrees = joined
 295                    .project
 296                    .worktrees
 297                    .iter()
 298                    .filter_map(|(id, worktree)| {
 299                        worktree.share.as_ref().map(|share| proto::Worktree {
 300                            id: *id,
 301                            root_name: worktree.root_name.clone(),
 302                            entries: share.entries.values().cloned().collect(),
 303                        })
 304                    })
 305                    .collect();
 306                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 307                    if *peer_conn_id != request.sender_id {
 308                        collaborators.push(proto::Collaborator {
 309                            peer_id: peer_conn_id.0,
 310                            replica_id: *peer_replica_id as u32,
 311                            user_id: peer_user_id.to_proto(),
 312                        });
 313                    }
 314                }
 315                let response = proto::JoinProjectResponse {
 316                    worktrees,
 317                    replica_id: joined.replica_id as u32,
 318                    collaborators,
 319                };
 320                let connection_ids = joined.project.connection_ids();
 321                let contact_user_ids = joined.project.authorized_user_ids();
 322                Ok((response, connection_ids, contact_user_ids))
 323            });
 324
 325        match response_data {
 326            Ok((response, connection_ids, contact_user_ids)) => {
 327                broadcast(request.sender_id, connection_ids, |conn_id| {
 328                    self.peer.send(
 329                        conn_id,
 330                        proto::AddProjectCollaborator {
 331                            project_id: project_id,
 332                            collaborator: Some(proto::Collaborator {
 333                                peer_id: request.sender_id.0,
 334                                replica_id: response.replica_id,
 335                                user_id: user_id.to_proto(),
 336                            }),
 337                        },
 338                    )
 339                })
 340                .await?;
 341                self.peer.respond(request.receipt(), response).await?;
 342                self.update_contacts_for_users(&contact_user_ids).await?;
 343            }
 344            Err(error) => {
 345                self.peer
 346                    .respond_with_error(
 347                        request.receipt(),
 348                        proto::Error {
 349                            message: error.to_string(),
 350                        },
 351                    )
 352                    .await?;
 353            }
 354        }
 355
 356        Ok(())
 357    }
 358
 359    async fn leave_project(
 360        mut self: Arc<Server>,
 361        request: TypedEnvelope<proto::LeaveProject>,
 362    ) -> tide::Result<()> {
 363        let sender_id = request.sender_id;
 364        let project_id = request.payload.project_id;
 365        let worktree = self.state_mut().leave_project(sender_id, project_id);
 366        if let Some(worktree) = worktree {
 367            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 368                self.peer.send(
 369                    conn_id,
 370                    proto::RemoveProjectCollaborator {
 371                        project_id,
 372                        peer_id: sender_id.0,
 373                    },
 374                )
 375            })
 376            .await?;
 377            self.update_contacts_for_users(&worktree.authorized_user_ids)
 378                .await?;
 379        }
 380        Ok(())
 381    }
 382
 383    async fn register_worktree(
 384        mut self: Arc<Server>,
 385        request: TypedEnvelope<proto::RegisterWorktree>,
 386    ) -> tide::Result<()> {
 387        let receipt = request.receipt();
 388        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 389
 390        let mut contact_user_ids = HashSet::default();
 391        contact_user_ids.insert(host_user_id);
 392        for github_login in request.payload.authorized_logins {
 393            match self.app_state.db.create_user(&github_login, false).await {
 394                Ok(contact_user_id) => {
 395                    contact_user_ids.insert(contact_user_id);
 396                }
 397                Err(err) => {
 398                    let message = err.to_string();
 399                    self.peer
 400                        .respond_with_error(receipt, proto::Error { message })
 401                        .await?;
 402                    return Ok(());
 403                }
 404            }
 405        }
 406
 407        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 408        let ok = self.state_mut().register_worktree(
 409            request.payload.project_id,
 410            request.payload.worktree_id,
 411            Worktree {
 412                authorized_user_ids: contact_user_ids.clone(),
 413                root_name: request.payload.root_name,
 414                share: None,
 415            },
 416        );
 417
 418        if ok {
 419            self.peer.respond(receipt, proto::Ack {}).await?;
 420            self.update_contacts_for_users(&contact_user_ids).await?;
 421        } else {
 422            self.peer
 423                .respond_with_error(
 424                    receipt,
 425                    proto::Error {
 426                        message: NO_SUCH_PROJECT.to_string(),
 427                    },
 428                )
 429                .await?;
 430        }
 431
 432        Ok(())
 433    }
 434
 435    async fn unregister_worktree(
 436        mut self: Arc<Server>,
 437        request: TypedEnvelope<proto::UnregisterWorktree>,
 438    ) -> tide::Result<()> {
 439        let project_id = request.payload.project_id;
 440        let worktree_id = request.payload.worktree_id;
 441        let (worktree, guest_connection_ids) =
 442            self.state_mut()
 443                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 444
 445        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 446            self.peer.send(
 447                conn_id,
 448                proto::UnregisterWorktree {
 449                    project_id,
 450                    worktree_id,
 451                },
 452            )
 453        })
 454        .await?;
 455        self.update_contacts_for_users(&worktree.authorized_user_ids)
 456            .await?;
 457        Ok(())
 458    }
 459
 460    async fn share_worktree(
 461        mut self: Arc<Server>,
 462        mut request: TypedEnvelope<proto::ShareWorktree>,
 463    ) -> tide::Result<()> {
 464        let worktree = request
 465            .payload
 466            .worktree
 467            .as_mut()
 468            .ok_or_else(|| anyhow!("missing worktree"))?;
 469        let entries = mem::take(&mut worktree.entries)
 470            .into_iter()
 471            .map(|entry| (entry.id, entry))
 472            .collect();
 473
 474        let contact_user_ids = self.state_mut().share_worktree(
 475            request.payload.project_id,
 476            worktree.id,
 477            request.sender_id,
 478            entries,
 479        );
 480        if let Some(contact_user_ids) = contact_user_ids {
 481            self.peer.respond(request.receipt(), proto::Ack {}).await?;
 482            self.update_contacts_for_users(&contact_user_ids).await?;
 483        } else {
 484            self.peer
 485                .respond_with_error(
 486                    request.receipt(),
 487                    proto::Error {
 488                        message: "no such worktree".to_string(),
 489                    },
 490                )
 491                .await?;
 492        }
 493        Ok(())
 494    }
 495
 496    async fn update_worktree(
 497        mut self: Arc<Server>,
 498        request: TypedEnvelope<proto::UpdateWorktree>,
 499    ) -> tide::Result<()> {
 500        let connection_ids = self
 501            .state_mut()
 502            .update_worktree(
 503                request.sender_id,
 504                request.payload.project_id,
 505                request.payload.worktree_id,
 506                &request.payload.removed_entries,
 507                &request.payload.updated_entries,
 508            )
 509            .ok_or_else(|| anyhow!("no such worktree"))?;
 510
 511        broadcast(request.sender_id, connection_ids, |connection_id| {
 512            self.peer
 513                .forward_send(request.sender_id, connection_id, request.payload.clone())
 514        })
 515        .await?;
 516
 517        Ok(())
 518    }
 519
 520    async fn open_buffer(
 521        self: Arc<Server>,
 522        request: TypedEnvelope<proto::OpenBuffer>,
 523    ) -> tide::Result<()> {
 524        let receipt = request.receipt();
 525        let host_connection_id = self
 526            .state()
 527            .read_project(request.payload.project_id, request.sender_id)
 528            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 529            .host_connection_id;
 530        let response = self
 531            .peer
 532            .forward_request(request.sender_id, host_connection_id, request.payload)
 533            .await?;
 534        self.peer.respond(receipt, response).await?;
 535        Ok(())
 536    }
 537
 538    async fn close_buffer(
 539        self: Arc<Server>,
 540        request: TypedEnvelope<proto::CloseBuffer>,
 541    ) -> tide::Result<()> {
 542        let host_connection_id = self
 543            .state()
 544            .read_project(request.payload.project_id, request.sender_id)
 545            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 546            .host_connection_id;
 547        self.peer
 548            .forward_send(request.sender_id, host_connection_id, request.payload)
 549            .await?;
 550        Ok(())
 551    }
 552
 553    async fn save_buffer(
 554        self: Arc<Server>,
 555        request: TypedEnvelope<proto::SaveBuffer>,
 556    ) -> tide::Result<()> {
 557        let host;
 558        let guests;
 559        {
 560            let state = self.state();
 561            let project = state
 562                .read_project(request.payload.project_id, request.sender_id)
 563                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 564            host = project.host_connection_id;
 565            guests = project.guest_connection_ids()
 566        }
 567
 568        let sender = request.sender_id;
 569        let receipt = request.receipt();
 570        let response = self
 571            .peer
 572            .forward_request(sender, host, request.payload.clone())
 573            .await?;
 574
 575        broadcast(host, guests, |conn_id| {
 576            let response = response.clone();
 577            let peer = &self.peer;
 578            async move {
 579                if conn_id == sender {
 580                    peer.respond(receipt, response).await
 581                } else {
 582                    peer.forward_send(host, conn_id, response).await
 583                }
 584            }
 585        })
 586        .await?;
 587
 588        Ok(())
 589    }
 590
 591    async fn update_buffer(
 592        self: Arc<Server>,
 593        request: TypedEnvelope<proto::UpdateBuffer>,
 594    ) -> tide::Result<()> {
 595        let receiver_ids = self
 596            .state()
 597            .project_connection_ids(request.payload.project_id, request.sender_id)
 598            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 599        broadcast(request.sender_id, receiver_ids, |connection_id| {
 600            self.peer
 601                .forward_send(request.sender_id, connection_id, request.payload.clone())
 602        })
 603        .await?;
 604        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 605        Ok(())
 606    }
 607
 608    async fn buffer_saved(
 609        self: Arc<Server>,
 610        request: TypedEnvelope<proto::BufferSaved>,
 611    ) -> tide::Result<()> {
 612        let receiver_ids = self
 613            .state()
 614            .project_connection_ids(request.payload.project_id, request.sender_id)
 615            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 616        broadcast(request.sender_id, receiver_ids, |connection_id| {
 617            self.peer
 618                .forward_send(request.sender_id, connection_id, request.payload.clone())
 619        })
 620        .await?;
 621        Ok(())
 622    }
 623
 624    async fn get_channels(
 625        self: Arc<Server>,
 626        request: TypedEnvelope<proto::GetChannels>,
 627    ) -> tide::Result<()> {
 628        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 629        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 630        self.peer
 631            .respond(
 632                request.receipt(),
 633                proto::GetChannelsResponse {
 634                    channels: channels
 635                        .into_iter()
 636                        .map(|chan| proto::Channel {
 637                            id: chan.id.to_proto(),
 638                            name: chan.name,
 639                        })
 640                        .collect(),
 641                },
 642            )
 643            .await?;
 644        Ok(())
 645    }
 646
 647    async fn get_users(
 648        self: Arc<Server>,
 649        request: TypedEnvelope<proto::GetUsers>,
 650    ) -> tide::Result<()> {
 651        let receipt = request.receipt();
 652        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 653        let users = self
 654            .app_state
 655            .db
 656            .get_users_by_ids(user_ids)
 657            .await?
 658            .into_iter()
 659            .map(|user| proto::User {
 660                id: user.id.to_proto(),
 661                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 662                github_login: user.github_login,
 663            })
 664            .collect();
 665        self.peer
 666            .respond(receipt, proto::GetUsersResponse { users })
 667            .await?;
 668        Ok(())
 669    }
 670
 671    async fn update_contacts_for_users<'a>(
 672        self: &Arc<Server>,
 673        user_ids: impl IntoIterator<Item = &'a UserId>,
 674    ) -> tide::Result<()> {
 675        let mut send_futures = Vec::new();
 676
 677        {
 678            let state = self.state();
 679            for user_id in user_ids {
 680                let contacts = state.contacts_for_user(*user_id);
 681                for connection_id in state.connection_ids_for_user(*user_id) {
 682                    send_futures.push(self.peer.send(
 683                        connection_id,
 684                        proto::UpdateContacts {
 685                            contacts: contacts.clone(),
 686                        },
 687                    ));
 688                }
 689            }
 690        }
 691        futures::future::try_join_all(send_futures).await?;
 692
 693        Ok(())
 694    }
 695
 696    async fn join_channel(
 697        mut self: Arc<Self>,
 698        request: TypedEnvelope<proto::JoinChannel>,
 699    ) -> tide::Result<()> {
 700        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 701        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 702        if !self
 703            .app_state
 704            .db
 705            .can_user_access_channel(user_id, channel_id)
 706            .await?
 707        {
 708            Err(anyhow!("access denied"))?;
 709        }
 710
 711        self.state_mut().join_channel(request.sender_id, channel_id);
 712        let messages = self
 713            .app_state
 714            .db
 715            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 716            .await?
 717            .into_iter()
 718            .map(|msg| proto::ChannelMessage {
 719                id: msg.id.to_proto(),
 720                body: msg.body,
 721                timestamp: msg.sent_at.unix_timestamp() as u64,
 722                sender_id: msg.sender_id.to_proto(),
 723                nonce: Some(msg.nonce.as_u128().into()),
 724            })
 725            .collect::<Vec<_>>();
 726        self.peer
 727            .respond(
 728                request.receipt(),
 729                proto::JoinChannelResponse {
 730                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 731                    messages,
 732                },
 733            )
 734            .await?;
 735        Ok(())
 736    }
 737
 738    async fn leave_channel(
 739        mut self: Arc<Self>,
 740        request: TypedEnvelope<proto::LeaveChannel>,
 741    ) -> tide::Result<()> {
 742        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 743        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 744        if !self
 745            .app_state
 746            .db
 747            .can_user_access_channel(user_id, channel_id)
 748            .await?
 749        {
 750            Err(anyhow!("access denied"))?;
 751        }
 752
 753        self.state_mut()
 754            .leave_channel(request.sender_id, channel_id);
 755
 756        Ok(())
 757    }
 758
 759    async fn send_channel_message(
 760        self: Arc<Self>,
 761        request: TypedEnvelope<proto::SendChannelMessage>,
 762    ) -> tide::Result<()> {
 763        let receipt = request.receipt();
 764        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 765        let user_id;
 766        let connection_ids;
 767        {
 768            let state = self.state();
 769            user_id = state.user_id_for_connection(request.sender_id)?;
 770            if let Some(ids) = state.channel_connection_ids(channel_id) {
 771                connection_ids = ids;
 772            } else {
 773                return Ok(());
 774            }
 775        }
 776
 777        // Validate the message body.
 778        let body = request.payload.body.trim().to_string();
 779        if body.len() > MAX_MESSAGE_LEN {
 780            self.peer
 781                .respond_with_error(
 782                    receipt,
 783                    proto::Error {
 784                        message: "message is too long".to_string(),
 785                    },
 786                )
 787                .await?;
 788            return Ok(());
 789        }
 790        if body.is_empty() {
 791            self.peer
 792                .respond_with_error(
 793                    receipt,
 794                    proto::Error {
 795                        message: "message can't be blank".to_string(),
 796                    },
 797                )
 798                .await?;
 799            return Ok(());
 800        }
 801
 802        let timestamp = OffsetDateTime::now_utc();
 803        let nonce = if let Some(nonce) = request.payload.nonce {
 804            nonce
 805        } else {
 806            self.peer
 807                .respond_with_error(
 808                    receipt,
 809                    proto::Error {
 810                        message: "nonce can't be blank".to_string(),
 811                    },
 812                )
 813                .await?;
 814            return Ok(());
 815        };
 816
 817        let message_id = self
 818            .app_state
 819            .db
 820            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 821            .await?
 822            .to_proto();
 823        let message = proto::ChannelMessage {
 824            sender_id: user_id.to_proto(),
 825            id: message_id,
 826            body,
 827            timestamp: timestamp.unix_timestamp() as u64,
 828            nonce: Some(nonce),
 829        };
 830        broadcast(request.sender_id, connection_ids, |conn_id| {
 831            self.peer.send(
 832                conn_id,
 833                proto::ChannelMessageSent {
 834                    channel_id: channel_id.to_proto(),
 835                    message: Some(message.clone()),
 836                },
 837            )
 838        })
 839        .await?;
 840        self.peer
 841            .respond(
 842                receipt,
 843                proto::SendChannelMessageResponse {
 844                    message: Some(message),
 845                },
 846            )
 847            .await?;
 848        Ok(())
 849    }
 850
 851    async fn get_channel_messages(
 852        self: Arc<Self>,
 853        request: TypedEnvelope<proto::GetChannelMessages>,
 854    ) -> tide::Result<()> {
 855        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 856        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 857        if !self
 858            .app_state
 859            .db
 860            .can_user_access_channel(user_id, channel_id)
 861            .await?
 862        {
 863            Err(anyhow!("access denied"))?;
 864        }
 865
 866        let messages = self
 867            .app_state
 868            .db
 869            .get_channel_messages(
 870                channel_id,
 871                MESSAGE_COUNT_PER_PAGE,
 872                Some(MessageId::from_proto(request.payload.before_message_id)),
 873            )
 874            .await?
 875            .into_iter()
 876            .map(|msg| proto::ChannelMessage {
 877                id: msg.id.to_proto(),
 878                body: msg.body,
 879                timestamp: msg.sent_at.unix_timestamp() as u64,
 880                sender_id: msg.sender_id.to_proto(),
 881                nonce: Some(msg.nonce.as_u128().into()),
 882            })
 883            .collect::<Vec<_>>();
 884        self.peer
 885            .respond(
 886                request.receipt(),
 887                proto::GetChannelMessagesResponse {
 888                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 889                    messages,
 890                },
 891            )
 892            .await?;
 893        Ok(())
 894    }
 895
 896    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 897        self.store.read()
 898    }
 899
 900    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 901        self.store.write()
 902    }
 903}
 904
 905pub async fn broadcast<F, T>(
 906    sender_id: ConnectionId,
 907    receiver_ids: Vec<ConnectionId>,
 908    mut f: F,
 909) -> anyhow::Result<()>
 910where
 911    F: FnMut(ConnectionId) -> T,
 912    T: Future<Output = anyhow::Result<()>>,
 913{
 914    let futures = receiver_ids
 915        .into_iter()
 916        .filter(|id| *id != sender_id)
 917        .map(|id| f(id));
 918    futures::future::try_join_all(futures).await?;
 919    Ok(())
 920}
 921
 922pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 923    let server = Server::new(app.state().clone(), rpc.clone(), None);
 924    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 925        let server = server.clone();
 926        async move {
 927            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 928
 929            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 930            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 931            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 932            let client_protocol_version: Option<u32> = request
 933                .header("X-Zed-Protocol-Version")
 934                .and_then(|v| v.as_str().parse().ok());
 935
 936            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 937                return Ok(Response::new(StatusCode::UpgradeRequired));
 938            }
 939
 940            let header = match request.header("Sec-Websocket-Key") {
 941                Some(h) => h.as_str(),
 942                None => return Err(anyhow!("expected sec-websocket-key"))?,
 943            };
 944
 945            let user_id = process_auth_header(&request).await?;
 946
 947            let mut response = Response::new(StatusCode::SwitchingProtocols);
 948            response.insert_header(UPGRADE, "websocket");
 949            response.insert_header(CONNECTION, "Upgrade");
 950            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 951            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 952            response.insert_header("Sec-Websocket-Version", "13");
 953
 954            let http_res: &mut tide::http::Response = response.as_mut();
 955            let upgrade_receiver = http_res.recv_upgrade().await;
 956            let addr = request.remote().unwrap_or("unknown").to_string();
 957            task::spawn(async move {
 958                if let Some(stream) = upgrade_receiver.await {
 959                    server
 960                        .handle_connection(
 961                            Connection::new(
 962                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 963                            ),
 964                            addr,
 965                            user_id,
 966                            None,
 967                        )
 968                        .await;
 969                }
 970            });
 971
 972            Ok(response)
 973        }
 974    });
 975}
 976
 977fn header_contains_ignore_case<T>(
 978    request: &tide::Request<T>,
 979    header_name: HeaderName,
 980    value: &str,
 981) -> bool {
 982    request
 983        .header(header_name)
 984        .map(|h| {
 985            h.as_str()
 986                .split(',')
 987                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 988        })
 989        .unwrap_or(false)
 990}
 991
 992#[cfg(test)]
 993mod tests {
 994    use super::*;
 995    use crate::{
 996        auth,
 997        db::{tests::TestDb, UserId},
 998        github, AppState, Config,
 999    };
1000    use ::rpc::Peer;
1001    use async_std::task;
1002    use gpui::{ModelHandle, TestAppContext};
1003    use parking_lot::Mutex;
1004    use postage::{mpsc, watch};
1005    use rpc::PeerId;
1006    use serde_json::json;
1007    use sqlx::types::time::OffsetDateTime;
1008    use std::{
1009        ops::Deref,
1010        path::Path,
1011        sync::{
1012            atomic::{AtomicBool, Ordering::SeqCst},
1013            Arc,
1014        },
1015        time::Duration,
1016    };
1017    use zed::{
1018        client::{
1019            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1020            EstablishConnectionError, UserStore,
1021        },
1022        editor::{Editor, EditorSettings, Input, MultiBuffer},
1023        fs::{FakeFs, Fs as _},
1024        language::{
1025            tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1026            LanguageRegistry, LanguageServerConfig, Point,
1027        },
1028        lsp,
1029        project::Project,
1030    };
1031
1032    #[gpui::test]
1033    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1034        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1035        let lang_registry = Arc::new(LanguageRegistry::new());
1036        let fs = Arc::new(FakeFs::new());
1037        cx_a.foreground().forbid_parking();
1038
1039        // Connect to a server as 2 clients.
1040        let mut server = TestServer::start().await;
1041        let client_a = server.create_client(&mut cx_a, "user_a").await;
1042        let client_b = server.create_client(&mut cx_b, "user_b").await;
1043
1044        // Share a project as client A
1045        fs.insert_tree(
1046            "/a",
1047            json!({
1048                ".zed.toml": r#"collaborators = ["user_b"]"#,
1049                "a.txt": "a-contents",
1050                "b.txt": "b-contents",
1051            }),
1052        )
1053        .await;
1054        let project_a = cx_a.update(|cx| {
1055            Project::local(
1056                client_a.clone(),
1057                client_a.user_store.clone(),
1058                lang_registry.clone(),
1059                fs.clone(),
1060                cx,
1061            )
1062        });
1063        let worktree_a = project_a
1064            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1065            .await
1066            .unwrap();
1067        worktree_a
1068            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1069            .await;
1070        let project_id = project_a
1071            .update(&mut cx_a, |project, _| project.next_remote_id())
1072            .await;
1073        project_a
1074            .update(&mut cx_a, |project, cx| project.share(cx))
1075            .await
1076            .unwrap();
1077
1078        // Join that project as client B
1079        let project_b = Project::remote(
1080            project_id,
1081            client_b.clone(),
1082            client_b.user_store.clone(),
1083            lang_registry.clone(),
1084            fs.clone(),
1085            &mut cx_b.to_async(),
1086        )
1087        .await
1088        .unwrap();
1089        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1090
1091        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1092            assert_eq!(
1093                project
1094                    .collaborators()
1095                    .get(&client_a.peer_id)
1096                    .unwrap()
1097                    .user
1098                    .github_login,
1099                "user_a"
1100            );
1101            project.replica_id()
1102        });
1103        project_a
1104            .condition(&cx_a, |tree, _| {
1105                tree.collaborators()
1106                    .get(&client_b.peer_id)
1107                    .map_or(false, |collaborator| {
1108                        collaborator.replica_id == replica_id_b
1109                            && collaborator.user.github_login == "user_b"
1110                    })
1111            })
1112            .await;
1113
1114        // Open the same file as client B and client A.
1115        let buffer_b = worktree_b
1116            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1117            .await
1118            .unwrap();
1119        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1120        buffer_b.read_with(&cx_b, |buf, cx| {
1121            assert_eq!(buf.read(cx).text(), "b-contents")
1122        });
1123        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1124        let buffer_a = worktree_a
1125            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1126            .await
1127            .unwrap();
1128
1129        let editor_b = cx_b.add_view(window_b, |cx| {
1130            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1131        });
1132        // TODO
1133        // // Create a selection set as client B and see that selection set as client A.
1134        // buffer_a
1135        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1136        //     .await;
1137
1138        // Edit the buffer as client B and see that edit as client A.
1139        editor_b.update(&mut cx_b, |editor, cx| {
1140            editor.handle_input(&Input("ok, ".into()), cx)
1141        });
1142        buffer_a
1143            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1144            .await;
1145
1146        // TODO
1147        // // Remove the selection set as client B, see those selections disappear as client A.
1148        cx_b.update(move |_| drop(editor_b));
1149        // buffer_a
1150        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1151        //     .await;
1152
1153        // Close the buffer as client A, see that the buffer is closed.
1154        cx_a.update(move |_| drop(buffer_a));
1155        worktree_a
1156            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1157            .await;
1158
1159        // Dropping the client B's project removes client B from client A's collaborators.
1160        cx_b.update(move |_| drop(project_b));
1161        project_a
1162            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1163            .await;
1164    }
1165
1166    #[gpui::test]
1167    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1168        let lang_registry = Arc::new(LanguageRegistry::new());
1169        let fs = Arc::new(FakeFs::new());
1170        cx_a.foreground().forbid_parking();
1171
1172        // Connect to a server as 2 clients.
1173        let mut server = TestServer::start().await;
1174        let client_a = server.create_client(&mut cx_a, "user_a").await;
1175        let client_b = server.create_client(&mut cx_b, "user_b").await;
1176
1177        // Share a project as client A
1178        fs.insert_tree(
1179            "/a",
1180            json!({
1181                ".zed.toml": r#"collaborators = ["user_b"]"#,
1182                "a.txt": "a-contents",
1183                "b.txt": "b-contents",
1184            }),
1185        )
1186        .await;
1187        let project_a = cx_a.update(|cx| {
1188            Project::local(
1189                client_a.clone(),
1190                client_a.user_store.clone(),
1191                lang_registry.clone(),
1192                fs.clone(),
1193                cx,
1194            )
1195        });
1196        let worktree_a = project_a
1197            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1198            .await
1199            .unwrap();
1200        worktree_a
1201            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1202            .await;
1203        let project_id = project_a
1204            .update(&mut cx_a, |project, _| project.next_remote_id())
1205            .await;
1206        project_a
1207            .update(&mut cx_a, |project, cx| project.share(cx))
1208            .await
1209            .unwrap();
1210
1211        // Join that project as client B
1212        let project_b = Project::remote(
1213            project_id,
1214            client_b.clone(),
1215            client_b.user_store.clone(),
1216            lang_registry.clone(),
1217            fs.clone(),
1218            &mut cx_b.to_async(),
1219        )
1220        .await
1221        .unwrap();
1222
1223        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1224        worktree_b
1225            .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
1226            .await
1227            .unwrap();
1228
1229        project_a
1230            .update(&mut cx_a, |project, cx| project.unshare(cx))
1231            .await
1232            .unwrap();
1233        project_b
1234            .condition(&mut cx_b, |project, _| project.is_read_only())
1235            .await;
1236    }
1237
1238    #[gpui::test]
1239    async fn test_propagate_saves_and_fs_changes(
1240        mut cx_a: TestAppContext,
1241        mut cx_b: TestAppContext,
1242        mut cx_c: TestAppContext,
1243    ) {
1244        let lang_registry = Arc::new(LanguageRegistry::new());
1245        let fs = Arc::new(FakeFs::new());
1246        cx_a.foreground().forbid_parking();
1247
1248        // Connect to a server as 3 clients.
1249        let mut server = TestServer::start().await;
1250        let client_a = server.create_client(&mut cx_a, "user_a").await;
1251        let client_b = server.create_client(&mut cx_b, "user_b").await;
1252        let client_c = server.create_client(&mut cx_c, "user_c").await;
1253
1254        // Share a worktree as client A.
1255        fs.insert_tree(
1256            "/a",
1257            json!({
1258                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1259                "file1": "",
1260                "file2": ""
1261            }),
1262        )
1263        .await;
1264        let project_a = cx_a.update(|cx| {
1265            Project::local(
1266                client_a.clone(),
1267                client_a.user_store.clone(),
1268                lang_registry.clone(),
1269                fs.clone(),
1270                cx,
1271            )
1272        });
1273        let worktree_a = project_a
1274            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1275            .await
1276            .unwrap();
1277        worktree_a
1278            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1279            .await;
1280        let project_id = project_a
1281            .update(&mut cx_a, |project, _| project.next_remote_id())
1282            .await;
1283        project_a
1284            .update(&mut cx_a, |project, cx| project.share(cx))
1285            .await
1286            .unwrap();
1287
1288        // Join that worktree as clients B and C.
1289        let project_b = Project::remote(
1290            project_id,
1291            client_b.clone(),
1292            client_b.user_store.clone(),
1293            lang_registry.clone(),
1294            fs.clone(),
1295            &mut cx_b.to_async(),
1296        )
1297        .await
1298        .unwrap();
1299        let project_c = Project::remote(
1300            project_id,
1301            client_c.clone(),
1302            client_c.user_store.clone(),
1303            lang_registry.clone(),
1304            fs.clone(),
1305            &mut cx_c.to_async(),
1306        )
1307        .await
1308        .unwrap();
1309
1310        // Open and edit a buffer as both guests B and C.
1311        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1312        let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
1313        let buffer_b = worktree_b
1314            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1315            .await
1316            .unwrap();
1317        let buffer_c = worktree_c
1318            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1319            .await
1320            .unwrap();
1321        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1322        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1323
1324        // Open and edit that buffer as the host.
1325        let buffer_a = worktree_a
1326            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1327            .await
1328            .unwrap();
1329
1330        buffer_a
1331            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1332            .await;
1333        buffer_a.update(&mut cx_a, |buf, cx| {
1334            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1335        });
1336
1337        // Wait for edits to propagate
1338        buffer_a
1339            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1340            .await;
1341        buffer_b
1342            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1343            .await;
1344        buffer_c
1345            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1346            .await;
1347
1348        // Edit the buffer as the host and concurrently save as guest B.
1349        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1350        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1351        save_b.await.unwrap();
1352        assert_eq!(
1353            fs.load("/a/file1".as_ref()).await.unwrap(),
1354            "hi-a, i-am-c, i-am-b, i-am-a"
1355        );
1356        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1357        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1358        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1359
1360        // Make changes on host's file system, see those changes on the guests.
1361        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1362            .await
1363            .unwrap();
1364        fs.insert_file(Path::new("/a/file4"), "4".into())
1365            .await
1366            .unwrap();
1367
1368        worktree_b
1369            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1370            .await;
1371        worktree_c
1372            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1373            .await;
1374        worktree_b.read_with(&cx_b, |tree, _| {
1375            assert_eq!(
1376                tree.paths()
1377                    .map(|p| p.to_string_lossy())
1378                    .collect::<Vec<_>>(),
1379                &[".zed.toml", "file1", "file3", "file4"]
1380            )
1381        });
1382        worktree_c.read_with(&cx_c, |tree, _| {
1383            assert_eq!(
1384                tree.paths()
1385                    .map(|p| p.to_string_lossy())
1386                    .collect::<Vec<_>>(),
1387                &[".zed.toml", "file1", "file3", "file4"]
1388            )
1389        });
1390    }
1391
1392    #[gpui::test]
1393    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1394        cx_a.foreground().forbid_parking();
1395        let lang_registry = Arc::new(LanguageRegistry::new());
1396        let fs = Arc::new(FakeFs::new());
1397
1398        // Connect to a server as 2 clients.
1399        let mut server = TestServer::start().await;
1400        let client_a = server.create_client(&mut cx_a, "user_a").await;
1401        let client_b = server.create_client(&mut cx_b, "user_b").await;
1402
1403        // Share a project as client A
1404        fs.insert_tree(
1405            "/dir",
1406            json!({
1407                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1408                "a.txt": "a-contents",
1409            }),
1410        )
1411        .await;
1412
1413        let project_a = cx_a.update(|cx| {
1414            Project::local(
1415                client_a.clone(),
1416                client_a.user_store.clone(),
1417                lang_registry.clone(),
1418                fs.clone(),
1419                cx,
1420            )
1421        });
1422        let worktree_a = project_a
1423            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1424            .await
1425            .unwrap();
1426        worktree_a
1427            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1428            .await;
1429        let project_id = project_a
1430            .update(&mut cx_a, |project, _| project.next_remote_id())
1431            .await;
1432        project_a
1433            .update(&mut cx_a, |project, cx| project.share(cx))
1434            .await
1435            .unwrap();
1436
1437        // Join that project as client B
1438        let project_b = Project::remote(
1439            project_id,
1440            client_b.clone(),
1441            client_b.user_store.clone(),
1442            lang_registry.clone(),
1443            fs.clone(),
1444            &mut cx_b.to_async(),
1445        )
1446        .await
1447        .unwrap();
1448        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1449
1450        // Open a buffer as client B
1451        let buffer_b = worktree_b
1452            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1453            .await
1454            .unwrap();
1455        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1456
1457        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1458        buffer_b.read_with(&cx_b, |buf, _| {
1459            assert!(buf.is_dirty());
1460            assert!(!buf.has_conflict());
1461        });
1462
1463        buffer_b
1464            .update(&mut cx_b, |buf, cx| buf.save(cx))
1465            .unwrap()
1466            .await
1467            .unwrap();
1468        worktree_b
1469            .condition(&cx_b, |_, cx| {
1470                buffer_b.read(cx).file().unwrap().mtime() != mtime
1471            })
1472            .await;
1473        buffer_b.read_with(&cx_b, |buf, _| {
1474            assert!(!buf.is_dirty());
1475            assert!(!buf.has_conflict());
1476        });
1477
1478        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1479        buffer_b.read_with(&cx_b, |buf, _| {
1480            assert!(buf.is_dirty());
1481            assert!(!buf.has_conflict());
1482        });
1483    }
1484
1485    #[gpui::test]
1486    async fn test_editing_while_guest_opens_buffer(
1487        mut cx_a: TestAppContext,
1488        mut cx_b: TestAppContext,
1489    ) {
1490        cx_a.foreground().forbid_parking();
1491        let lang_registry = Arc::new(LanguageRegistry::new());
1492        let fs = Arc::new(FakeFs::new());
1493
1494        // Connect to a server as 2 clients.
1495        let mut server = TestServer::start().await;
1496        let client_a = server.create_client(&mut cx_a, "user_a").await;
1497        let client_b = server.create_client(&mut cx_b, "user_b").await;
1498
1499        // Share a project as client A
1500        fs.insert_tree(
1501            "/dir",
1502            json!({
1503                ".zed.toml": r#"collaborators = ["user_b"]"#,
1504                "a.txt": "a-contents",
1505            }),
1506        )
1507        .await;
1508        let project_a = cx_a.update(|cx| {
1509            Project::local(
1510                client_a.clone(),
1511                client_a.user_store.clone(),
1512                lang_registry.clone(),
1513                fs.clone(),
1514                cx,
1515            )
1516        });
1517        let worktree_a = project_a
1518            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1519            .await
1520            .unwrap();
1521        worktree_a
1522            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1523            .await;
1524        let project_id = project_a
1525            .update(&mut cx_a, |project, _| project.next_remote_id())
1526            .await;
1527        project_a
1528            .update(&mut cx_a, |project, cx| project.share(cx))
1529            .await
1530            .unwrap();
1531
1532        // Join that project as client B
1533        let project_b = Project::remote(
1534            project_id,
1535            client_b.clone(),
1536            client_b.user_store.clone(),
1537            lang_registry.clone(),
1538            fs.clone(),
1539            &mut cx_b.to_async(),
1540        )
1541        .await
1542        .unwrap();
1543        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1544
1545        // Open a buffer as client A
1546        let buffer_a = worktree_a
1547            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1548            .await
1549            .unwrap();
1550
1551        // Start opening the same buffer as client B
1552        let buffer_b = cx_b
1553            .background()
1554            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1555        task::yield_now().await;
1556
1557        // Edit the buffer as client A while client B is still opening it.
1558        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1559
1560        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1561        let buffer_b = buffer_b.await.unwrap();
1562        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1563    }
1564
1565    #[gpui::test]
1566    async fn test_leaving_worktree_while_opening_buffer(
1567        mut cx_a: TestAppContext,
1568        mut cx_b: TestAppContext,
1569    ) {
1570        cx_a.foreground().forbid_parking();
1571        let lang_registry = Arc::new(LanguageRegistry::new());
1572        let fs = Arc::new(FakeFs::new());
1573
1574        // Connect to a server as 2 clients.
1575        let mut server = TestServer::start().await;
1576        let client_a = server.create_client(&mut cx_a, "user_a").await;
1577        let client_b = server.create_client(&mut cx_b, "user_b").await;
1578
1579        // Share a project as client A
1580        fs.insert_tree(
1581            "/dir",
1582            json!({
1583                ".zed.toml": r#"collaborators = ["user_b"]"#,
1584                "a.txt": "a-contents",
1585            }),
1586        )
1587        .await;
1588        let project_a = cx_a.update(|cx| {
1589            Project::local(
1590                client_a.clone(),
1591                client_a.user_store.clone(),
1592                lang_registry.clone(),
1593                fs.clone(),
1594                cx,
1595            )
1596        });
1597        let worktree_a = project_a
1598            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1599            .await
1600            .unwrap();
1601        worktree_a
1602            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1603            .await;
1604        let project_id = project_a
1605            .update(&mut cx_a, |project, _| project.next_remote_id())
1606            .await;
1607        project_a
1608            .update(&mut cx_a, |project, cx| project.share(cx))
1609            .await
1610            .unwrap();
1611
1612        // Join that project as client B
1613        let project_b = Project::remote(
1614            project_id,
1615            client_b.clone(),
1616            client_b.user_store.clone(),
1617            lang_registry.clone(),
1618            fs.clone(),
1619            &mut cx_b.to_async(),
1620        )
1621        .await
1622        .unwrap();
1623        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1624
1625        // See that a guest has joined as client A.
1626        project_a
1627            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1628            .await;
1629
1630        // Begin opening a buffer as client B, but leave the project before the open completes.
1631        let buffer_b = cx_b
1632            .background()
1633            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1634        cx_b.update(|_| drop(project_b));
1635        drop(buffer_b);
1636
1637        // See that the guest has left.
1638        project_a
1639            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1640            .await;
1641    }
1642
1643    #[gpui::test]
1644    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1645        cx_a.foreground().forbid_parking();
1646        let lang_registry = Arc::new(LanguageRegistry::new());
1647        let fs = Arc::new(FakeFs::new());
1648
1649        // Connect to a server as 2 clients.
1650        let mut server = TestServer::start().await;
1651        let client_a = server.create_client(&mut cx_a, "user_a").await;
1652        let client_b = server.create_client(&mut cx_b, "user_b").await;
1653
1654        // Share a project as client A
1655        fs.insert_tree(
1656            "/a",
1657            json!({
1658                ".zed.toml": r#"collaborators = ["user_b"]"#,
1659                "a.txt": "a-contents",
1660                "b.txt": "b-contents",
1661            }),
1662        )
1663        .await;
1664        let project_a = cx_a.update(|cx| {
1665            Project::local(
1666                client_a.clone(),
1667                client_a.user_store.clone(),
1668                lang_registry.clone(),
1669                fs.clone(),
1670                cx,
1671            )
1672        });
1673        let worktree_a = project_a
1674            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1675            .await
1676            .unwrap();
1677        worktree_a
1678            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1679            .await;
1680        let project_id = project_a
1681            .update(&mut cx_a, |project, _| project.next_remote_id())
1682            .await;
1683        project_a
1684            .update(&mut cx_a, |project, cx| project.share(cx))
1685            .await
1686            .unwrap();
1687
1688        // Join that project as client B
1689        let _project_b = Project::remote(
1690            project_id,
1691            client_b.clone(),
1692            client_b.user_store.clone(),
1693            lang_registry.clone(),
1694            fs.clone(),
1695            &mut cx_b.to_async(),
1696        )
1697        .await
1698        .unwrap();
1699
1700        // See that a guest has joined as client A.
1701        project_a
1702            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1703            .await;
1704
1705        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1706        client_b.disconnect(&cx_b.to_async()).await.unwrap();
1707        project_a
1708            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1709            .await;
1710    }
1711
1712    #[gpui::test]
1713    async fn test_collaborating_with_diagnostics(
1714        mut cx_a: TestAppContext,
1715        mut cx_b: TestAppContext,
1716    ) {
1717        cx_a.foreground().forbid_parking();
1718        let mut lang_registry = Arc::new(LanguageRegistry::new());
1719        let fs = Arc::new(FakeFs::new());
1720
1721        // Set up a fake language server.
1722        let (language_server_config, mut fake_language_server) =
1723            LanguageServerConfig::fake(cx_a.background()).await;
1724        Arc::get_mut(&mut lang_registry)
1725            .unwrap()
1726            .add(Arc::new(Language::new(
1727                LanguageConfig {
1728                    name: "Rust".to_string(),
1729                    path_suffixes: vec!["rs".to_string()],
1730                    language_server: Some(language_server_config),
1731                    ..Default::default()
1732                },
1733                Some(tree_sitter_rust::language()),
1734            )));
1735
1736        // Connect to a server as 2 clients.
1737        let mut server = TestServer::start().await;
1738        let client_a = server.create_client(&mut cx_a, "user_a").await;
1739        let client_b = server.create_client(&mut cx_b, "user_b").await;
1740
1741        // Share a project as client A
1742        fs.insert_tree(
1743            "/a",
1744            json!({
1745                ".zed.toml": r#"collaborators = ["user_b"]"#,
1746                "a.rs": "let one = two",
1747                "other.rs": "",
1748            }),
1749        )
1750        .await;
1751        let project_a = cx_a.update(|cx| {
1752            Project::local(
1753                client_a.clone(),
1754                client_a.user_store.clone(),
1755                lang_registry.clone(),
1756                fs.clone(),
1757                cx,
1758            )
1759        });
1760        let worktree_a = project_a
1761            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1762            .await
1763            .unwrap();
1764        worktree_a
1765            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1766            .await;
1767        let project_id = project_a
1768            .update(&mut cx_a, |project, _| project.next_remote_id())
1769            .await;
1770        project_a
1771            .update(&mut cx_a, |project, cx| project.share(cx))
1772            .await
1773            .unwrap();
1774
1775        // Cause the language server to start.
1776        let _ = cx_a
1777            .background()
1778            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1779                worktree.open_buffer("other.rs", cx)
1780            }))
1781            .await
1782            .unwrap();
1783
1784        // Simulate a language server reporting errors for a file.
1785        fake_language_server
1786            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1787                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1788                version: None,
1789                diagnostics: vec![
1790                    lsp::Diagnostic {
1791                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1792                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1793                        message: "message 1".to_string(),
1794                        ..Default::default()
1795                    },
1796                    lsp::Diagnostic {
1797                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1798                        range: lsp::Range::new(
1799                            lsp::Position::new(0, 10),
1800                            lsp::Position::new(0, 13),
1801                        ),
1802                        message: "message 2".to_string(),
1803                        ..Default::default()
1804                    },
1805                ],
1806            })
1807            .await;
1808
1809        // Join the worktree as client B.
1810        let project_b = Project::remote(
1811            project_id,
1812            client_b.clone(),
1813            client_b.user_store.clone(),
1814            lang_registry.clone(),
1815            fs.clone(),
1816            &mut cx_b.to_async(),
1817        )
1818        .await
1819        .unwrap();
1820        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1821
1822        // Open the file with the errors.
1823        let buffer_b = cx_b
1824            .background()
1825            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1826            .await
1827            .unwrap();
1828
1829        buffer_b.read_with(&cx_b, |buffer, _| {
1830            assert_eq!(
1831                buffer
1832                    .snapshot()
1833                    .diagnostics_in_range::<_, Point>(0..buffer.len())
1834                    .collect::<Vec<_>>(),
1835                &[
1836                    DiagnosticEntry {
1837                        range: Point::new(0, 4)..Point::new(0, 7),
1838                        diagnostic: Diagnostic {
1839                            group_id: 0,
1840                            message: "message 1".to_string(),
1841                            severity: lsp::DiagnosticSeverity::ERROR,
1842                            is_primary: true,
1843                            ..Default::default()
1844                        }
1845                    },
1846                    DiagnosticEntry {
1847                        range: Point::new(0, 10)..Point::new(0, 13),
1848                        diagnostic: Diagnostic {
1849                            group_id: 1,
1850                            severity: lsp::DiagnosticSeverity::WARNING,
1851                            message: "message 2".to_string(),
1852                            is_primary: true,
1853                            ..Default::default()
1854                        }
1855                    }
1856                ]
1857            );
1858        });
1859    }
1860
1861    #[gpui::test]
1862    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1863        cx_a.foreground().forbid_parking();
1864
1865        // Connect to a server as 2 clients.
1866        let mut server = TestServer::start().await;
1867        let client_a = server.create_client(&mut cx_a, "user_a").await;
1868        let client_b = server.create_client(&mut cx_b, "user_b").await;
1869
1870        // Create an org that includes these 2 users.
1871        let db = &server.app_state.db;
1872        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1873        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1874            .await
1875            .unwrap();
1876        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1877            .await
1878            .unwrap();
1879
1880        // Create a channel that includes all the users.
1881        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1882        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1883            .await
1884            .unwrap();
1885        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1886            .await
1887            .unwrap();
1888        db.create_channel_message(
1889            channel_id,
1890            client_b.current_user_id(&cx_b),
1891            "hello A, it's B.",
1892            OffsetDateTime::now_utc(),
1893            1,
1894        )
1895        .await
1896        .unwrap();
1897
1898        let channels_a = cx_a
1899            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1900        channels_a
1901            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1902            .await;
1903        channels_a.read_with(&cx_a, |list, _| {
1904            assert_eq!(
1905                list.available_channels().unwrap(),
1906                &[ChannelDetails {
1907                    id: channel_id.to_proto(),
1908                    name: "test-channel".to_string()
1909                }]
1910            )
1911        });
1912        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1913            this.get_channel(channel_id.to_proto(), cx).unwrap()
1914        });
1915        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1916        channel_a
1917            .condition(&cx_a, |channel, _| {
1918                channel_messages(channel)
1919                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1920            })
1921            .await;
1922
1923        let channels_b = cx_b
1924            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1925        channels_b
1926            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1927            .await;
1928        channels_b.read_with(&cx_b, |list, _| {
1929            assert_eq!(
1930                list.available_channels().unwrap(),
1931                &[ChannelDetails {
1932                    id: channel_id.to_proto(),
1933                    name: "test-channel".to_string()
1934                }]
1935            )
1936        });
1937
1938        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1939            this.get_channel(channel_id.to_proto(), cx).unwrap()
1940        });
1941        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1942        channel_b
1943            .condition(&cx_b, |channel, _| {
1944                channel_messages(channel)
1945                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1946            })
1947            .await;
1948
1949        channel_a
1950            .update(&mut cx_a, |channel, cx| {
1951                channel
1952                    .send_message("oh, hi B.".to_string(), cx)
1953                    .unwrap()
1954                    .detach();
1955                let task = channel.send_message("sup".to_string(), cx).unwrap();
1956                assert_eq!(
1957                    channel_messages(channel),
1958                    &[
1959                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1960                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
1961                        ("user_a".to_string(), "sup".to_string(), true)
1962                    ]
1963                );
1964                task
1965            })
1966            .await
1967            .unwrap();
1968
1969        channel_b
1970            .condition(&cx_b, |channel, _| {
1971                channel_messages(channel)
1972                    == [
1973                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1974                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
1975                        ("user_a".to_string(), "sup".to_string(), false),
1976                    ]
1977            })
1978            .await;
1979
1980        assert_eq!(
1981            server
1982                .state()
1983                .await
1984                .channel(channel_id)
1985                .unwrap()
1986                .connection_ids
1987                .len(),
1988            2
1989        );
1990        cx_b.update(|_| drop(channel_b));
1991        server
1992            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1993            .await;
1994
1995        cx_a.update(|_| drop(channel_a));
1996        server
1997            .condition(|state| state.channel(channel_id).is_none())
1998            .await;
1999    }
2000
2001    #[gpui::test]
2002    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2003        cx_a.foreground().forbid_parking();
2004
2005        let mut server = TestServer::start().await;
2006        let client_a = server.create_client(&mut cx_a, "user_a").await;
2007
2008        let db = &server.app_state.db;
2009        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2010        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2011        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2012            .await
2013            .unwrap();
2014        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2015            .await
2016            .unwrap();
2017
2018        let channels_a = cx_a
2019            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2020        channels_a
2021            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2022            .await;
2023        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2024            this.get_channel(channel_id.to_proto(), cx).unwrap()
2025        });
2026
2027        // Messages aren't allowed to be too long.
2028        channel_a
2029            .update(&mut cx_a, |channel, cx| {
2030                let long_body = "this is long.\n".repeat(1024);
2031                channel.send_message(long_body, cx).unwrap()
2032            })
2033            .await
2034            .unwrap_err();
2035
2036        // Messages aren't allowed to be blank.
2037        channel_a.update(&mut cx_a, |channel, cx| {
2038            channel.send_message(String::new(), cx).unwrap_err()
2039        });
2040
2041        // Leading and trailing whitespace are trimmed.
2042        channel_a
2043            .update(&mut cx_a, |channel, cx| {
2044                channel
2045                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
2046                    .unwrap()
2047            })
2048            .await
2049            .unwrap();
2050        assert_eq!(
2051            db.get_channel_messages(channel_id, 10, None)
2052                .await
2053                .unwrap()
2054                .iter()
2055                .map(|m| &m.body)
2056                .collect::<Vec<_>>(),
2057            &["surrounded by whitespace"]
2058        );
2059    }
2060
2061    #[gpui::test]
2062    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2063        cx_a.foreground().forbid_parking();
2064
2065        // Connect to a server as 2 clients.
2066        let mut server = TestServer::start().await;
2067        let client_a = server.create_client(&mut cx_a, "user_a").await;
2068        let client_b = server.create_client(&mut cx_b, "user_b").await;
2069        let mut status_b = client_b.status();
2070
2071        // Create an org that includes these 2 users.
2072        let db = &server.app_state.db;
2073        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2074        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2075            .await
2076            .unwrap();
2077        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2078            .await
2079            .unwrap();
2080
2081        // Create a channel that includes all the users.
2082        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2083        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2084            .await
2085            .unwrap();
2086        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2087            .await
2088            .unwrap();
2089        db.create_channel_message(
2090            channel_id,
2091            client_b.current_user_id(&cx_b),
2092            "hello A, it's B.",
2093            OffsetDateTime::now_utc(),
2094            2,
2095        )
2096        .await
2097        .unwrap();
2098
2099        let channels_a = cx_a
2100            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2101        channels_a
2102            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2103            .await;
2104
2105        channels_a.read_with(&cx_a, |list, _| {
2106            assert_eq!(
2107                list.available_channels().unwrap(),
2108                &[ChannelDetails {
2109                    id: channel_id.to_proto(),
2110                    name: "test-channel".to_string()
2111                }]
2112            )
2113        });
2114        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2115            this.get_channel(channel_id.to_proto(), cx).unwrap()
2116        });
2117        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2118        channel_a
2119            .condition(&cx_a, |channel, _| {
2120                channel_messages(channel)
2121                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2122            })
2123            .await;
2124
2125        let channels_b = cx_b
2126            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2127        channels_b
2128            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2129            .await;
2130        channels_b.read_with(&cx_b, |list, _| {
2131            assert_eq!(
2132                list.available_channels().unwrap(),
2133                &[ChannelDetails {
2134                    id: channel_id.to_proto(),
2135                    name: "test-channel".to_string()
2136                }]
2137            )
2138        });
2139
2140        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2141            this.get_channel(channel_id.to_proto(), cx).unwrap()
2142        });
2143        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2144        channel_b
2145            .condition(&cx_b, |channel, _| {
2146                channel_messages(channel)
2147                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2148            })
2149            .await;
2150
2151        // Disconnect client B, ensuring we can still access its cached channel data.
2152        server.forbid_connections();
2153        server.disconnect_client(client_b.current_user_id(&cx_b));
2154        while !matches!(
2155            status_b.recv().await,
2156            Some(client::Status::ReconnectionError { .. })
2157        ) {}
2158
2159        channels_b.read_with(&cx_b, |channels, _| {
2160            assert_eq!(
2161                channels.available_channels().unwrap(),
2162                [ChannelDetails {
2163                    id: channel_id.to_proto(),
2164                    name: "test-channel".to_string()
2165                }]
2166            )
2167        });
2168        channel_b.read_with(&cx_b, |channel, _| {
2169            assert_eq!(
2170                channel_messages(channel),
2171                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2172            )
2173        });
2174
2175        // Send a message from client B while it is disconnected.
2176        channel_b
2177            .update(&mut cx_b, |channel, cx| {
2178                let task = channel
2179                    .send_message("can you see this?".to_string(), cx)
2180                    .unwrap();
2181                assert_eq!(
2182                    channel_messages(channel),
2183                    &[
2184                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2185                        ("user_b".to_string(), "can you see this?".to_string(), true)
2186                    ]
2187                );
2188                task
2189            })
2190            .await
2191            .unwrap_err();
2192
2193        // Send a message from client A while B is disconnected.
2194        channel_a
2195            .update(&mut cx_a, |channel, cx| {
2196                channel
2197                    .send_message("oh, hi B.".to_string(), cx)
2198                    .unwrap()
2199                    .detach();
2200                let task = channel.send_message("sup".to_string(), cx).unwrap();
2201                assert_eq!(
2202                    channel_messages(channel),
2203                    &[
2204                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2205                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2206                        ("user_a".to_string(), "sup".to_string(), true)
2207                    ]
2208                );
2209                task
2210            })
2211            .await
2212            .unwrap();
2213
2214        // Give client B a chance to reconnect.
2215        server.allow_connections();
2216        cx_b.foreground().advance_clock(Duration::from_secs(10));
2217
2218        // Verify that B sees the new messages upon reconnection, as well as the message client B
2219        // sent while offline.
2220        channel_b
2221            .condition(&cx_b, |channel, _| {
2222                channel_messages(channel)
2223                    == [
2224                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2225                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2226                        ("user_a".to_string(), "sup".to_string(), false),
2227                        ("user_b".to_string(), "can you see this?".to_string(), false),
2228                    ]
2229            })
2230            .await;
2231
2232        // Ensure client A and B can communicate normally after reconnection.
2233        channel_a
2234            .update(&mut cx_a, |channel, cx| {
2235                channel.send_message("you online?".to_string(), cx).unwrap()
2236            })
2237            .await
2238            .unwrap();
2239        channel_b
2240            .condition(&cx_b, |channel, _| {
2241                channel_messages(channel)
2242                    == [
2243                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2244                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2245                        ("user_a".to_string(), "sup".to_string(), false),
2246                        ("user_b".to_string(), "can you see this?".to_string(), false),
2247                        ("user_a".to_string(), "you online?".to_string(), false),
2248                    ]
2249            })
2250            .await;
2251
2252        channel_b
2253            .update(&mut cx_b, |channel, cx| {
2254                channel.send_message("yep".to_string(), cx).unwrap()
2255            })
2256            .await
2257            .unwrap();
2258        channel_a
2259            .condition(&cx_a, |channel, _| {
2260                channel_messages(channel)
2261                    == [
2262                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2263                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2264                        ("user_a".to_string(), "sup".to_string(), false),
2265                        ("user_b".to_string(), "can you see this?".to_string(), false),
2266                        ("user_a".to_string(), "you online?".to_string(), false),
2267                        ("user_b".to_string(), "yep".to_string(), false),
2268                    ]
2269            })
2270            .await;
2271    }
2272
2273    #[gpui::test]
2274    async fn test_contacts(
2275        mut cx_a: TestAppContext,
2276        mut cx_b: TestAppContext,
2277        mut cx_c: TestAppContext,
2278    ) {
2279        cx_a.foreground().forbid_parking();
2280        let lang_registry = Arc::new(LanguageRegistry::new());
2281        let fs = Arc::new(FakeFs::new());
2282
2283        // Connect to a server as 3 clients.
2284        let mut server = TestServer::start().await;
2285        let client_a = server.create_client(&mut cx_a, "user_a").await;
2286        let client_b = server.create_client(&mut cx_b, "user_b").await;
2287        let client_c = server.create_client(&mut cx_c, "user_c").await;
2288
2289        // Share a worktree as client A.
2290        fs.insert_tree(
2291            "/a",
2292            json!({
2293                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2294            }),
2295        )
2296        .await;
2297
2298        let project_a = cx_a.update(|cx| {
2299            Project::local(
2300                client_a.clone(),
2301                client_a.user_store.clone(),
2302                lang_registry.clone(),
2303                fs.clone(),
2304                cx,
2305            )
2306        });
2307        let worktree_a = project_a
2308            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
2309            .await
2310            .unwrap();
2311        worktree_a
2312            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2313            .await;
2314
2315        client_a
2316            .user_store
2317            .condition(&cx_a, |user_store, _| {
2318                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2319            })
2320            .await;
2321        client_b
2322            .user_store
2323            .condition(&cx_b, |user_store, _| {
2324                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2325            })
2326            .await;
2327        client_c
2328            .user_store
2329            .condition(&cx_c, |user_store, _| {
2330                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2331            })
2332            .await;
2333
2334        let project_id = project_a
2335            .update(&mut cx_a, |project, _| project.next_remote_id())
2336            .await;
2337        project_a
2338            .update(&mut cx_a, |project, cx| project.share(cx))
2339            .await
2340            .unwrap();
2341
2342        let _project_b = Project::remote(
2343            project_id,
2344            client_b.clone(),
2345            client_b.user_store.clone(),
2346            lang_registry.clone(),
2347            fs.clone(),
2348            &mut cx_b.to_async(),
2349        )
2350        .await
2351        .unwrap();
2352
2353        client_a
2354            .user_store
2355            .condition(&cx_a, |user_store, _| {
2356                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2357            })
2358            .await;
2359        client_b
2360            .user_store
2361            .condition(&cx_b, |user_store, _| {
2362                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2363            })
2364            .await;
2365        client_c
2366            .user_store
2367            .condition(&cx_c, |user_store, _| {
2368                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2369            })
2370            .await;
2371
2372        project_a
2373            .condition(&cx_a, |project, _| {
2374                project.collaborators().contains_key(&client_b.peer_id)
2375            })
2376            .await;
2377
2378        cx_a.update(move |_| drop(project_a));
2379        client_a
2380            .user_store
2381            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2382            .await;
2383        client_b
2384            .user_store
2385            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2386            .await;
2387        client_c
2388            .user_store
2389            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2390            .await;
2391
2392        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2393            user_store
2394                .contacts()
2395                .iter()
2396                .map(|contact| {
2397                    let worktrees = contact
2398                        .projects
2399                        .iter()
2400                        .map(|p| {
2401                            (
2402                                p.worktree_root_names[0].as_str(),
2403                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
2404                            )
2405                        })
2406                        .collect();
2407                    (contact.user.github_login.as_str(), worktrees)
2408                })
2409                .collect()
2410        }
2411    }
2412
2413    struct TestServer { // In-process server harness used by the tests in this module.
2414        peer: Arc<Peer>, // The `Peer` that `start` hands to `Server::new`.
2415        app_state: Arc<AppState>, // Built from the test database in `start`; tests reach `db` through it.
2416        server: Arc<Server>, // Server under test; `create_client` spawns `handle_connection` on it.
2417        notifications: mpsc::Receiver<()>, // Receiving half of the channel whose sender goes to `Server::new`.
2418        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>, // Kill handle from `Connection::in_memory`, stored per user by `create_client`.
2419        forbid_connections: Arc<AtomicBool>, // While set, `create_client`'s connection override returns an error.
2420        _test_db: TestDb, // Never read here; NOTE(review): presumably held to keep the test database alive -- confirm.
2421    }
2422
2423    impl TestServer {
2424        async fn start() -> Self {
2425            let test_db = TestDb::new();
2426            let app_state = Self::build_app_state(&test_db).await;
2427            let peer = Peer::new();
2428            let notifications = mpsc::channel(128);
2429            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2430            Self {
2431                peer,
2432                app_state,
2433                server,
2434                notifications: notifications.1,
2435                connection_killers: Default::default(),
2436                forbid_connections: Default::default(),
2437                _test_db: test_db,
2438            }
2439        }
2440
2441        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2442            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2443            let client_name = name.to_string();
2444            let mut client = Client::new();
2445            let server = self.server.clone();
2446            let connection_killers = self.connection_killers.clone();
2447            let forbid_connections = self.forbid_connections.clone();
2448            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2449
2450            Arc::get_mut(&mut client)
2451                .unwrap()
2452                .override_authenticate(move |cx| {
2453                    cx.spawn(|_| async move {
2454                        let access_token = "the-token".to_string();
2455                        Ok(Credentials {
2456                            user_id: user_id.0 as u64,
2457                            access_token,
2458                        })
2459                    })
2460                })
2461                .override_establish_connection(move |credentials, cx| {
2462                    assert_eq!(credentials.user_id, user_id.0 as u64);
2463                    assert_eq!(credentials.access_token, "the-token");
2464
2465                    let server = server.clone();
2466                    let connection_killers = connection_killers.clone();
2467                    let forbid_connections = forbid_connections.clone();
2468                    let client_name = client_name.clone();
2469                    let connection_id_tx = connection_id_tx.clone();
2470                    cx.spawn(move |cx| async move {
2471                        if forbid_connections.load(SeqCst) {
2472                            Err(EstablishConnectionError::other(anyhow!(
2473                                "server is forbidding connections"
2474                            )))
2475                        } else {
2476                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2477                            connection_killers.lock().insert(user_id, kill_conn);
2478                            cx.background()
2479                                .spawn(server.handle_connection(
2480                                    server_conn,
2481                                    client_name,
2482                                    user_id,
2483                                    Some(connection_id_tx),
2484                                ))
2485                                .detach();
2486                            Ok(client_conn)
2487                        }
2488                    })
2489                });
2490
2491            let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2492            client
2493                .authenticate_and_connect(&cx.to_async())
2494                .await
2495                .unwrap();
2496
2497            let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2498            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2499            let mut authed_user =
2500                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2501            while authed_user.recv().await.unwrap().is_none() {}
2502
2503            TestClient {
2504                client,
2505                peer_id,
2506                user_store,
2507            }
2508        }
2509
2510        fn disconnect_client(&self, user_id: UserId) {
2511            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2512                let _ = kill_conn.try_send(Some(()));
2513            }
2514        }
2515
2516        fn forbid_connections(&self) {
2517            self.forbid_connections.store(true, SeqCst);
2518        }
2519
2520        fn allow_connections(&self) {
2521            self.forbid_connections.store(false, SeqCst);
2522        }
2523
2524        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2525            let mut config = Config::default();
2526            config.session_secret = "a".repeat(32);
2527            config.database_url = test_db.url.clone();
2528            let github_client = github::AppClient::test();
2529            Arc::new(AppState {
2530                db: test_db.db().clone(),
2531                handlebars: Default::default(),
2532                auth_client: auth::build_client("", ""),
2533                repo_client: github::RepoClient::test(&github_client),
2534                github_client,
2535                config,
2536            })
2537        }
2538
2539        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2540            self.server.store.read()
2541        }
2542
2543        async fn condition<F>(&mut self, mut predicate: F)
2544        where
2545            F: FnMut(&Store) -> bool,
2546        {
2547            async_std::future::timeout(Duration::from_millis(500), async {
2548                while !(predicate)(&*self.server.store.read()) {
2549                    self.notifications.recv().await;
2550                }
2551            })
2552            .await
2553            .expect("condition timed out");
2554        }
2555    }
2556
2557    impl Drop for TestServer {
2558        fn drop(&mut self) {
2559            task::block_on(self.peer.reset());
2560        }
2561    }
2562
2563    struct TestClient {
2564        client: Arc<Client>,
2565        pub peer_id: PeerId,
2566        pub user_store: ModelHandle<UserStore>,
2567    }
2568
2569    impl Deref for TestClient {
2570        type Target = Arc<Client>;
2571
2572        fn deref(&self) -> &Self::Target {
2573            &self.client
2574        }
2575    }
2576
2577    impl TestClient {
2578        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2579            UserId::from_proto(
2580                self.user_store
2581                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2582            )
2583        }
2584    }
2585
2586    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2587        channel
2588            .messages()
2589            .cursor::<()>()
2590            .map(|m| {
2591                (
2592                    m.sender.github_login.clone(),
2593                    m.body.clone(),
2594                    m.is_pending(),
2595                )
2596            })
2597            .collect()
2598    }
2599
2600    struct EmptyView;
2601
2602    impl gpui::Entity for EmptyView {
2603        type Event = ();
2604    }
2605
2606    impl gpui::View for EmptyView {
2607        fn ui_name() -> &'static str {
2608            "empty view"
2609        }
2610
2611        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2612            gpui::Element::boxed(gpui::elements::Empty)
2613        }
2614    }
2615}