rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
/// RPC server: owns the peer, the lock-protected store and the table of message handlers
/// used to serve every connection.
pub struct Server {
    /// Peer through which messages, responses and errors are sent to connections.
    peer: Arc<Peer>,
    /// Store guarded by a read-write lock.
    store: RwLock<Store>,
    /// Application state; handlers use its `db` for user and channel queries.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type each one accepts.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent a `()` after each incoming message has been handled.
    notifications: Option<mpsc::Sender<()>>,
}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::open_buffer)
  75            .add_handler(Server::close_buffer)
  76            .add_handler(Server::update_buffer)
  77            .add_handler(Server::buffer_saved)
  78            .add_handler(Server::save_buffer)
  79            .add_handler(Server::get_channels)
  80            .add_handler(Server::get_users)
  81            .add_handler(Server::join_channel)
  82            .add_handler(Server::leave_channel)
  83            .add_handler(Server::send_channel_message)
  84            .add_handler(Server::get_channel_messages);
  85
  86        Arc::new(server)
  87    }
  88
  89    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  90    where
  91        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  92        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  93        M: EnvelopedMessage,
  94    {
  95        let prev_handler = self.handlers.insert(
  96            TypeId::of::<M>(),
  97            Box::new(move |server, envelope| {
  98                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
  99                (handler)(server, *envelope).boxed()
 100            }),
 101        );
 102        if prev_handler.is_some() {
 103            panic!("registered a handler for the same message twice");
 104        }
 105        self
 106    }
 107
    /// Returns a future that serves one client connection from start to finish.
    ///
    /// The future adds `connection` to the peer, optionally reports the assigned
    /// connection id through `send_connection_id`, records the connection for `user_id`
    /// in the store, and then dispatches incoming messages to the registered handlers
    /// until the message stream ends or the connection's I/O future completes. It always
    /// finishes by signing the connection out. Failures are logged rather than returned;
    /// `addr` is only used in log output.
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
    ) -> impl Future<Output = ()> {
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            // Best effort: a failure to deliver the id is deliberately ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            // The I/O future is fused and pinned once so the same future can be polled on
            // every iteration of the loop below.
            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.recv().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` checks the branches in the order written, so a ready
                // message is taken before the I/O future's completion is observed.
                futures::select_biased! {
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            log::info!("RPC message received: {}", message.payload_type_name());
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                // The handler is awaited inline, so the next message is not
                                // received until this one has been handled.
                                if let Err(err) = (handler)(this.clone(), message).await {
                                    log::error!("error handling message: {:?}", err);
                                } else {
                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
                                }

                                // Sent whether the handler succeeded or failed; a send
                                // error is ignored.
                                if let Some(mut notifications) = this.notifications.clone() {
                                    let _ = notifications.send(()).await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", message.payload_type_name());
                            }
                        } else {
                            // `None` from the receiver ends the loop.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                    handle_io = handle_io => {
                        // The I/O future finishing, with or without an error, ends the loop.
                        if let Err(err) = handle_io {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
 171
 172    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 173        self.peer.disconnect(connection_id).await;
 174        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 175
 176        for (project_id, project) in removed_connection.hosted_projects {
 177            if let Some(share) = project.share {
 178                broadcast(
 179                    connection_id,
 180                    share.guests.keys().copied().collect(),
 181                    |conn_id| {
 182                        self.peer
 183                            .send(conn_id, proto::UnshareProject { project_id })
 184                    },
 185                )
 186                .await?;
 187            }
 188        }
 189
 190        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 191            broadcast(connection_id, peer_ids, |conn_id| {
 192                self.peer.send(
 193                    conn_id,
 194                    proto::RemoveProjectCollaborator {
 195                        project_id,
 196                        peer_id: connection_id.0,
 197                    },
 198                )
 199            })
 200            .await?;
 201        }
 202
 203        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 204            .await?;
 205
 206        Ok(())
 207    }
 208
 209    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 210        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 211        Ok(())
 212    }
 213
 214    async fn register_project(
 215        mut self: Arc<Server>,
 216        request: TypedEnvelope<proto::RegisterProject>,
 217    ) -> tide::Result<()> {
 218        let project_id = {
 219            let mut state = self.state_mut();
 220            let user_id = state.user_id_for_connection(request.sender_id)?;
 221            state.register_project(request.sender_id, user_id)
 222        };
 223        self.peer
 224            .respond(
 225                request.receipt(),
 226                proto::RegisterProjectResponse { project_id },
 227            )
 228            .await?;
 229        Ok(())
 230    }
 231
 232    async fn unregister_project(
 233        mut self: Arc<Server>,
 234        request: TypedEnvelope<proto::UnregisterProject>,
 235    ) -> tide::Result<()> {
 236        self.state_mut()
 237            .unregister_project(request.payload.project_id, request.sender_id);
 238        Ok(())
 239    }
 240
 241    async fn share_project(
 242        mut self: Arc<Server>,
 243        request: TypedEnvelope<proto::ShareProject>,
 244    ) -> tide::Result<()> {
 245        self.state_mut()
 246            .share_project(request.payload.project_id, request.sender_id);
 247        Ok(())
 248    }
 249
 250    async fn unshare_project(
 251        mut self: Arc<Server>,
 252        request: TypedEnvelope<proto::UnshareProject>,
 253    ) -> tide::Result<()> {
 254        let project_id = request.payload.project_id;
 255        let project = self
 256            .state_mut()
 257            .unshare_project(project_id, request.sender_id)?;
 258
 259        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 260            self.peer
 261                .send(conn_id, proto::UnshareProject { project_id })
 262        })
 263        .await?;
 264        self.update_contacts_for_users(&project.authorized_user_ids)
 265            .await?;
 266
 267        Ok(())
 268    }
 269
 270    async fn join_project(
 271        mut self: Arc<Server>,
 272        request: TypedEnvelope<proto::JoinProject>,
 273    ) -> tide::Result<()> {
 274        let project_id = request.payload.project_id;
 275
 276        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 277        let response_data = self
 278            .state_mut()
 279            .join_project(request.sender_id, user_id, project_id)
 280            .and_then(|joined| {
 281                let share = joined.project.share()?;
 282                let peer_count = share.guests.len();
 283                let mut collaborators = Vec::with_capacity(peer_count);
 284                collaborators.push(proto::Collaborator {
 285                    peer_id: joined.project.host_connection_id.0,
 286                    replica_id: 0,
 287                    user_id: joined.project.host_user_id.to_proto(),
 288                });
 289                let worktrees = joined
 290                    .project
 291                    .worktrees
 292                    .iter()
 293                    .filter_map(|(id, worktree)| {
 294                        worktree.share.as_ref().map(|share| proto::Worktree {
 295                            id: *id,
 296                            root_name: worktree.root_name.clone(),
 297                            entries: share.entries.values().cloned().collect(),
 298                        })
 299                    })
 300                    .collect();
 301                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 302                    if *peer_conn_id != request.sender_id {
 303                        collaborators.push(proto::Collaborator {
 304                            peer_id: peer_conn_id.0,
 305                            replica_id: *peer_replica_id as u32,
 306                            user_id: peer_user_id.to_proto(),
 307                        });
 308                    }
 309                }
 310                let response = proto::JoinProjectResponse {
 311                    worktrees,
 312                    replica_id: joined.replica_id as u32,
 313                    collaborators,
 314                };
 315                let connection_ids = joined.project.connection_ids();
 316                let contact_user_ids = joined.project.authorized_user_ids();
 317                Ok((response, connection_ids, contact_user_ids))
 318            });
 319
 320        match response_data {
 321            Ok((response, connection_ids, contact_user_ids)) => {
 322                broadcast(request.sender_id, connection_ids, |conn_id| {
 323                    self.peer.send(
 324                        conn_id,
 325                        proto::AddProjectCollaborator {
 326                            project_id: project_id,
 327                            collaborator: Some(proto::Collaborator {
 328                                peer_id: request.sender_id.0,
 329                                replica_id: response.replica_id,
 330                                user_id: user_id.to_proto(),
 331                            }),
 332                        },
 333                    )
 334                })
 335                .await?;
 336                self.peer.respond(request.receipt(), response).await?;
 337                self.update_contacts_for_users(&contact_user_ids).await?;
 338            }
 339            Err(error) => {
 340                self.peer
 341                    .respond_with_error(
 342                        request.receipt(),
 343                        proto::Error {
 344                            message: error.to_string(),
 345                        },
 346                    )
 347                    .await?;
 348            }
 349        }
 350
 351        Ok(())
 352    }
 353
 354    async fn leave_project(
 355        mut self: Arc<Server>,
 356        request: TypedEnvelope<proto::LeaveProject>,
 357    ) -> tide::Result<()> {
 358        let sender_id = request.sender_id;
 359        let project_id = request.payload.project_id;
 360        let worktree = self.state_mut().leave_project(sender_id, project_id);
 361        if let Some(worktree) = worktree {
 362            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 363                self.peer.send(
 364                    conn_id,
 365                    proto::RemoveProjectCollaborator {
 366                        project_id,
 367                        peer_id: sender_id.0,
 368                    },
 369                )
 370            })
 371            .await?;
 372            self.update_contacts_for_users(&worktree.authorized_user_ids)
 373                .await?;
 374        }
 375        Ok(())
 376    }
 377
 378    async fn register_worktree(
 379        mut self: Arc<Server>,
 380        request: TypedEnvelope<proto::RegisterWorktree>,
 381    ) -> tide::Result<()> {
 382        let receipt = request.receipt();
 383        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 384
 385        let mut contact_user_ids = HashSet::default();
 386        contact_user_ids.insert(host_user_id);
 387        for github_login in request.payload.authorized_logins {
 388            match self.app_state.db.create_user(&github_login, false).await {
 389                Ok(contact_user_id) => {
 390                    contact_user_ids.insert(contact_user_id);
 391                }
 392                Err(err) => {
 393                    let message = err.to_string();
 394                    self.peer
 395                        .respond_with_error(receipt, proto::Error { message })
 396                        .await?;
 397                    return Ok(());
 398                }
 399            }
 400        }
 401
 402        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 403        let ok = self.state_mut().register_worktree(
 404            request.payload.project_id,
 405            request.payload.worktree_id,
 406            Worktree {
 407                authorized_user_ids: contact_user_ids.clone(),
 408                root_name: request.payload.root_name,
 409                share: None,
 410            },
 411        );
 412
 413        if ok {
 414            self.peer.respond(receipt, proto::Ack {}).await?;
 415            self.update_contacts_for_users(&contact_user_ids).await?;
 416        } else {
 417            self.peer
 418                .respond_with_error(
 419                    receipt,
 420                    proto::Error {
 421                        message: NO_SUCH_PROJECT.to_string(),
 422                    },
 423                )
 424                .await?;
 425        }
 426
 427        Ok(())
 428    }
 429
 430    async fn unregister_worktree(
 431        mut self: Arc<Server>,
 432        request: TypedEnvelope<proto::UnregisterWorktree>,
 433    ) -> tide::Result<()> {
 434        let project_id = request.payload.project_id;
 435        let worktree_id = request.payload.worktree_id;
 436        let (worktree, guest_connection_ids) =
 437            self.state_mut()
 438                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 439
 440        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 441            self.peer.send(
 442                conn_id,
 443                proto::UnregisterWorktree {
 444                    project_id,
 445                    worktree_id,
 446                },
 447            )
 448        })
 449        .await?;
 450        self.update_contacts_for_users(&worktree.authorized_user_ids)
 451            .await?;
 452        Ok(())
 453    }
 454
 455    async fn share_worktree(
 456        mut self: Arc<Server>,
 457        mut request: TypedEnvelope<proto::ShareWorktree>,
 458    ) -> tide::Result<()> {
 459        let worktree = request
 460            .payload
 461            .worktree
 462            .as_mut()
 463            .ok_or_else(|| anyhow!("missing worktree"))?;
 464        let entries = mem::take(&mut worktree.entries)
 465            .into_iter()
 466            .map(|entry| (entry.id, entry))
 467            .collect();
 468
 469        let contact_user_ids = self.state_mut().share_worktree(
 470            request.payload.project_id,
 471            worktree.id,
 472            request.sender_id,
 473            entries,
 474        );
 475        if let Some(contact_user_ids) = contact_user_ids {
 476            self.peer.respond(request.receipt(), proto::Ack {}).await?;
 477            self.update_contacts_for_users(&contact_user_ids).await?;
 478        } else {
 479            self.peer
 480                .respond_with_error(
 481                    request.receipt(),
 482                    proto::Error {
 483                        message: "no such worktree".to_string(),
 484                    },
 485                )
 486                .await?;
 487        }
 488        Ok(())
 489    }
 490
 491    async fn update_worktree(
 492        mut self: Arc<Server>,
 493        request: TypedEnvelope<proto::UpdateWorktree>,
 494    ) -> tide::Result<()> {
 495        let connection_ids = self
 496            .state_mut()
 497            .update_worktree(
 498                request.sender_id,
 499                request.payload.project_id,
 500                request.payload.worktree_id,
 501                &request.payload.removed_entries,
 502                &request.payload.updated_entries,
 503            )
 504            .ok_or_else(|| anyhow!("no such worktree"))?;
 505
 506        broadcast(request.sender_id, connection_ids, |connection_id| {
 507            self.peer
 508                .forward_send(request.sender_id, connection_id, request.payload.clone())
 509        })
 510        .await?;
 511
 512        Ok(())
 513    }
 514
 515    async fn open_buffer(
 516        self: Arc<Server>,
 517        request: TypedEnvelope<proto::OpenBuffer>,
 518    ) -> tide::Result<()> {
 519        let receipt = request.receipt();
 520        let host_connection_id = self
 521            .state()
 522            .read_project(request.payload.project_id, request.sender_id)
 523            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 524            .host_connection_id;
 525        let response = self
 526            .peer
 527            .forward_request(request.sender_id, host_connection_id, request.payload)
 528            .await?;
 529        self.peer.respond(receipt, response).await?;
 530        Ok(())
 531    }
 532
 533    async fn close_buffer(
 534        self: Arc<Server>,
 535        request: TypedEnvelope<proto::CloseBuffer>,
 536    ) -> tide::Result<()> {
 537        let host_connection_id = self
 538            .state()
 539            .read_project(request.payload.project_id, request.sender_id)
 540            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 541            .host_connection_id;
 542        self.peer
 543            .forward_send(request.sender_id, host_connection_id, request.payload)
 544            .await?;
 545        Ok(())
 546    }
 547
 548    async fn save_buffer(
 549        self: Arc<Server>,
 550        request: TypedEnvelope<proto::SaveBuffer>,
 551    ) -> tide::Result<()> {
 552        let host;
 553        let guests;
 554        {
 555            let state = self.state();
 556            let project = state
 557                .read_project(request.payload.project_id, request.sender_id)
 558                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 559            host = project.host_connection_id;
 560            guests = project.guest_connection_ids()
 561        }
 562
 563        let sender = request.sender_id;
 564        let receipt = request.receipt();
 565        let response = self
 566            .peer
 567            .forward_request(sender, host, request.payload.clone())
 568            .await?;
 569
 570        broadcast(host, guests, |conn_id| {
 571            let response = response.clone();
 572            let peer = &self.peer;
 573            async move {
 574                if conn_id == sender {
 575                    peer.respond(receipt, response).await
 576                } else {
 577                    peer.forward_send(host, conn_id, response).await
 578                }
 579            }
 580        })
 581        .await?;
 582
 583        Ok(())
 584    }
 585
 586    async fn update_buffer(
 587        self: Arc<Server>,
 588        request: TypedEnvelope<proto::UpdateBuffer>,
 589    ) -> tide::Result<()> {
 590        let receiver_ids = self
 591            .state()
 592            .project_connection_ids(request.payload.project_id, request.sender_id)
 593            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 594        broadcast(request.sender_id, receiver_ids, |connection_id| {
 595            self.peer
 596                .forward_send(request.sender_id, connection_id, request.payload.clone())
 597        })
 598        .await?;
 599        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 600        Ok(())
 601    }
 602
 603    async fn buffer_saved(
 604        self: Arc<Server>,
 605        request: TypedEnvelope<proto::BufferSaved>,
 606    ) -> tide::Result<()> {
 607        let receiver_ids = self
 608            .state()
 609            .project_connection_ids(request.payload.project_id, request.sender_id)
 610            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 611        broadcast(request.sender_id, receiver_ids, |connection_id| {
 612            self.peer
 613                .forward_send(request.sender_id, connection_id, request.payload.clone())
 614        })
 615        .await?;
 616        Ok(())
 617    }
 618
 619    async fn get_channels(
 620        self: Arc<Server>,
 621        request: TypedEnvelope<proto::GetChannels>,
 622    ) -> tide::Result<()> {
 623        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 624        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 625        self.peer
 626            .respond(
 627                request.receipt(),
 628                proto::GetChannelsResponse {
 629                    channels: channels
 630                        .into_iter()
 631                        .map(|chan| proto::Channel {
 632                            id: chan.id.to_proto(),
 633                            name: chan.name,
 634                        })
 635                        .collect(),
 636                },
 637            )
 638            .await?;
 639        Ok(())
 640    }
 641
 642    async fn get_users(
 643        self: Arc<Server>,
 644        request: TypedEnvelope<proto::GetUsers>,
 645    ) -> tide::Result<()> {
 646        let receipt = request.receipt();
 647        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 648        let users = self
 649            .app_state
 650            .db
 651            .get_users_by_ids(user_ids)
 652            .await?
 653            .into_iter()
 654            .map(|user| proto::User {
 655                id: user.id.to_proto(),
 656                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 657                github_login: user.github_login,
 658            })
 659            .collect();
 660        self.peer
 661            .respond(receipt, proto::GetUsersResponse { users })
 662            .await?;
 663        Ok(())
 664    }
 665
 666    async fn update_contacts_for_users<'a>(
 667        self: &Arc<Server>,
 668        user_ids: impl IntoIterator<Item = &'a UserId>,
 669    ) -> tide::Result<()> {
 670        let mut send_futures = Vec::new();
 671
 672        {
 673            let state = self.state();
 674            for user_id in user_ids {
 675                let contacts = state.contacts_for_user(*user_id);
 676                for connection_id in state.connection_ids_for_user(*user_id) {
 677                    send_futures.push(self.peer.send(
 678                        connection_id,
 679                        proto::UpdateContacts {
 680                            contacts: contacts.clone(),
 681                        },
 682                    ));
 683                }
 684            }
 685        }
 686        futures::future::try_join_all(send_futures).await?;
 687
 688        Ok(())
 689    }
 690
 691    async fn join_channel(
 692        mut self: Arc<Self>,
 693        request: TypedEnvelope<proto::JoinChannel>,
 694    ) -> tide::Result<()> {
 695        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 696        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 697        if !self
 698            .app_state
 699            .db
 700            .can_user_access_channel(user_id, channel_id)
 701            .await?
 702        {
 703            Err(anyhow!("access denied"))?;
 704        }
 705
 706        self.state_mut().join_channel(request.sender_id, channel_id);
 707        let messages = self
 708            .app_state
 709            .db
 710            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 711            .await?
 712            .into_iter()
 713            .map(|msg| proto::ChannelMessage {
 714                id: msg.id.to_proto(),
 715                body: msg.body,
 716                timestamp: msg.sent_at.unix_timestamp() as u64,
 717                sender_id: msg.sender_id.to_proto(),
 718                nonce: Some(msg.nonce.as_u128().into()),
 719            })
 720            .collect::<Vec<_>>();
 721        self.peer
 722            .respond(
 723                request.receipt(),
 724                proto::JoinChannelResponse {
 725                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 726                    messages,
 727                },
 728            )
 729            .await?;
 730        Ok(())
 731    }
 732
 733    async fn leave_channel(
 734        mut self: Arc<Self>,
 735        request: TypedEnvelope<proto::LeaveChannel>,
 736    ) -> tide::Result<()> {
 737        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 738        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 739        if !self
 740            .app_state
 741            .db
 742            .can_user_access_channel(user_id, channel_id)
 743            .await?
 744        {
 745            Err(anyhow!("access denied"))?;
 746        }
 747
 748        self.state_mut()
 749            .leave_channel(request.sender_id, channel_id);
 750
 751        Ok(())
 752    }
 753
 754    async fn send_channel_message(
 755        self: Arc<Self>,
 756        request: TypedEnvelope<proto::SendChannelMessage>,
 757    ) -> tide::Result<()> {
 758        let receipt = request.receipt();
 759        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 760        let user_id;
 761        let connection_ids;
 762        {
 763            let state = self.state();
 764            user_id = state.user_id_for_connection(request.sender_id)?;
 765            if let Some(ids) = state.channel_connection_ids(channel_id) {
 766                connection_ids = ids;
 767            } else {
 768                return Ok(());
 769            }
 770        }
 771
 772        // Validate the message body.
 773        let body = request.payload.body.trim().to_string();
 774        if body.len() > MAX_MESSAGE_LEN {
 775            self.peer
 776                .respond_with_error(
 777                    receipt,
 778                    proto::Error {
 779                        message: "message is too long".to_string(),
 780                    },
 781                )
 782                .await?;
 783            return Ok(());
 784        }
 785        if body.is_empty() {
 786            self.peer
 787                .respond_with_error(
 788                    receipt,
 789                    proto::Error {
 790                        message: "message can't be blank".to_string(),
 791                    },
 792                )
 793                .await?;
 794            return Ok(());
 795        }
 796
 797        let timestamp = OffsetDateTime::now_utc();
 798        let nonce = if let Some(nonce) = request.payload.nonce {
 799            nonce
 800        } else {
 801            self.peer
 802                .respond_with_error(
 803                    receipt,
 804                    proto::Error {
 805                        message: "nonce can't be blank".to_string(),
 806                    },
 807                )
 808                .await?;
 809            return Ok(());
 810        };
 811
 812        let message_id = self
 813            .app_state
 814            .db
 815            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 816            .await?
 817            .to_proto();
 818        let message = proto::ChannelMessage {
 819            sender_id: user_id.to_proto(),
 820            id: message_id,
 821            body,
 822            timestamp: timestamp.unix_timestamp() as u64,
 823            nonce: Some(nonce),
 824        };
 825        broadcast(request.sender_id, connection_ids, |conn_id| {
 826            self.peer.send(
 827                conn_id,
 828                proto::ChannelMessageSent {
 829                    channel_id: channel_id.to_proto(),
 830                    message: Some(message.clone()),
 831                },
 832            )
 833        })
 834        .await?;
 835        self.peer
 836            .respond(
 837                receipt,
 838                proto::SendChannelMessageResponse {
 839                    message: Some(message),
 840                },
 841            )
 842            .await?;
 843        Ok(())
 844    }
 845
 846    async fn get_channel_messages(
 847        self: Arc<Self>,
 848        request: TypedEnvelope<proto::GetChannelMessages>,
 849    ) -> tide::Result<()> {
 850        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 851        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 852        if !self
 853            .app_state
 854            .db
 855            .can_user_access_channel(user_id, channel_id)
 856            .await?
 857        {
 858            Err(anyhow!("access denied"))?;
 859        }
 860
 861        let messages = self
 862            .app_state
 863            .db
 864            .get_channel_messages(
 865                channel_id,
 866                MESSAGE_COUNT_PER_PAGE,
 867                Some(MessageId::from_proto(request.payload.before_message_id)),
 868            )
 869            .await?
 870            .into_iter()
 871            .map(|msg| proto::ChannelMessage {
 872                id: msg.id.to_proto(),
 873                body: msg.body,
 874                timestamp: msg.sent_at.unix_timestamp() as u64,
 875                sender_id: msg.sender_id.to_proto(),
 876                nonce: Some(msg.nonce.as_u128().into()),
 877            })
 878            .collect::<Vec<_>>();
 879        self.peer
 880            .respond(
 881                request.receipt(),
 882                proto::GetChannelMessagesResponse {
 883                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 884                    messages,
 885                },
 886            )
 887            .await?;
 888        Ok(())
 889    }
 890
 891    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 892        self.store.read()
 893    }
 894
 895    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 896        self.store.write()
 897    }
 898}
 899
 900pub async fn broadcast<F, T>(
 901    sender_id: ConnectionId,
 902    receiver_ids: Vec<ConnectionId>,
 903    mut f: F,
 904) -> anyhow::Result<()>
 905where
 906    F: FnMut(ConnectionId) -> T,
 907    T: Future<Output = anyhow::Result<()>>,
 908{
 909    let futures = receiver_ids
 910        .into_iter()
 911        .filter(|id| *id != sender_id)
 912        .map(|id| f(id));
 913    futures::future::try_join_all(futures).await?;
 914    Ok(())
 915}
 916
 917pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 918    let server = Server::new(app.state().clone(), rpc.clone(), None);
 919    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 920        let server = server.clone();
 921        async move {
 922            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 923
 924            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 925            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 926            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 927            let client_protocol_version: Option<u32> = request
 928                .header("X-Zed-Protocol-Version")
 929                .and_then(|v| v.as_str().parse().ok());
 930
 931            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 932                return Ok(Response::new(StatusCode::UpgradeRequired));
 933            }
 934
 935            let header = match request.header("Sec-Websocket-Key") {
 936                Some(h) => h.as_str(),
 937                None => return Err(anyhow!("expected sec-websocket-key"))?,
 938            };
 939
 940            let user_id = process_auth_header(&request).await?;
 941
 942            let mut response = Response::new(StatusCode::SwitchingProtocols);
 943            response.insert_header(UPGRADE, "websocket");
 944            response.insert_header(CONNECTION, "Upgrade");
 945            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 946            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 947            response.insert_header("Sec-Websocket-Version", "13");
 948
 949            let http_res: &mut tide::http::Response = response.as_mut();
 950            let upgrade_receiver = http_res.recv_upgrade().await;
 951            let addr = request.remote().unwrap_or("unknown").to_string();
 952            task::spawn(async move {
 953                if let Some(stream) = upgrade_receiver.await {
 954                    server
 955                        .handle_connection(
 956                            Connection::new(
 957                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 958                            ),
 959                            addr,
 960                            user_id,
 961                            None,
 962                        )
 963                        .await;
 964                }
 965            });
 966
 967            Ok(response)
 968        }
 969    });
 970}
 971
 972fn header_contains_ignore_case<T>(
 973    request: &tide::Request<T>,
 974    header_name: HeaderName,
 975    value: &str,
 976) -> bool {
 977    request
 978        .header(header_name)
 979        .map(|h| {
 980            h.as_str()
 981                .split(',')
 982                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 983        })
 984        .unwrap_or(false)
 985}
 986
 987#[cfg(test)]
 988mod tests {
 989    use super::*;
 990    use crate::{
 991        auth,
 992        db::{tests::TestDb, UserId},
 993        github, AppState, Config,
 994    };
 995    use ::rpc::Peer;
 996    use async_std::task;
 997    use gpui::{ModelHandle, TestAppContext};
 998    use parking_lot::Mutex;
 999    use postage::{mpsc, watch};
1000    use rpc::PeerId;
1001    use serde_json::json;
1002    use sqlx::types::time::OffsetDateTime;
1003    use std::{
1004        ops::Deref,
1005        path::Path,
1006        sync::{
1007            atomic::{AtomicBool, Ordering::SeqCst},
1008            Arc,
1009        },
1010        time::Duration,
1011    };
1012    use zed::{
1013        client::{
1014            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1015            EstablishConnectionError, UserStore,
1016        },
1017        editor::{Editor, EditorSettings, Input, MultiBuffer},
1018        fs::{FakeFs, Fs as _},
1019        language::{
1020            tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1021            LanguageRegistry, LanguageServerConfig, Point,
1022        },
1023        lsp,
1024        project::Project,
1025    };
1026
    // End-to-end test of sharing a project between two clients: client A
    // shares a local project, client B joins it as a remote project, both open
    // `b.txt`, text typed by B shows up in A's buffer, and dropping B's
    // project empties A's collaborator list.
    #[gpui::test]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // B should already list A as a collaborator; remember B's replica id
        // so it can be matched against what A sees below.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A sees B as a collaborator with the same replica id.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = worktree_b
            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // B's open should have caused the host (A) to have the buffer open too.
        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
        });
        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        worktree_a
            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1160
    // Verifies that when the host (client A) unshares a project, the guest's
    // (client B's) remote copy of that project becomes read-only.
    #[gpui::test]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_b.update(zed::contacts_panel::init);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as the guest before the project is unshared.
        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
        worktree_b
            .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
            .await
            .unwrap();

        // Unshare as the host and wait for the guest's project to report
        // itself as read-only.
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
    }
1233
    // Three-client test (host A, guests B and C) covering:
    //   * concurrent edits from both guests and the host converging to the
    //     same text on all three replicas,
    //   * a save issued by a guest while the host edits concurrently, and
    //   * file-system changes on the host showing up in the guests' worktrees.
    #[gpui::test]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();

        // Open and edit a buffer as both guests B and C.
        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
        let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
        let buffer_b = worktree_b
            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        let buffer_c = worktree_c
            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();

        // Wait for both guests' insertions to reach the host before appending
        // the host's own edit at the end of the buffer.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The file written by the save is expected to include the host's
        // concurrent "hi-a, " edit, and no replica should remain dirty.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on the guests.
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // Four files are expected afterwards: `.zed.toml`, `file1`, `file3`
        // and `file4`.
        worktree_b
            .condition(&cx_b, |tree, _| tree.file_count() == 4)
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| tree.file_count() == 4)
            .await;
        worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1", "file3", "file4"]
            )
        });
        worktree_c.read_with(&cx_c, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1", "file3", "file4"]
            )
        });
    }
1387
    // Verifies the dirty/conflict flags of a guest's buffer around a save:
    // editing makes it dirty, saving clears the dirty flag once the file's
    // mtime has changed, and a further edit makes it dirty again. The buffer
    // must never report a conflict along the way.
    #[gpui::test]
    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // Open a buffer as client B
        let buffer_b = worktree_b
            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
            .await
            .unwrap();
        // Remember the file's mtime so the save can be detected below.
        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());

        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        buffer_b
            .update(&mut cx_b, |buf, cx| buf.save(cx))
            .unwrap()
            .await
            .unwrap();
        // Wait until the buffer's file reports a different mtime than before
        // the save.
        worktree_b
            .condition(&cx_b, |_, cx| {
                buffer_b.read(cx).file().unwrap().mtime() != mtime
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Editing again after the save must mark the buffer dirty without
        // flagging a conflict.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1480
    // Verifies that an edit made by the host while a guest is still in the
    // middle of opening the same buffer is not lost: once the guest's open
    // completes, its buffer must converge to the host's text.
    #[gpui::test]
    async fn test_editing_while_guest_opens_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // Open a buffer as client A
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B
        let buffer_b = cx_b
            .background()
            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
        // Yield once so other tasks get a chance to run before the host edits.
        task::yield_now().await;

        // Edit the buffer as client A while client B is still opening it.
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));

        // Once B's open finishes, its buffer must catch up with A's text.
        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1560
    // Verifies that the host sees the guest leave when the guest drops its
    // worktree handle (and the pending open task) while a buffer open is
    // still in flight.
    #[gpui::test]
    async fn test_leaving_worktree_while_opening_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // See that a guest has joined as client A.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
            .await;

        // Begin opening a buffer as client B, but leave the project before the open completes.
        let buffer_b = cx_b
            .background()
            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
        cx_b.update(|_| drop(worktree_b));
        drop(buffer_b);

        // See that the guest has left.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
            .await;
    }
1638
    // Verifies that when a guest's client disconnects from the server, the
    // host's project drops that guest from its collaborator list.
    #[gpui::test]
    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        // The handle is bound to a named variable (not `_`) so the remote
        // project stays alive until the end of the test.
        let _project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // See that a guest has joined as client A.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
            .await;

        // Drop client B's connection and ensure client A observes client B leaving the worktree.
        client_b.disconnect(&cx_b.to_async()).await.unwrap();
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
            .await;
    }
1707
1708    #[gpui::test]
1709    async fn test_collaborating_with_diagnostics(
1710        mut cx_a: TestAppContext,
1711        mut cx_b: TestAppContext,
1712    ) {
1713        cx_a.foreground().forbid_parking();
1714        let mut lang_registry = Arc::new(LanguageRegistry::new());
1715        let fs = Arc::new(FakeFs::new());
1716
1717        // Set up a fake language server.
1718        let (language_server_config, mut fake_language_server) =
1719            LanguageServerConfig::fake(cx_a.background()).await;
1720        Arc::get_mut(&mut lang_registry)
1721            .unwrap()
1722            .add(Arc::new(Language::new(
1723                LanguageConfig {
1724                    name: "Rust".to_string(),
1725                    path_suffixes: vec!["rs".to_string()],
1726                    language_server: Some(language_server_config),
1727                    ..Default::default()
1728                },
1729                Some(tree_sitter_rust::language()),
1730            )));
1731
1732        // Connect to a server as 2 clients.
1733        let mut server = TestServer::start().await;
1734        let client_a = server.create_client(&mut cx_a, "user_a").await;
1735        let client_b = server.create_client(&mut cx_b, "user_b").await;
1736
1737        // Share a project as client A
1738        fs.insert_tree(
1739            "/a",
1740            json!({
1741                ".zed.toml": r#"collaborators = ["user_b"]"#,
1742                "a.rs": "let one = two",
1743                "other.rs": "",
1744            }),
1745        )
1746        .await;
1747        let project_a = cx_a.update(|cx| {
1748            Project::local(
1749                client_a.clone(),
1750                client_a.user_store.clone(),
1751                lang_registry.clone(),
1752                fs.clone(),
1753                cx,
1754            )
1755        });
1756        let worktree_a = project_a
1757            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1758            .await
1759            .unwrap();
1760        worktree_a
1761            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1762            .await;
1763        let project_id = project_a
1764            .update(&mut cx_a, |project, _| project.next_remote_id())
1765            .await;
1766        project_a
1767            .update(&mut cx_a, |project, cx| project.share(cx))
1768            .await
1769            .unwrap();
1770
1771        // Cause the language server to start.
1772        let _ = cx_a
1773            .background()
1774            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1775                worktree.open_buffer("other.rs", cx)
1776            }))
1777            .await
1778            .unwrap();
1779
1780        // Simulate a language server reporting errors for a file.
1781        fake_language_server
1782            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1783                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1784                version: None,
1785                diagnostics: vec![
1786                    lsp::Diagnostic {
1787                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1788                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1789                        message: "message 1".to_string(),
1790                        ..Default::default()
1791                    },
1792                    lsp::Diagnostic {
1793                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1794                        range: lsp::Range::new(
1795                            lsp::Position::new(0, 10),
1796                            lsp::Position::new(0, 13),
1797                        ),
1798                        message: "message 2".to_string(),
1799                        ..Default::default()
1800                    },
1801                ],
1802            })
1803            .await;
1804
1805        // Join the worktree as client B.
1806        let project_b = Project::remote(
1807            project_id,
1808            client_b.clone(),
1809            client_b.user_store.clone(),
1810            lang_registry.clone(),
1811            fs.clone(),
1812            &mut cx_b.to_async(),
1813        )
1814        .await
1815        .unwrap();
1816        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1817
1818        // Open the file with the errors.
1819        let buffer_b = cx_b
1820            .background()
1821            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1822            .await
1823            .unwrap();
1824
1825        buffer_b.read_with(&cx_b, |buffer, _| {
1826            assert_eq!(
1827                buffer
1828                    .snapshot()
1829                    .diagnostics_in_range::<_, Point>(0..buffer.len())
1830                    .collect::<Vec<_>>(),
1831                &[
1832                    DiagnosticEntry {
1833                        range: Point::new(0, 4)..Point::new(0, 7),
1834                        diagnostic: Diagnostic {
1835                            group_id: 0,
1836                            message: "message 1".to_string(),
1837                            severity: lsp::DiagnosticSeverity::ERROR,
1838                            is_primary: true,
1839                            ..Default::default()
1840                        }
1841                    },
1842                    DiagnosticEntry {
1843                        range: Point::new(0, 10)..Point::new(0, 13),
1844                        diagnostic: Diagnostic {
1845                            group_id: 1,
1846                            severity: lsp::DiagnosticSeverity::WARNING,
1847                            message: "message 2".to_string(),
1848                            is_primary: true,
1849                            ..Default::default()
1850                        }
1851                    }
1852                ]
1853            );
1854        });
1855    }
1856
1857    #[gpui::test]
1858    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1859        cx_a.foreground().forbid_parking();
1860
1861        // Connect to a server as 2 clients.
1862        let mut server = TestServer::start().await;
1863        let client_a = server.create_client(&mut cx_a, "user_a").await;
1864        let client_b = server.create_client(&mut cx_b, "user_b").await;
1865
1866        // Create an org that includes these 2 users.
1867        let db = &server.app_state.db;
1868        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1869        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1870            .await
1871            .unwrap();
1872        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1873            .await
1874            .unwrap();
1875
1876        // Create a channel that includes all the users.
1877        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1878        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1879            .await
1880            .unwrap();
1881        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1882            .await
1883            .unwrap();
1884        db.create_channel_message(
1885            channel_id,
1886            client_b.current_user_id(&cx_b),
1887            "hello A, it's B.",
1888            OffsetDateTime::now_utc(),
1889            1,
1890        )
1891        .await
1892        .unwrap();
1893
1894        let channels_a = cx_a
1895            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1896        channels_a
1897            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1898            .await;
1899        channels_a.read_with(&cx_a, |list, _| {
1900            assert_eq!(
1901                list.available_channels().unwrap(),
1902                &[ChannelDetails {
1903                    id: channel_id.to_proto(),
1904                    name: "test-channel".to_string()
1905                }]
1906            )
1907        });
1908        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1909            this.get_channel(channel_id.to_proto(), cx).unwrap()
1910        });
1911        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1912        channel_a
1913            .condition(&cx_a, |channel, _| {
1914                channel_messages(channel)
1915                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1916            })
1917            .await;
1918
1919        let channels_b = cx_b
1920            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1921        channels_b
1922            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1923            .await;
1924        channels_b.read_with(&cx_b, |list, _| {
1925            assert_eq!(
1926                list.available_channels().unwrap(),
1927                &[ChannelDetails {
1928                    id: channel_id.to_proto(),
1929                    name: "test-channel".to_string()
1930                }]
1931            )
1932        });
1933
1934        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1935            this.get_channel(channel_id.to_proto(), cx).unwrap()
1936        });
1937        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1938        channel_b
1939            .condition(&cx_b, |channel, _| {
1940                channel_messages(channel)
1941                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1942            })
1943            .await;
1944
1945        channel_a
1946            .update(&mut cx_a, |channel, cx| {
1947                channel
1948                    .send_message("oh, hi B.".to_string(), cx)
1949                    .unwrap()
1950                    .detach();
1951                let task = channel.send_message("sup".to_string(), cx).unwrap();
1952                assert_eq!(
1953                    channel_messages(channel),
1954                    &[
1955                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1956                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
1957                        ("user_a".to_string(), "sup".to_string(), true)
1958                    ]
1959                );
1960                task
1961            })
1962            .await
1963            .unwrap();
1964
1965        channel_b
1966            .condition(&cx_b, |channel, _| {
1967                channel_messages(channel)
1968                    == [
1969                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1970                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
1971                        ("user_a".to_string(), "sup".to_string(), false),
1972                    ]
1973            })
1974            .await;
1975
1976        assert_eq!(
1977            server
1978                .state()
1979                .await
1980                .channel(channel_id)
1981                .unwrap()
1982                .connection_ids
1983                .len(),
1984            2
1985        );
1986        cx_b.update(|_| drop(channel_b));
1987        server
1988            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1989            .await;
1990
1991        cx_a.update(|_| drop(channel_a));
1992        server
1993            .condition(|state| state.channel(channel_id).is_none())
1994            .await;
1995    }
1996
1997    #[gpui::test]
1998    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
1999        cx_a.foreground().forbid_parking();
2000
2001        let mut server = TestServer::start().await;
2002        let client_a = server.create_client(&mut cx_a, "user_a").await;
2003
2004        let db = &server.app_state.db;
2005        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2006        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2007        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2008            .await
2009            .unwrap();
2010        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2011            .await
2012            .unwrap();
2013
2014        let channels_a = cx_a
2015            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2016        channels_a
2017            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2018            .await;
2019        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2020            this.get_channel(channel_id.to_proto(), cx).unwrap()
2021        });
2022
2023        // Messages aren't allowed to be too long.
2024        channel_a
2025            .update(&mut cx_a, |channel, cx| {
2026                let long_body = "this is long.\n".repeat(1024);
2027                channel.send_message(long_body, cx).unwrap()
2028            })
2029            .await
2030            .unwrap_err();
2031
2032        // Messages aren't allowed to be blank.
2033        channel_a.update(&mut cx_a, |channel, cx| {
2034            channel.send_message(String::new(), cx).unwrap_err()
2035        });
2036
2037        // Leading and trailing whitespace are trimmed.
2038        channel_a
2039            .update(&mut cx_a, |channel, cx| {
2040                channel
2041                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
2042                    .unwrap()
2043            })
2044            .await
2045            .unwrap();
2046        assert_eq!(
2047            db.get_channel_messages(channel_id, 10, None)
2048                .await
2049                .unwrap()
2050                .iter()
2051                .map(|m| &m.body)
2052                .collect::<Vec<_>>(),
2053            &["surrounded by whitespace"]
2054        );
2055    }
2056
2057    #[gpui::test]
2058    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2059        cx_a.foreground().forbid_parking();
2060
2061        // Connect to a server as 2 clients.
2062        let mut server = TestServer::start().await;
2063        let client_a = server.create_client(&mut cx_a, "user_a").await;
2064        let client_b = server.create_client(&mut cx_b, "user_b").await;
2065        let mut status_b = client_b.status();
2066
2067        // Create an org that includes these 2 users.
2068        let db = &server.app_state.db;
2069        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2070        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2071            .await
2072            .unwrap();
2073        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2074            .await
2075            .unwrap();
2076
2077        // Create a channel that includes all the users.
2078        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2079        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2080            .await
2081            .unwrap();
2082        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2083            .await
2084            .unwrap();
2085        db.create_channel_message(
2086            channel_id,
2087            client_b.current_user_id(&cx_b),
2088            "hello A, it's B.",
2089            OffsetDateTime::now_utc(),
2090            2,
2091        )
2092        .await
2093        .unwrap();
2094
2095        let channels_a = cx_a
2096            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2097        channels_a
2098            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2099            .await;
2100
2101        channels_a.read_with(&cx_a, |list, _| {
2102            assert_eq!(
2103                list.available_channels().unwrap(),
2104                &[ChannelDetails {
2105                    id: channel_id.to_proto(),
2106                    name: "test-channel".to_string()
2107                }]
2108            )
2109        });
2110        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2111            this.get_channel(channel_id.to_proto(), cx).unwrap()
2112        });
2113        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2114        channel_a
2115            .condition(&cx_a, |channel, _| {
2116                channel_messages(channel)
2117                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2118            })
2119            .await;
2120
2121        let channels_b = cx_b
2122            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2123        channels_b
2124            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2125            .await;
2126        channels_b.read_with(&cx_b, |list, _| {
2127            assert_eq!(
2128                list.available_channels().unwrap(),
2129                &[ChannelDetails {
2130                    id: channel_id.to_proto(),
2131                    name: "test-channel".to_string()
2132                }]
2133            )
2134        });
2135
2136        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2137            this.get_channel(channel_id.to_proto(), cx).unwrap()
2138        });
2139        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2140        channel_b
2141            .condition(&cx_b, |channel, _| {
2142                channel_messages(channel)
2143                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2144            })
2145            .await;
2146
2147        // Disconnect client B, ensuring we can still access its cached channel data.
2148        server.forbid_connections();
2149        server.disconnect_client(client_b.current_user_id(&cx_b));
2150        while !matches!(
2151            status_b.recv().await,
2152            Some(client::Status::ReconnectionError { .. })
2153        ) {}
2154
2155        channels_b.read_with(&cx_b, |channels, _| {
2156            assert_eq!(
2157                channels.available_channels().unwrap(),
2158                [ChannelDetails {
2159                    id: channel_id.to_proto(),
2160                    name: "test-channel".to_string()
2161                }]
2162            )
2163        });
2164        channel_b.read_with(&cx_b, |channel, _| {
2165            assert_eq!(
2166                channel_messages(channel),
2167                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2168            )
2169        });
2170
2171        // Send a message from client B while it is disconnected.
2172        channel_b
2173            .update(&mut cx_b, |channel, cx| {
2174                let task = channel
2175                    .send_message("can you see this?".to_string(), cx)
2176                    .unwrap();
2177                assert_eq!(
2178                    channel_messages(channel),
2179                    &[
2180                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2181                        ("user_b".to_string(), "can you see this?".to_string(), true)
2182                    ]
2183                );
2184                task
2185            })
2186            .await
2187            .unwrap_err();
2188
2189        // Send a message from client A while B is disconnected.
2190        channel_a
2191            .update(&mut cx_a, |channel, cx| {
2192                channel
2193                    .send_message("oh, hi B.".to_string(), cx)
2194                    .unwrap()
2195                    .detach();
2196                let task = channel.send_message("sup".to_string(), cx).unwrap();
2197                assert_eq!(
2198                    channel_messages(channel),
2199                    &[
2200                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2201                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2202                        ("user_a".to_string(), "sup".to_string(), true)
2203                    ]
2204                );
2205                task
2206            })
2207            .await
2208            .unwrap();
2209
2210        // Give client B a chance to reconnect.
2211        server.allow_connections();
2212        cx_b.foreground().advance_clock(Duration::from_secs(10));
2213
2214        // Verify that B sees the new messages upon reconnection, as well as the message client B
2215        // sent while offline.
2216        channel_b
2217            .condition(&cx_b, |channel, _| {
2218                channel_messages(channel)
2219                    == [
2220                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2221                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2222                        ("user_a".to_string(), "sup".to_string(), false),
2223                        ("user_b".to_string(), "can you see this?".to_string(), false),
2224                    ]
2225            })
2226            .await;
2227
2228        // Ensure client A and B can communicate normally after reconnection.
2229        channel_a
2230            .update(&mut cx_a, |channel, cx| {
2231                channel.send_message("you online?".to_string(), cx).unwrap()
2232            })
2233            .await
2234            .unwrap();
2235        channel_b
2236            .condition(&cx_b, |channel, _| {
2237                channel_messages(channel)
2238                    == [
2239                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2240                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2241                        ("user_a".to_string(), "sup".to_string(), false),
2242                        ("user_b".to_string(), "can you see this?".to_string(), false),
2243                        ("user_a".to_string(), "you online?".to_string(), false),
2244                    ]
2245            })
2246            .await;
2247
2248        channel_b
2249            .update(&mut cx_b, |channel, cx| {
2250                channel.send_message("yep".to_string(), cx).unwrap()
2251            })
2252            .await
2253            .unwrap();
2254        channel_a
2255            .condition(&cx_a, |channel, _| {
2256                channel_messages(channel)
2257                    == [
2258                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2259                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2260                        ("user_a".to_string(), "sup".to_string(), false),
2261                        ("user_b".to_string(), "can you see this?".to_string(), false),
2262                        ("user_a".to_string(), "you online?".to_string(), false),
2263                        ("user_b".to_string(), "yep".to_string(), false),
2264                    ]
2265            })
2266            .await;
2267    }
2268
2269    #[gpui::test]
2270    async fn test_contacts(
2271        mut cx_a: TestAppContext,
2272        mut cx_b: TestAppContext,
2273        mut cx_c: TestAppContext,
2274    ) {
2275        cx_a.foreground().forbid_parking();
2276        let lang_registry = Arc::new(LanguageRegistry::new());
2277        let fs = Arc::new(FakeFs::new());
2278
2279        // Connect to a server as 3 clients.
2280        let mut server = TestServer::start().await;
2281        let client_a = server.create_client(&mut cx_a, "user_a").await;
2282        let client_b = server.create_client(&mut cx_b, "user_b").await;
2283        let client_c = server.create_client(&mut cx_c, "user_c").await;
2284
2285        // Share a worktree as client A.
2286        fs.insert_tree(
2287            "/a",
2288            json!({
2289                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2290            }),
2291        )
2292        .await;
2293
2294        let project_a = cx_a.update(|cx| {
2295            Project::local(
2296                client_a.clone(),
2297                client_a.user_store.clone(),
2298                lang_registry.clone(),
2299                fs.clone(),
2300                cx,
2301            )
2302        });
2303        let worktree_a = project_a
2304            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
2305            .await
2306            .unwrap();
2307        worktree_a
2308            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2309            .await;
2310
2311        client_a
2312            .user_store
2313            .condition(&cx_a, |user_store, _| {
2314                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2315            })
2316            .await;
2317        client_b
2318            .user_store
2319            .condition(&cx_b, |user_store, _| {
2320                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2321            })
2322            .await;
2323        client_c
2324            .user_store
2325            .condition(&cx_c, |user_store, _| {
2326                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2327            })
2328            .await;
2329
2330        let project_id = project_a
2331            .update(&mut cx_a, |project, _| project.next_remote_id())
2332            .await;
2333        project_a
2334            .update(&mut cx_a, |project, cx| project.share(cx))
2335            .await
2336            .unwrap();
2337
2338        let _project_b = Project::remote(
2339            project_id,
2340            client_b.clone(),
2341            client_b.user_store.clone(),
2342            lang_registry.clone(),
2343            fs.clone(),
2344            &mut cx_b.to_async(),
2345        )
2346        .await
2347        .unwrap();
2348
2349        client_a
2350            .user_store
2351            .condition(&cx_a, |user_store, _| {
2352                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2353            })
2354            .await;
2355        client_b
2356            .user_store
2357            .condition(&cx_b, |user_store, _| {
2358                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2359            })
2360            .await;
2361        client_c
2362            .user_store
2363            .condition(&cx_c, |user_store, _| {
2364                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2365            })
2366            .await;
2367
2368        project_a
2369            .condition(&cx_a, |project, _| {
2370                project.collaborators().contains_key(&client_b.peer_id)
2371            })
2372            .await;
2373
2374        cx_a.update(move |_| drop(project_a));
2375        client_a
2376            .user_store
2377            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2378            .await;
2379        client_b
2380            .user_store
2381            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2382            .await;
2383        client_c
2384            .user_store
2385            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2386            .await;
2387
2388        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2389            user_store
2390                .contacts()
2391                .iter()
2392                .map(|contact| {
2393                    let worktrees = contact
2394                        .projects
2395                        .iter()
2396                        .map(|p| {
2397                            (
2398                                p.worktree_root_names[0].as_str(),
2399                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
2400                            )
2401                        })
2402                        .collect();
2403                    (contact.user.github_login.as_str(), worktrees)
2404                })
2405                .collect()
2406        }
2407    }
2408
2409    struct TestServer {
            // Fixture that owns the `Server` under test together with the pieces the tests
            // need to reach: the peer, the application state, and the controls for killing
            // or refusing client connections. Built by `TestServer::start`.

            // The `Peer` that was handed to `Server::new` when the fixture was started.
2410        peer: Arc<Peer>,
            // Application state built from the test database; tests reach the database
            // through `app_state.db`.
2411        app_state: Arc<AppState>,
            // The server under test; each test client's connection is handled by it.
2412        server: Arc<Server>,
            // Receiving half of the channel whose sender was passed to `Server::new`.
2413        notifications: mpsc::Receiver<()>,
            // Per-user handle returned by `Connection::in_memory()`, stored each time a test
            // client establishes a connection. Presumably used to sever that user's
            // connection on demand — confirm against `disconnect_client`.
2414        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
            // While set, connection attempts from test clients fail with
            // "server is forbidding connections".
2415        forbid_connections: Arc<AtomicBool>,
            // The `TestDb` created in `start`. Never read (hence the underscore); presumably
            // held only to keep the database alive for the fixture's lifetime — confirm.
            // As the last field it is also the last one dropped.
2416        _test_db: TestDb,
2417    }
2418
2419    impl TestServer {
2420        async fn start() -> Self {
2421            let test_db = TestDb::new();
2422            let app_state = Self::build_app_state(&test_db).await;
2423            let peer = Peer::new();
2424            let notifications = mpsc::channel(128);
2425            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2426            Self {
2427                peer,
2428                app_state,
2429                server,
2430                notifications: notifications.1,
2431                connection_killers: Default::default(),
2432                forbid_connections: Default::default(),
2433                _test_db: test_db,
2434            }
2435        }
2436
2437        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2438            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2439            let client_name = name.to_string();
2440            let mut client = Client::new();
2441            let server = self.server.clone();
2442            let connection_killers = self.connection_killers.clone();
2443            let forbid_connections = self.forbid_connections.clone();
2444            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2445
2446            Arc::get_mut(&mut client)
2447                .unwrap()
2448                .override_authenticate(move |cx| {
2449                    cx.spawn(|_| async move {
2450                        let access_token = "the-token".to_string();
2451                        Ok(Credentials {
2452                            user_id: user_id.0 as u64,
2453                            access_token,
2454                        })
2455                    })
2456                })
2457                .override_establish_connection(move |credentials, cx| {
2458                    assert_eq!(credentials.user_id, user_id.0 as u64);
2459                    assert_eq!(credentials.access_token, "the-token");
2460
2461                    let server = server.clone();
2462                    let connection_killers = connection_killers.clone();
2463                    let forbid_connections = forbid_connections.clone();
2464                    let client_name = client_name.clone();
2465                    let connection_id_tx = connection_id_tx.clone();
2466                    cx.spawn(move |cx| async move {
2467                        if forbid_connections.load(SeqCst) {
2468                            Err(EstablishConnectionError::other(anyhow!(
2469                                "server is forbidding connections"
2470                            )))
2471                        } else {
2472                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2473                            connection_killers.lock().insert(user_id, kill_conn);
2474                            cx.background()
2475                                .spawn(server.handle_connection(
2476                                    server_conn,
2477                                    client_name,
2478                                    user_id,
2479                                    Some(connection_id_tx),
2480                                ))
2481                                .detach();
2482                            Ok(client_conn)
2483                        }
2484                    })
2485                });
2486
2487            let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2488            client
2489                .authenticate_and_connect(&cx.to_async())
2490                .await
2491                .unwrap();
2492
2493            let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2494            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2495            let mut authed_user =
2496                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2497            while authed_user.recv().await.unwrap().is_none() {}
2498
2499            TestClient {
2500                client,
2501                peer_id,
2502                user_store,
2503            }
2504        }
2505
2506        fn disconnect_client(&self, user_id: UserId) {
2507            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2508                let _ = kill_conn.try_send(Some(()));
2509            }
2510        }
2511
2512        fn forbid_connections(&self) {
2513            self.forbid_connections.store(true, SeqCst);
2514        }
2515
2516        fn allow_connections(&self) {
2517            self.forbid_connections.store(false, SeqCst);
2518        }
2519
2520        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2521            let mut config = Config::default();
2522            config.session_secret = "a".repeat(32);
2523            config.database_url = test_db.url.clone();
2524            let github_client = github::AppClient::test();
2525            Arc::new(AppState {
2526                db: test_db.db().clone(),
2527                handlebars: Default::default(),
2528                auth_client: auth::build_client("", ""),
2529                repo_client: github::RepoClient::test(&github_client),
2530                github_client,
2531                config,
2532            })
2533        }
2534
2535        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2536            self.server.store.read()
2537        }
2538
2539        async fn condition<F>(&mut self, mut predicate: F)
2540        where
2541            F: FnMut(&Store) -> bool,
2542        {
2543            async_std::future::timeout(Duration::from_millis(500), async {
2544                while !(predicate)(&*self.server.store.read()) {
2545                    self.notifications.recv().await;
2546                }
2547            })
2548            .await
2549            .expect("condition timed out");
2550        }
2551    }
2552
2553    impl Drop for TestServer {
2554        fn drop(&mut self) {
2555            task::block_on(self.peer.reset());
2556        }
2557    }
2558
    /// A connected client together with the handles tests need to inspect it,
    /// as assembled by `TestServer::create_client`.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built in `create_client` from the connection id received
        /// over the channel handed to `Server::handle_connection`.
        pub peer_id: PeerId,
        /// User store model created for this client in `create_client`.
        pub user_store: ModelHandle<UserStore>,
    }
2564
2565    impl Deref for TestClient {
2566        type Target = Arc<Client>;
2567
2568        fn deref(&self) -> &Self::Target {
2569            &self.client
2570        }
2571    }
2572
2573    impl TestClient {
2574        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2575            UserId::from_proto(
2576                self.user_store
2577                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2578            )
2579        }
2580    }
2581
2582    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2583        channel
2584            .messages()
2585            .cursor::<()>()
2586            .map(|m| {
2587                (
2588                    m.sender.github_login.clone(),
2589                    m.body.clone(),
2590                    m.is_pending(),
2591                )
2592            })
2593            .collect()
2594    }
2595
    /// Field-less view type; its `gpui::View::render` implementation returns a
    /// boxed `gpui::elements::Empty`.
    struct EmptyView;
2597
    impl gpui::Entity for EmptyView {
        // The unit type is used as the event type.
        // NOTE(review): presumably this means the view emits no meaningful events; confirm.
        type Event = ();
    }
2601
2602    impl gpui::View for EmptyView {
2603        fn ui_name() -> &'static str {
2604            "empty view"
2605        }
2606
2607        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2608            gpui::Element::boxed(gpui::elements::Empty)
2609        }
2610    }
2611}