rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, mem, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
/// RPC server: owns the peer, the shared store and the table of message
/// handlers used by every connection.
pub struct Server {
    /// Peer used to add/disconnect connections and to send, forward and
    /// respond to messages.
    peer: Arc<Peer>,
    /// Store behind a read-write lock.
    /// NOTE(review): presumably what `state()` / `state_mut()` lock — their
    /// definitions are outside this view; confirm.
    store: RwLock<Store>,
    /// Application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent `()` after each message for which a
    /// handler ran (see `handle_connection`).
    notifications: Option<mpsc::Sender<()>>,
}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updating)
  76            .add_handler(Server::disk_based_diagnostics_updated)
  77            .add_handler(Server::open_buffer)
  78            .add_handler(Server::close_buffer)
  79            .add_handler(Server::update_buffer)
  80            .add_handler(Server::buffer_saved)
  81            .add_handler(Server::save_buffer)
  82            .add_handler(Server::get_channels)
  83            .add_handler(Server::get_users)
  84            .add_handler(Server::join_channel)
  85            .add_handler(Server::leave_channel)
  86            .add_handler(Server::send_channel_message)
  87            .add_handler(Server::get_channel_messages);
  88
  89        Arc::new(server)
  90    }
  91
  92    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  93    where
  94        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  95        Fut: 'static + Send + Future<Output = tide::Result<()>>,
  96        M: EnvelopedMessage,
  97    {
  98        let prev_handler = self.handlers.insert(
  99            TypeId::of::<M>(),
 100            Box::new(move |server, envelope| {
 101                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 102                (handler)(server, *envelope).boxed()
 103            }),
 104        );
 105        if prev_handler.is_some() {
 106            panic!("registered a handler for the same message twice");
 107        }
 108        self
 109    }
 110
    /// Returns a future that drives a single client connection from start to
    /// finish.
    ///
    /// The future registers `connection` with the peer, optionally reports the
    /// assigned connection id through `send_connection_id`, records the
    /// connection for `user_id` in the store, and then dispatches incoming
    /// messages to the registered handlers until either the incoming stream
    /// ends or the connection's I/O future completes. Finally it signs the
    /// connection out. Failures along the way are logged rather than returned;
    /// `addr` is only used in those log messages.
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
    ) -> impl Future<Output = ()> {
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            // Best effort: a failed send of the connection id is ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            // The I/O future is fused and pinned once, outside the loop, so the
            // same future is polled on every iteration.
            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.recv().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in the order written, so a
                // ready message is taken before the I/O future is checked.
                futures::select_biased! {
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            log::info!("RPC message received: {}", message.payload_type_name());
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                if let Err(err) = (handler)(this.clone(), message).await {
                                    log::error!("error handling message: {:?}", err);
                                } else {
                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
                                }

                                // Signalled whether or not the handler succeeded;
                                // a failed send is ignored.
                                if let Some(mut notifications) = this.notifications.clone() {
                                    let _ = notifications.send(()).await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", message.payload_type_name());
                            }
                        } else {
                            // `None` from the receiver: no more incoming messages.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                    handle_io = handle_io => {
                        // The I/O future finished; stop serving this connection
                        // whether it ended with an error or not.
                        if let Err(err) = handle_io {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
 174
 175    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 176        self.peer.disconnect(connection_id).await;
 177        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 178
 179        for (project_id, project) in removed_connection.hosted_projects {
 180            if let Some(share) = project.share {
 181                broadcast(
 182                    connection_id,
 183                    share.guests.keys().copied().collect(),
 184                    |conn_id| {
 185                        self.peer
 186                            .send(conn_id, proto::UnshareProject { project_id })
 187                    },
 188                )
 189                .await?;
 190            }
 191        }
 192
 193        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 194            broadcast(connection_id, peer_ids, |conn_id| {
 195                self.peer.send(
 196                    conn_id,
 197                    proto::RemoveProjectCollaborator {
 198                        project_id,
 199                        peer_id: connection_id.0,
 200                    },
 201                )
 202            })
 203            .await?;
 204        }
 205
 206        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 207            .await?;
 208
 209        Ok(())
 210    }
 211
 212    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 213        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 214        Ok(())
 215    }
 216
 217    async fn register_project(
 218        mut self: Arc<Server>,
 219        request: TypedEnvelope<proto::RegisterProject>,
 220    ) -> tide::Result<()> {
 221        let project_id = {
 222            let mut state = self.state_mut();
 223            let user_id = state.user_id_for_connection(request.sender_id)?;
 224            state.register_project(request.sender_id, user_id)
 225        };
 226        self.peer
 227            .respond(
 228                request.receipt(),
 229                proto::RegisterProjectResponse { project_id },
 230            )
 231            .await?;
 232        Ok(())
 233    }
 234
 235    async fn unregister_project(
 236        mut self: Arc<Server>,
 237        request: TypedEnvelope<proto::UnregisterProject>,
 238    ) -> tide::Result<()> {
 239        let project = self
 240            .state_mut()
 241            .unregister_project(request.payload.project_id, request.sender_id)
 242            .ok_or_else(|| anyhow!("no such project"))?;
 243        self.update_contacts_for_users(project.authorized_user_ids().iter())
 244            .await?;
 245        Ok(())
 246    }
 247
 248    async fn share_project(
 249        mut self: Arc<Server>,
 250        request: TypedEnvelope<proto::ShareProject>,
 251    ) -> tide::Result<()> {
 252        self.state_mut()
 253            .share_project(request.payload.project_id, request.sender_id);
 254        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 255        Ok(())
 256    }
 257
 258    async fn unshare_project(
 259        mut self: Arc<Server>,
 260        request: TypedEnvelope<proto::UnshareProject>,
 261    ) -> tide::Result<()> {
 262        let project_id = request.payload.project_id;
 263        let project = self
 264            .state_mut()
 265            .unshare_project(project_id, request.sender_id)?;
 266
 267        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 268            self.peer
 269                .send(conn_id, proto::UnshareProject { project_id })
 270        })
 271        .await?;
 272        self.update_contacts_for_users(&project.authorized_user_ids)
 273            .await?;
 274
 275        Ok(())
 276    }
 277
 278    async fn join_project(
 279        mut self: Arc<Server>,
 280        request: TypedEnvelope<proto::JoinProject>,
 281    ) -> tide::Result<()> {
 282        let project_id = request.payload.project_id;
 283
 284        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 285        let response_data = self
 286            .state_mut()
 287            .join_project(request.sender_id, user_id, project_id)
 288            .and_then(|joined| {
 289                let share = joined.project.share()?;
 290                let peer_count = share.guests.len();
 291                let mut collaborators = Vec::with_capacity(peer_count);
 292                collaborators.push(proto::Collaborator {
 293                    peer_id: joined.project.host_connection_id.0,
 294                    replica_id: 0,
 295                    user_id: joined.project.host_user_id.to_proto(),
 296                });
 297                let worktrees = joined
 298                    .project
 299                    .worktrees
 300                    .iter()
 301                    .filter_map(|(id, worktree)| {
 302                        worktree.share.as_ref().map(|share| proto::Worktree {
 303                            id: *id,
 304                            root_name: worktree.root_name.clone(),
 305                            entries: share.entries.values().cloned().collect(),
 306                            diagnostic_summaries: share
 307                                .diagnostic_summaries
 308                                .values()
 309                                .cloned()
 310                                .collect(),
 311                        })
 312                    })
 313                    .collect();
 314                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 315                    if *peer_conn_id != request.sender_id {
 316                        collaborators.push(proto::Collaborator {
 317                            peer_id: peer_conn_id.0,
 318                            replica_id: *peer_replica_id as u32,
 319                            user_id: peer_user_id.to_proto(),
 320                        });
 321                    }
 322                }
 323                let response = proto::JoinProjectResponse {
 324                    worktrees,
 325                    replica_id: joined.replica_id as u32,
 326                    collaborators,
 327                };
 328                let connection_ids = joined.project.connection_ids();
 329                let contact_user_ids = joined.project.authorized_user_ids();
 330                Ok((response, connection_ids, contact_user_ids))
 331            });
 332
 333        match response_data {
 334            Ok((response, connection_ids, contact_user_ids)) => {
 335                broadcast(request.sender_id, connection_ids, |conn_id| {
 336                    self.peer.send(
 337                        conn_id,
 338                        proto::AddProjectCollaborator {
 339                            project_id: project_id,
 340                            collaborator: Some(proto::Collaborator {
 341                                peer_id: request.sender_id.0,
 342                                replica_id: response.replica_id,
 343                                user_id: user_id.to_proto(),
 344                            }),
 345                        },
 346                    )
 347                })
 348                .await?;
 349                self.peer.respond(request.receipt(), response).await?;
 350                self.update_contacts_for_users(&contact_user_ids).await?;
 351            }
 352            Err(error) => {
 353                self.peer
 354                    .respond_with_error(
 355                        request.receipt(),
 356                        proto::Error {
 357                            message: error.to_string(),
 358                        },
 359                    )
 360                    .await?;
 361            }
 362        }
 363
 364        Ok(())
 365    }
 366
 367    async fn leave_project(
 368        mut self: Arc<Server>,
 369        request: TypedEnvelope<proto::LeaveProject>,
 370    ) -> tide::Result<()> {
 371        let sender_id = request.sender_id;
 372        let project_id = request.payload.project_id;
 373        let worktree = self.state_mut().leave_project(sender_id, project_id);
 374        if let Some(worktree) = worktree {
 375            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 376                self.peer.send(
 377                    conn_id,
 378                    proto::RemoveProjectCollaborator {
 379                        project_id,
 380                        peer_id: sender_id.0,
 381                    },
 382                )
 383            })
 384            .await?;
 385            self.update_contacts_for_users(&worktree.authorized_user_ids)
 386                .await?;
 387        }
 388        Ok(())
 389    }
 390
 391    async fn register_worktree(
 392        mut self: Arc<Server>,
 393        request: TypedEnvelope<proto::RegisterWorktree>,
 394    ) -> tide::Result<()> {
 395        let receipt = request.receipt();
 396        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 397
 398        let mut contact_user_ids = HashSet::default();
 399        contact_user_ids.insert(host_user_id);
 400        for github_login in request.payload.authorized_logins {
 401            match self.app_state.db.create_user(&github_login, false).await {
 402                Ok(contact_user_id) => {
 403                    contact_user_ids.insert(contact_user_id);
 404                }
 405                Err(err) => {
 406                    let message = err.to_string();
 407                    self.peer
 408                        .respond_with_error(receipt, proto::Error { message })
 409                        .await?;
 410                    return Ok(());
 411                }
 412            }
 413        }
 414
 415        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 416        let ok = self.state_mut().register_worktree(
 417            request.payload.project_id,
 418            request.payload.worktree_id,
 419            Worktree {
 420                authorized_user_ids: contact_user_ids.clone(),
 421                root_name: request.payload.root_name,
 422                share: None,
 423            },
 424        );
 425
 426        if ok {
 427            self.peer.respond(receipt, proto::Ack {}).await?;
 428            self.update_contacts_for_users(&contact_user_ids).await?;
 429        } else {
 430            self.peer
 431                .respond_with_error(
 432                    receipt,
 433                    proto::Error {
 434                        message: NO_SUCH_PROJECT.to_string(),
 435                    },
 436                )
 437                .await?;
 438        }
 439
 440        Ok(())
 441    }
 442
 443    async fn unregister_worktree(
 444        mut self: Arc<Server>,
 445        request: TypedEnvelope<proto::UnregisterWorktree>,
 446    ) -> tide::Result<()> {
 447        let project_id = request.payload.project_id;
 448        let worktree_id = request.payload.worktree_id;
 449        let (worktree, guest_connection_ids) =
 450            self.state_mut()
 451                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 452
 453        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 454            self.peer.send(
 455                conn_id,
 456                proto::UnregisterWorktree {
 457                    project_id,
 458                    worktree_id,
 459                },
 460            )
 461        })
 462        .await?;
 463        self.update_contacts_for_users(&worktree.authorized_user_ids)
 464            .await?;
 465        Ok(())
 466    }
 467
 468    async fn share_worktree(
 469        mut self: Arc<Server>,
 470        mut request: TypedEnvelope<proto::ShareWorktree>,
 471    ) -> tide::Result<()> {
 472        let worktree = request
 473            .payload
 474            .worktree
 475            .as_mut()
 476            .ok_or_else(|| anyhow!("missing worktree"))?;
 477        let entries = mem::take(&mut worktree.entries)
 478            .into_iter()
 479            .map(|entry| (entry.id, entry))
 480            .collect();
 481
 482        let diagnostic_summaries = mem::take(&mut worktree.diagnostic_summaries)
 483            .into_iter()
 484            .map(|summary| (PathBuf::from(summary.path.clone()), summary))
 485            .collect();
 486
 487        let contact_user_ids = self.state_mut().share_worktree(
 488            request.payload.project_id,
 489            worktree.id,
 490            request.sender_id,
 491            entries,
 492            diagnostic_summaries,
 493        );
 494        if let Some(contact_user_ids) = contact_user_ids {
 495            self.peer.respond(request.receipt(), proto::Ack {}).await?;
 496            self.update_contacts_for_users(&contact_user_ids).await?;
 497        } else {
 498            self.peer
 499                .respond_with_error(
 500                    request.receipt(),
 501                    proto::Error {
 502                        message: "no such worktree".to_string(),
 503                    },
 504                )
 505                .await?;
 506        }
 507        Ok(())
 508    }
 509
 510    async fn update_worktree(
 511        mut self: Arc<Server>,
 512        request: TypedEnvelope<proto::UpdateWorktree>,
 513    ) -> tide::Result<()> {
 514        let connection_ids = self
 515            .state_mut()
 516            .update_worktree(
 517                request.sender_id,
 518                request.payload.project_id,
 519                request.payload.worktree_id,
 520                &request.payload.removed_entries,
 521                &request.payload.updated_entries,
 522            )
 523            .ok_or_else(|| anyhow!("no such worktree"))?;
 524
 525        broadcast(request.sender_id, connection_ids, |connection_id| {
 526            self.peer
 527                .forward_send(request.sender_id, connection_id, request.payload.clone())
 528        })
 529        .await?;
 530
 531        Ok(())
 532    }
 533
 534    async fn update_diagnostic_summary(
 535        mut self: Arc<Server>,
 536        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 537    ) -> tide::Result<()> {
 538        let receiver_ids = request
 539            .payload
 540            .summary
 541            .clone()
 542            .and_then(|summary| {
 543                self.state_mut().update_diagnostic_summary(
 544                    request.payload.project_id,
 545                    request.payload.worktree_id,
 546                    request.sender_id,
 547                    summary,
 548                )
 549            })
 550            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 551
 552        broadcast(request.sender_id, receiver_ids, |connection_id| {
 553            self.peer
 554                .forward_send(request.sender_id, connection_id, request.payload.clone())
 555        })
 556        .await?;
 557        Ok(())
 558    }
 559
 560    async fn disk_based_diagnostics_updating(
 561        self: Arc<Server>,
 562        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 563    ) -> tide::Result<()> {
 564        let receiver_ids = self
 565            .state()
 566            .project_connection_ids(request.payload.project_id, request.sender_id)
 567            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 568        broadcast(request.sender_id, receiver_ids, |connection_id| {
 569            self.peer
 570                .forward_send(request.sender_id, connection_id, request.payload.clone())
 571        })
 572        .await?;
 573        Ok(())
 574    }
 575
 576    async fn disk_based_diagnostics_updated(
 577        self: Arc<Server>,
 578        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 579    ) -> tide::Result<()> {
 580        let receiver_ids = self
 581            .state()
 582            .project_connection_ids(request.payload.project_id, request.sender_id)
 583            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 584        broadcast(request.sender_id, receiver_ids, |connection_id| {
 585            self.peer
 586                .forward_send(request.sender_id, connection_id, request.payload.clone())
 587        })
 588        .await?;
 589        Ok(())
 590    }
 591
 592    async fn open_buffer(
 593        self: Arc<Server>,
 594        request: TypedEnvelope<proto::OpenBuffer>,
 595    ) -> tide::Result<()> {
 596        let receipt = request.receipt();
 597        let host_connection_id = self
 598            .state()
 599            .read_project(request.payload.project_id, request.sender_id)
 600            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 601            .host_connection_id;
 602        let response = self
 603            .peer
 604            .forward_request(request.sender_id, host_connection_id, request.payload)
 605            .await?;
 606        self.peer.respond(receipt, response).await?;
 607        Ok(())
 608    }
 609
 610    async fn close_buffer(
 611        self: Arc<Server>,
 612        request: TypedEnvelope<proto::CloseBuffer>,
 613    ) -> tide::Result<()> {
 614        let host_connection_id = self
 615            .state()
 616            .read_project(request.payload.project_id, request.sender_id)
 617            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 618            .host_connection_id;
 619        self.peer
 620            .forward_send(request.sender_id, host_connection_id, request.payload)
 621            .await?;
 622        Ok(())
 623    }
 624
 625    async fn save_buffer(
 626        self: Arc<Server>,
 627        request: TypedEnvelope<proto::SaveBuffer>,
 628    ) -> tide::Result<()> {
 629        let host;
 630        let guests;
 631        {
 632            let state = self.state();
 633            let project = state
 634                .read_project(request.payload.project_id, request.sender_id)
 635                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 636            host = project.host_connection_id;
 637            guests = project.guest_connection_ids()
 638        }
 639
 640        let sender = request.sender_id;
 641        let receipt = request.receipt();
 642        let response = self
 643            .peer
 644            .forward_request(sender, host, request.payload.clone())
 645            .await?;
 646
 647        broadcast(host, guests, |conn_id| {
 648            let response = response.clone();
 649            let peer = &self.peer;
 650            async move {
 651                if conn_id == sender {
 652                    peer.respond(receipt, response).await
 653                } else {
 654                    peer.forward_send(host, conn_id, response).await
 655                }
 656            }
 657        })
 658        .await?;
 659
 660        Ok(())
 661    }
 662
 663    async fn update_buffer(
 664        self: Arc<Server>,
 665        request: TypedEnvelope<proto::UpdateBuffer>,
 666    ) -> tide::Result<()> {
 667        let receiver_ids = self
 668            .state()
 669            .project_connection_ids(request.payload.project_id, request.sender_id)
 670            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 671        broadcast(request.sender_id, receiver_ids, |connection_id| {
 672            self.peer
 673                .forward_send(request.sender_id, connection_id, request.payload.clone())
 674        })
 675        .await?;
 676        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 677        Ok(())
 678    }
 679
 680    async fn buffer_saved(
 681        self: Arc<Server>,
 682        request: TypedEnvelope<proto::BufferSaved>,
 683    ) -> tide::Result<()> {
 684        let receiver_ids = self
 685            .state()
 686            .project_connection_ids(request.payload.project_id, request.sender_id)
 687            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 688        broadcast(request.sender_id, receiver_ids, |connection_id| {
 689            self.peer
 690                .forward_send(request.sender_id, connection_id, request.payload.clone())
 691        })
 692        .await?;
 693        Ok(())
 694    }
 695
 696    async fn get_channels(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::GetChannels>,
 699    ) -> tide::Result<()> {
 700        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 701        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 702        self.peer
 703            .respond(
 704                request.receipt(),
 705                proto::GetChannelsResponse {
 706                    channels: channels
 707                        .into_iter()
 708                        .map(|chan| proto::Channel {
 709                            id: chan.id.to_proto(),
 710                            name: chan.name,
 711                        })
 712                        .collect(),
 713                },
 714            )
 715            .await?;
 716        Ok(())
 717    }
 718
 719    async fn get_users(
 720        self: Arc<Server>,
 721        request: TypedEnvelope<proto::GetUsers>,
 722    ) -> tide::Result<()> {
 723        let receipt = request.receipt();
 724        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 725        let users = self
 726            .app_state
 727            .db
 728            .get_users_by_ids(user_ids)
 729            .await?
 730            .into_iter()
 731            .map(|user| proto::User {
 732                id: user.id.to_proto(),
 733                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 734                github_login: user.github_login,
 735            })
 736            .collect();
 737        self.peer
 738            .respond(receipt, proto::GetUsersResponse { users })
 739            .await?;
 740        Ok(())
 741    }
 742
 743    async fn update_contacts_for_users<'a>(
 744        self: &Arc<Server>,
 745        user_ids: impl IntoIterator<Item = &'a UserId>,
 746    ) -> tide::Result<()> {
 747        let mut send_futures = Vec::new();
 748
 749        {
 750            let state = self.state();
 751            for user_id in user_ids {
 752                let contacts = state.contacts_for_user(*user_id);
 753                for connection_id in state.connection_ids_for_user(*user_id) {
 754                    send_futures.push(self.peer.send(
 755                        connection_id,
 756                        proto::UpdateContacts {
 757                            contacts: contacts.clone(),
 758                        },
 759                    ));
 760                }
 761            }
 762        }
 763        futures::future::try_join_all(send_futures).await?;
 764
 765        Ok(())
 766    }
 767
 768    async fn join_channel(
 769        mut self: Arc<Self>,
 770        request: TypedEnvelope<proto::JoinChannel>,
 771    ) -> tide::Result<()> {
 772        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 773        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 774        if !self
 775            .app_state
 776            .db
 777            .can_user_access_channel(user_id, channel_id)
 778            .await?
 779        {
 780            Err(anyhow!("access denied"))?;
 781        }
 782
 783        self.state_mut().join_channel(request.sender_id, channel_id);
 784        let messages = self
 785            .app_state
 786            .db
 787            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 788            .await?
 789            .into_iter()
 790            .map(|msg| proto::ChannelMessage {
 791                id: msg.id.to_proto(),
 792                body: msg.body,
 793                timestamp: msg.sent_at.unix_timestamp() as u64,
 794                sender_id: msg.sender_id.to_proto(),
 795                nonce: Some(msg.nonce.as_u128().into()),
 796            })
 797            .collect::<Vec<_>>();
 798        self.peer
 799            .respond(
 800                request.receipt(),
 801                proto::JoinChannelResponse {
 802                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 803                    messages,
 804                },
 805            )
 806            .await?;
 807        Ok(())
 808    }
 809
 810    async fn leave_channel(
 811        mut self: Arc<Self>,
 812        request: TypedEnvelope<proto::LeaveChannel>,
 813    ) -> tide::Result<()> {
 814        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 815        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 816        if !self
 817            .app_state
 818            .db
 819            .can_user_access_channel(user_id, channel_id)
 820            .await?
 821        {
 822            Err(anyhow!("access denied"))?;
 823        }
 824
 825        self.state_mut()
 826            .leave_channel(request.sender_id, channel_id);
 827
 828        Ok(())
 829    }
 830
 831    async fn send_channel_message(
 832        self: Arc<Self>,
 833        request: TypedEnvelope<proto::SendChannelMessage>,
 834    ) -> tide::Result<()> {
 835        let receipt = request.receipt();
 836        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 837        let user_id;
 838        let connection_ids;
 839        {
 840            let state = self.state();
 841            user_id = state.user_id_for_connection(request.sender_id)?;
 842            if let Some(ids) = state.channel_connection_ids(channel_id) {
 843                connection_ids = ids;
 844            } else {
 845                return Ok(());
 846            }
 847        }
 848
 849        // Validate the message body.
 850        let body = request.payload.body.trim().to_string();
 851        if body.len() > MAX_MESSAGE_LEN {
 852            self.peer
 853                .respond_with_error(
 854                    receipt,
 855                    proto::Error {
 856                        message: "message is too long".to_string(),
 857                    },
 858                )
 859                .await?;
 860            return Ok(());
 861        }
 862        if body.is_empty() {
 863            self.peer
 864                .respond_with_error(
 865                    receipt,
 866                    proto::Error {
 867                        message: "message can't be blank".to_string(),
 868                    },
 869                )
 870                .await?;
 871            return Ok(());
 872        }
 873
 874        let timestamp = OffsetDateTime::now_utc();
 875        let nonce = if let Some(nonce) = request.payload.nonce {
 876            nonce
 877        } else {
 878            self.peer
 879                .respond_with_error(
 880                    receipt,
 881                    proto::Error {
 882                        message: "nonce can't be blank".to_string(),
 883                    },
 884                )
 885                .await?;
 886            return Ok(());
 887        };
 888
 889        let message_id = self
 890            .app_state
 891            .db
 892            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 893            .await?
 894            .to_proto();
 895        let message = proto::ChannelMessage {
 896            sender_id: user_id.to_proto(),
 897            id: message_id,
 898            body,
 899            timestamp: timestamp.unix_timestamp() as u64,
 900            nonce: Some(nonce),
 901        };
 902        broadcast(request.sender_id, connection_ids, |conn_id| {
 903            self.peer.send(
 904                conn_id,
 905                proto::ChannelMessageSent {
 906                    channel_id: channel_id.to_proto(),
 907                    message: Some(message.clone()),
 908                },
 909            )
 910        })
 911        .await?;
 912        self.peer
 913            .respond(
 914                receipt,
 915                proto::SendChannelMessageResponse {
 916                    message: Some(message),
 917                },
 918            )
 919            .await?;
 920        Ok(())
 921    }
 922
 923    async fn get_channel_messages(
 924        self: Arc<Self>,
 925        request: TypedEnvelope<proto::GetChannelMessages>,
 926    ) -> tide::Result<()> {
 927        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 928        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 929        if !self
 930            .app_state
 931            .db
 932            .can_user_access_channel(user_id, channel_id)
 933            .await?
 934        {
 935            Err(anyhow!("access denied"))?;
 936        }
 937
 938        let messages = self
 939            .app_state
 940            .db
 941            .get_channel_messages(
 942                channel_id,
 943                MESSAGE_COUNT_PER_PAGE,
 944                Some(MessageId::from_proto(request.payload.before_message_id)),
 945            )
 946            .await?
 947            .into_iter()
 948            .map(|msg| proto::ChannelMessage {
 949                id: msg.id.to_proto(),
 950                body: msg.body,
 951                timestamp: msg.sent_at.unix_timestamp() as u64,
 952                sender_id: msg.sender_id.to_proto(),
 953                nonce: Some(msg.nonce.as_u128().into()),
 954            })
 955            .collect::<Vec<_>>();
 956        self.peer
 957            .respond(
 958                request.receipt(),
 959                proto::GetChannelMessagesResponse {
 960                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 961                    messages,
 962                },
 963            )
 964            .await?;
 965        Ok(())
 966    }
 967
 968    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 969        self.store.read()
 970    }
 971
 972    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 973        self.store.write()
 974    }
 975}
 976
 977pub async fn broadcast<F, T>(
 978    sender_id: ConnectionId,
 979    receiver_ids: Vec<ConnectionId>,
 980    mut f: F,
 981) -> anyhow::Result<()>
 982where
 983    F: FnMut(ConnectionId) -> T,
 984    T: Future<Output = anyhow::Result<()>>,
 985{
 986    let futures = receiver_ids
 987        .into_iter()
 988        .filter(|id| *id != sender_id)
 989        .map(|id| f(id));
 990    futures::future::try_join_all(futures).await?;
 991    Ok(())
 992}
 993
/// Registers the `GET /rpc` route, which upgrades an HTTP request to a
/// WebSocket connection and hands it to a `Server`.
///
/// A single `Server` is created up front (with no notification sender) and
/// cloned into every request. For each request the handler:
/// 1. Responds with `UpgradeRequired` unless the `Connection` header contains
///    "upgrade", the `Upgrade` header contains "websocket", and
///    `X-Zed-Protocol-Version` parses to `rpc::PROTOCOL_VERSION`.
/// 2. Fails if the `Sec-Websocket-Key` header is missing.
/// 3. Authenticates the request via `process_auth_header`.
/// 4. Builds the `SwitchingProtocols` response and spawns a task that waits
///    for the upgraded stream and passes it to `Server::handle_connection`.
pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
    let server = Server::new(app.state().clone(), rpc.clone(), None);
    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
        let server = server.clone();
        async move {
            // Fixed GUID that the WebSocket opening handshake (RFC 6455)
            // appends to the client's key before hashing.
            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
            // A missing or unparsable version header yields `None`, which never
            // equals `Some(rpc::PROTOCOL_VERSION)` below.
            let client_protocol_version: Option<u32> = request
                .header("X-Zed-Protocol-Version")
                .and_then(|v| v.as_str().parse().ok());

            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
                return Ok(Response::new(StatusCode::UpgradeRequired));
            }

            let header = match request.header("Sec-Websocket-Key") {
                Some(h) => h.as_str(),
                None => return Err(anyhow!("expected sec-websocket-key"))?,
            };

            let user_id = process_auth_header(&request).await?;

            let mut response = Response::new(StatusCode::SwitchingProtocols);
            response.insert_header(UPGRADE, "websocket");
            response.insert_header(CONNECTION, "Upgrade");
            // Sec-Websocket-Accept = base64(SHA-1(client key + GUID)).
            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
            response.insert_header("Sec-Websocket-Version", "13");

            // Obtain the upgrade receiver from the response before it is
            // returned; the spawned task below awaits it for the raw stream.
            let http_res: &mut tide::http::Response = response.as_mut();
            let upgrade_receiver = http_res.recv_upgrade().await;
            let addr = request.remote().unwrap_or("unknown").to_string();
            task::spawn(async move {
                // If the receiver yields no stream, the task ends without
                // handling a connection.
                if let Some(stream) = upgrade_receiver.await {
                    server
                        .handle_connection(
                            Connection::new(
                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
                            ),
                            addr,
                            user_id,
                            None,
                        )
                        .await;
                }
            });

            Ok(response)
        }
    });
}
1048
1049fn header_contains_ignore_case<T>(
1050    request: &tide::Request<T>,
1051    header_name: HeaderName,
1052    value: &str,
1053) -> bool {
1054    request
1055        .header(header_name)
1056        .map(|h| {
1057            h.as_str()
1058                .split(',')
1059                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1060        })
1061        .unwrap_or(false)
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066    use super::*;
1067    use crate::{
1068        auth,
1069        db::{tests::TestDb, UserId},
1070        github, AppState, Config,
1071    };
1072    use ::rpc::Peer;
1073    use async_std::task;
1074    use gpui::{executor, ModelHandle, TestAppContext};
1075    use parking_lot::Mutex;
1076    use postage::{mpsc, watch};
1077    use rpc::PeerId;
1078    use serde_json::json;
1079    use sqlx::types::time::OffsetDateTime;
1080    use std::{
1081        ops::Deref,
1082        path::Path,
1083        rc::Rc,
1084        sync::{
1085            atomic::{AtomicBool, Ordering::SeqCst},
1086            Arc,
1087        },
1088        time::Duration,
1089    };
1090    use zed::{
1091        client::{
1092            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1093            EstablishConnectionError, UserStore,
1094        },
1095        editor::{Editor, EditorSettings, Input, MultiBuffer},
1096        fs::{FakeFs, Fs as _},
1097        language::{
1098            tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1099            LanguageRegistry, LanguageServerConfig, Point,
1100        },
1101        lsp,
1102        project::{DiagnosticSummary, Project, ProjectPath},
1103    };
1104
    // End-to-end check of sharing a project between a host (client A) and a
    // guest (client B): the guest joins, both sides see each other as
    // collaborators, an edit made by the guest reaches the host's buffer, the
    // host's buffer closes once dropped, and dropping the guest's project
    // removes the guest from the host's collaborator list.
    #[gpui::test]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        // The remote id is awaited before sharing; the guest uses it to join.
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // The guest must already see the host as a collaborator; capture the
        // guest's replica id to check it from the host's side.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until the host sees the guest with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = worktree_b
            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // The guest opening the file is expected to leave it open on the host's worktree too.
        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
        });
        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        worktree_a
            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1238
    // Checks that when the host (client A) unshares a project, the guest's
    // (client B's) copy of the project eventually reports itself as read-only.
    #[gpui::test]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as the guest before the host unshares; the returned
        // buffer handle itself is not kept.
        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
        worktree_b
            .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
            .await
            .unwrap();

        // Unshare as the host and wait for the guest's project to become read-only.
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
    }
1310
    // Exercises a host (client A) and two guests (clients B and C) on one
    // shared worktree: concurrent edits converge to the same text on all three
    // replicas, a save issued by a guest writes the host's latest text to the
    // file system and clears the dirty flag everywhere, and file-system
    // changes made on the host show up in both guests' worktrees.
    #[gpui::test]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();

        // Open and edit a buffer as both guests B and C.
        let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
        let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
        let buffer_b = worktree_b
            .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        let buffer_c = worktree_c
            .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        // Both guests insert at offset 0 without waiting for each other.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();

        // Wait for both guest edits to arrive on the host before appending.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The saved file is expected to include the host's edit made after the
        // guest started the save.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        // Guest C did not take part in the save, so its clean state is awaited
        // rather than asserted immediately.
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on the guests.
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        worktree_b
            .condition(&cx_b, |tree, _| tree.file_count() == 4)
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| tree.file_count() == 4)
            .await;
        worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1", "file3", "file4"]
            )
        });
        worktree_c.read_with(&cx_c, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1", "file3", "file4"]
            )
        });
    }
1464
    // Checks that a guest (client B) who edits and saves a shared buffer does
    // not end up with a spurious conflict: after the save the buffer is clean
    // and conflict-free, and a further edit makes it dirty but still
    // conflict-free.
    #[gpui::test]
    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        // NOTE(review): "user_c" is listed as a collaborator but no such client
        // is created in this test.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // Open a buffer as client B
        let buffer_b = worktree_b
            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
            .await
            .unwrap();
        // Remember the file's mtime so the post-save mtime change can be awaited below.
        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());

        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        buffer_b
            .update(&mut cx_b, |buf, cx| buf.save(cx))
            .unwrap()
            .await
            .unwrap();
        // Wait until the guest's view of the file reflects a new mtime.
        worktree_b
            .condition(&cx_b, |_, cx| {
                buffer_b.read(cx).file().unwrap().mtime() != mtime
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // A new edit after the save must make the buffer dirty without flagging a conflict.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1557
    // Checks that an edit made by the host (client A) while the guest
    // (client B) is still in the middle of opening the same buffer is not
    // lost: once the open completes, the guest's buffer converges to the
    // host's text.
    #[gpui::test]
    async fn test_editing_while_guest_opens_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // Open a buffer as client A
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B
        // The open is spawned on the background executor and not awaited yet,
        // then this task yields once before the host edits.
        let buffer_b = cx_b
            .background()
            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
        task::yield_now().await;

        // Edit the buffer as client A while client B is still opening it.
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));

        // Snapshot the host's text, finish the guest's open, and wait for the
        // guest's buffer to match.
        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1637
1638    #[gpui::test]
        // Verifies that a guest who starts opening a buffer in a shared project and then
        // drops the project (and the pending open task) before the open completes is still
        // observed by the host as having left: client A's collaborator count goes from 1
        // back to 0.
1639    async fn test_leaving_worktree_while_opening_buffer(
1640        mut cx_a: TestAppContext,
1641        mut cx_b: TestAppContext,
1642    ) {
            // NOTE(review): presumably makes the test fail instead of blocking when the
            // foreground executor has nothing left to run — confirm against gpui.
1643        cx_a.foreground().forbid_parking();
1644        let lang_registry = Arc::new(LanguageRegistry::new());
1645        let fs = Arc::new(FakeFs::new());
1646
1647        // Connect to a server as 2 clients.
1648        let mut server = TestServer::start(cx_a.foreground()).await;
1649        let client_a = server.create_client(&mut cx_a, "user_a").await;
1650        let client_b = server.create_client(&mut cx_b, "user_b").await;
1651
1652        // Share a project as client A
            // The `.zed.toml` file names user_b as a collaborator; presumably this is what
            // authorizes user_b to join the project below — confirm against the server.
1653        fs.insert_tree(
1654            "/dir",
1655            json!({
1656                ".zed.toml": r#"collaborators = ["user_b"]"#,
1657                "a.txt": "a-contents",
1658            }),
1659        )
1660        .await;
1661        let project_a = cx_a.update(|cx| {
1662            Project::local(
1663                client_a.clone(),
1664                client_a.user_store.clone(),
1665                lang_registry.clone(),
1666                fs.clone(),
1667                cx,
1668            )
1669        });
1670        let worktree_a = project_a
1671            .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1672            .await
1673            .unwrap();
            // Wait for the local worktree's scan to finish before sharing the project.
1674        worktree_a
1675            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1676            .await;
1677        let project_id = project_a
1678            .update(&mut cx_a, |project, _| project.next_remote_id())
1679            .await;
1680        project_a
1681            .update(&mut cx_a, |project, cx| project.share(cx))
1682            .await
1683            .unwrap();
1684
1685        // Join that project as client B
1686        let project_b = Project::remote(
1687            project_id,
1688            client_b.clone(),
1689            client_b.user_store.clone(),
1690            lang_registry.clone(),
1691            fs.clone(),
1692            &mut cx_b.to_async(),
1693        )
1694        .await
1695        .unwrap();
1696        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1697
1698        // See that a guest has joined as client A.
1699        project_a
1700            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1701            .await;
1702
1703        // Begin opening a buffer as client B, but leave the project before the open completes.
            // The spawned task is never awaited: the project is dropped first, then the task
            // handle itself is dropped.
1704        let buffer_b = cx_b
1705            .background()
1706            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1707        cx_b.update(|_| drop(project_b));
1708        drop(buffer_b);
1709
1710        // See that the guest has left.
1711        project_a
1712            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1713            .await;
1714    }
1715
1716    #[gpui::test]
        // Verifies that when a guest's connection is dropped, the host observes the guest
        // leaving: client A's collaborator count goes from 1 back to 0 after client B
        // disconnects.
1717    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1718        cx_a.foreground().forbid_parking();
1719        let lang_registry = Arc::new(LanguageRegistry::new());
1720        let fs = Arc::new(FakeFs::new());
1721
1722        // Connect to a server as 2 clients.
1723        let mut server = TestServer::start(cx_a.foreground()).await;
1724        let client_a = server.create_client(&mut cx_a, "user_a").await;
1725        let client_b = server.create_client(&mut cx_b, "user_b").await;
1726
1727        // Share a project as client A
1728        fs.insert_tree(
1729            "/a",
1730            json!({
1731                ".zed.toml": r#"collaborators = ["user_b"]"#,
1732                "a.txt": "a-contents",
1733                "b.txt": "b-contents",
1734            }),
1735        )
1736        .await;
1737        let project_a = cx_a.update(|cx| {
1738            Project::local(
1739                client_a.clone(),
1740                client_a.user_store.clone(),
1741                lang_registry.clone(),
1742                fs.clone(),
1743                cx,
1744            )
1745        });
1746        let worktree_a = project_a
1747            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1748            .await
1749            .unwrap();
            // Wait for the local worktree's scan to finish before sharing the project.
1750        worktree_a
1751            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1752            .await;
1753        let project_id = project_a
1754            .update(&mut cx_a, |project, _| project.next_remote_id())
1755            .await;
1756        project_a
1757            .update(&mut cx_a, |project, cx| project.share(cx))
1758            .await
1759            .unwrap();
1760
1761        // Join that project as client B
            // Bound to a named variable (`_project_b`, not `_`) so the remote project is kept
            // alive until the end of the test; the departure observed below is therefore
            // triggered by the disconnect rather than by the project being dropped.
1762        let _project_b = Project::remote(
1763            project_id,
1764            client_b.clone(),
1765            client_b.user_store.clone(),
1766            lang_registry.clone(),
1767            fs.clone(),
1768            &mut cx_b.to_async(),
1769        )
1770        .await
1771        .unwrap();
1772
1773        // See that a guest has joined as client A.
1774        project_a
1775            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1776            .await;
1777
1778        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1779        client_b.disconnect(&cx_b.to_async()).await.unwrap();
1780        project_a
1781            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1782            .await;
1783    }
1784
1785    #[gpui::test]
        // Verifies that diagnostics published by the host's language server reach a guest:
        //   1. summaries that already exist when the guest joins are visible right after the join,
        //   2. summaries published afterwards are propagated to the guest, and
        //   3. a buffer opened by the guest contains the individual diagnostic entries.
1786    async fn test_collaborating_with_diagnostics(
1787        mut cx_a: TestAppContext,
1788        mut cx_b: TestAppContext,
1789    ) {
1790        cx_a.foreground().forbid_parking();
1791        let mut lang_registry = Arc::new(LanguageRegistry::new());
1792        let fs = Arc::new(FakeFs::new());
1793
1794        // Set up a fake language server.
            // `Arc::get_mut` only returns `Some` while this is the sole reference to the
            // registry, so the language has to be added before the registry is cloned into
            // any project below.
1795        let (language_server_config, mut fake_language_server) =
1796            LanguageServerConfig::fake(cx_a.background()).await;
1797        Arc::get_mut(&mut lang_registry)
1798            .unwrap()
1799            .add(Arc::new(Language::new(
1800                LanguageConfig {
1801                    name: "Rust".to_string(),
1802                    path_suffixes: vec!["rs".to_string()],
1803                    language_server: Some(language_server_config),
1804                    ..Default::default()
1805                },
1806                Some(tree_sitter_rust::language()),
1807            )));
1808
1809        // Connect to a server as 2 clients.
1810        let mut server = TestServer::start(cx_a.foreground()).await;
1811        let client_a = server.create_client(&mut cx_a, "user_a").await;
1812        let client_b = server.create_client(&mut cx_b, "user_b").await;
1813
1814        // Share a project as client A
1815        fs.insert_tree(
1816            "/a",
1817            json!({
1818                ".zed.toml": r#"collaborators = ["user_b"]"#,
1819                "a.rs": "let one = two",
1820                "other.rs": "",
1821            }),
1822        )
1823        .await;
1824        let project_a = cx_a.update(|cx| {
1825            Project::local(
1826                client_a.clone(),
1827                client_a.user_store.clone(),
1828                lang_registry.clone(),
1829                fs.clone(),
1830                cx,
1831            )
1832        });
1833        let worktree_a = project_a
1834            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1835            .await
1836            .unwrap();
1837        worktree_a
1838            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1839            .await;
1840        let project_id = project_a
1841            .update(&mut cx_a, |project, _| project.next_remote_id())
1842            .await;
1843        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1844        project_a
1845            .update(&mut cx_a, |project, cx| project.share(cx))
1846            .await
1847            .unwrap();
1848
1849        // Cause the language server to start.
            // A different file ("other.rs") is opened here; the diagnostics below are
            // reported for "a.rs", which the host never opens in this test.
1850        let _ = cx_a
1851            .background()
1852            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1853                worktree.open_buffer("other.rs", cx)
1854            }))
1855            .await
1856            .unwrap();
1857
1858        // Simulate a language server reporting errors for a file.
1859        fake_language_server
1860            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1861                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1862                version: None,
1863                diagnostics: vec![lsp::Diagnostic {
1864                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1865                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1866                    message: "message 1".to_string(),
1867                    ..Default::default()
1868                }],
1869            })
1870            .await;
1871
1872        // Wait for server to see the diagnostics update.
            // The assertion after the join uses `read_with` (no waiting), so the server's
            // stored summaries for this worktree must be non-empty before client B joins.
1873        server
1874            .condition(|store| {
1875                let worktree = store
1876                    .project(project_id)
1877                    .unwrap()
1878                    .worktrees
1879                    .get(&worktree_id.to_proto())
1880                    .unwrap();
1881
1882                !worktree
1883                    .share
1884                    .as_ref()
1885                    .unwrap()
1886                    .diagnostic_summaries
1887                    .is_empty()
1888            })
1889            .await;
1890
1891        // Join the worktree as client B.
1892        let project_b = Project::remote(
1893            project_id,
1894            client_b.clone(),
1895            client_b.user_store.clone(),
1896            lang_registry.clone(),
1897            fs.clone(),
1898            &mut cx_b.to_async(),
1899        )
1900        .await
1901        .unwrap();
1902
            // Immediately after joining, client B must already see one error for "a.rs".
1903        project_b.read_with(&cx_b, |project, cx| {
1904            assert_eq!(
1905                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1906                &[(
1907                    ProjectPath {
1908                        worktree_id,
1909                        path: Arc::from(Path::new("a.rs")),
1910                    },
1911                    DiagnosticSummary {
1912                        error_count: 1,
1913                        warning_count: 0,
1914                        ..Default::default()
1915                    },
1916                )]
1917            )
1918        });
1919
1920        // Simulate a language server reporting more errors for a file.
            // This publish repeats the original error and adds one warning for the same file.
1921        fake_language_server
1922            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1923                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1924                version: None,
1925                diagnostics: vec![
1926                    lsp::Diagnostic {
1927                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1928                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1929                        message: "message 1".to_string(),
1930                        ..Default::default()
1931                    },
1932                    lsp::Diagnostic {
1933                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1934                        range: lsp::Range::new(
1935                            lsp::Position::new(0, 10),
1936                            lsp::Position::new(0, 13),
1937                        ),
1938                        message: "message 2".to_string(),
1939                        ..Default::default()
1940                    },
1941                ],
1942            })
1943            .await;
1944
1945        // Client b gets the updated summaries
1946        project_b
1947            .condition(&cx_b, |project, cx| {
1948                project.diagnostic_summaries(cx).collect::<Vec<_>>()
1949                    == &[(
1950                        ProjectPath {
1951                            worktree_id,
1952                            path: Arc::from(Path::new("a.rs")),
1953                        },
1954                        DiagnosticSummary {
1955                            error_count: 1,
1956                            warning_count: 1,
1957                            ..Default::default()
1958                        },
1959                    )]
1960            })
1961            .await;
1962
1963        // Open the file with the errors on client B. They should be present.
1964        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1965        let buffer_b = cx_b
1966            .background()
1967            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1968            .await
1969            .unwrap();
1970
            // Both entries are expected to be primary and to carry distinct group ids (0 and 1).
            // NOTE(review): `.map(|entry| entry)` is an identity map and looks redundant —
            // confirm it is not needed for type inference before removing it.
1971        buffer_b.read_with(&cx_b, |buffer, _| {
1972            assert_eq!(
1973                buffer
1974                    .snapshot()
1975                    .diagnostics_in_range::<_, Point>(0..buffer.len())
1976                    .map(|entry| entry)
1977                    .collect::<Vec<_>>(),
1978                &[
1979                    DiagnosticEntry {
1980                        range: Point::new(0, 4)..Point::new(0, 7),
1981                        diagnostic: Diagnostic {
1982                            group_id: 0,
1983                            message: "message 1".to_string(),
1984                            severity: lsp::DiagnosticSeverity::ERROR,
1985                            is_primary: true,
1986                            ..Default::default()
1987                        }
1988                    },
1989                    DiagnosticEntry {
1990                        range: Point::new(0, 10)..Point::new(0, 13),
1991                        diagnostic: Diagnostic {
1992                            group_id: 1,
1993                            severity: lsp::DiagnosticSeverity::WARNING,
1994                            message: "message 2".to_string(),
1995                            is_primary: true,
1996                            ..Default::default()
1997                        }
1998                    }
1999                ]
2000            );
2001        });
2002    }
2003
2004    #[gpui::test]
        // Exercises the basic chat flow between two clients that share a channel:
        //   1. both clients list the channel and load a message seeded directly in the database,
        //   2. messages sent by client A show up for client B, and
        //   3. the server tracks which connections have the channel open, and forgets the
        //      channel once both clients have dropped their handles to it.
2005    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2006        cx_a.foreground().forbid_parking();
2007
2008        // Connect to a server as 2 clients.
2009        let mut server = TestServer::start(cx_a.foreground()).await;
2010        let client_a = server.create_client(&mut cx_a, "user_a").await;
2011        let client_b = server.create_client(&mut cx_b, "user_b").await;
2012
2013        // Create an org that includes these 2 users.
2014        let db = &server.app_state.db;
2015        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2016        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2017            .await
2018            .unwrap();
2019        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2020            .await
2021            .unwrap();
2022
2023        // Create a channel that includes all the users.
2024        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2025        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2026            .await
2027            .unwrap();
2028        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2029            .await
2030            .unwrap();
            // Seed one message from user_b directly in the database, before either client
            // opens the channel.
            // NOTE(review): the trailing `1` is presumably a message nonce — confirm against
            // the signature of `create_channel_message`.
2031        db.create_channel_message(
2032            channel_id,
2033            client_b.current_user_id(&cx_b),
2034            "hello A, it's B.",
2035            OffsetDateTime::now_utc(),
2036            1,
2037        )
2038        .await
2039        .unwrap();
2040
2041        let channels_a = cx_a
2042            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2043        channels_a
2044            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2045            .await;
2046        channels_a.read_with(&cx_a, |list, _| {
2047            assert_eq!(
2048                list.available_channels().unwrap(),
2049                &[ChannelDetails {
2050                    id: channel_id.to_proto(),
2051                    name: "test-channel".to_string()
2052                }]
2053            )
2054        });
2055        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2056            this.get_channel(channel_id.to_proto(), cx).unwrap()
2057        });
            // Right after `get_channel` the message list is still empty; the seeded message
            // only shows up once the condition below is satisfied.
            // NOTE(review): the third tuple element from `channel_messages` is `true` only
            // for messages the local client has just sent — presumably a "pending" flag;
            // confirm against the helper's definition.
2058        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2059        channel_a
2060            .condition(&cx_a, |channel, _| {
2061                channel_messages(channel)
2062                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2063            })
2064            .await;
2065
2066        let channels_b = cx_b
2067            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2068        channels_b
2069            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2070            .await;
2071        channels_b.read_with(&cx_b, |list, _| {
2072            assert_eq!(
2073                list.available_channels().unwrap(),
2074                &[ChannelDetails {
2075                    id: channel_id.to_proto(),
2076                    name: "test-channel".to_string()
2077                }]
2078            )
2079        });
2080
2081        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2082            this.get_channel(channel_id.to_proto(), cx).unwrap()
2083        });
2084        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2085        channel_b
2086            .condition(&cx_b, |channel, _| {
2087                channel_messages(channel)
2088                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2089            })
2090            .await;
2091
            // Send two messages back to back as client A. The first send is detached; only
            // the second send's task is returned from the closure and awaited. Both messages
            // must appear in A's local list (flagged `true`) before either send is awaited.
2092        channel_a
2093            .update(&mut cx_a, |channel, cx| {
2094                channel
2095                    .send_message("oh, hi B.".to_string(), cx)
2096                    .unwrap()
2097                    .detach();
2098                let task = channel.send_message("sup".to_string(), cx).unwrap();
2099                assert_eq!(
2100                    channel_messages(channel),
2101                    &[
2102                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2103                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2104                        ("user_a".to_string(), "sup".to_string(), true)
2105                    ]
2106                );
2107                task
2108            })
2109            .await
2110            .unwrap();
2111
            // Client B eventually sees both of A's messages, in send order.
2112        channel_b
2113            .condition(&cx_b, |channel, _| {
2114                channel_messages(channel)
2115                    == [
2116                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2117                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2118                        ("user_a".to_string(), "sup".to_string(), false),
2119                    ]
2120            })
2121            .await;
2122
            // With both clients holding the channel open, the server state lists two
            // connection ids for it.
2123        assert_eq!(
2124            server
2125                .state()
2126                .await
2127                .channel(channel_id)
2128                .unwrap()
2129                .connection_ids
2130                .len(),
2131            2
2132        );
            // Dropping B's channel handle leaves one connection id on the server.
2133        cx_b.update(|_| drop(channel_b));
2134        server
2135            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2136            .await;
2137
            // Dropping A's handle as well removes the channel from the server state entirely.
2138        cx_a.update(|_| drop(channel_a));
2139        server
2140            .condition(|state| state.channel(channel_id).is_none())
2141            .await;
2142    }
2143
2144    #[gpui::test]
        // Verifies validation of outgoing chat messages: an over-long body fails when the
        // send task is awaited, an empty body is rejected by `send_message` itself, and
        // surrounding whitespace is stripped from the body that ends up in the database.
2145    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2146        cx_a.foreground().forbid_parking();
2147
2148        let mut server = TestServer::start(cx_a.foreground()).await;
2149        let client_a = server.create_client(&mut cx_a, "user_a").await;
2150
2151        let db = &server.app_state.db;
2152        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2153        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2154        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2155            .await
2156            .unwrap();
2157        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2158            .await
2159            .unwrap();
2160
2161        let channels_a = cx_a
2162            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2163        channels_a
2164            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2165            .await;
2166        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2167            this.get_channel(channel_id.to_proto(), cx).unwrap()
2168        });
2169
2170        // Messages aren't allowed to be too long.
            // The body is 14 bytes repeated 1024 times (14336 bytes). `send_message` itself
            // succeeds (`unwrap`); the failure only surfaces when the returned task is awaited.
            // NOTE(review): presumably the limit enforced is `MAX_MESSAGE_LEN` (1024) from the
            // top of this file — confirm where the check lives.
2171        channel_a
2172            .update(&mut cx_a, |channel, cx| {
2173                let long_body = "this is long.\n".repeat(1024);
2174                channel.send_message(long_body, cx).unwrap()
2175            })
2176            .await
2177            .unwrap_err();
2178
2179        // Messages aren't allowed to be blank.
            // Unlike the over-long case, the error is returned by `send_message` directly;
            // nothing is awaited here.
2180        channel_a.update(&mut cx_a, |channel, cx| {
2181            channel.send_message(String::new(), cx).unwrap_err()
2182        });
2183
2184        // Leading and trailing whitespace are trimmed.
2185        channel_a
2186            .update(&mut cx_a, |channel, cx| {
2187                channel
2188                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
2189                    .unwrap()
2190            })
2191            .await
2192            .unwrap();
            // The trimmed message is the only one stored, so neither rejected message was
            // persisted.
2193        assert_eq!(
2194            db.get_channel_messages(channel_id, 10, None)
2195                .await
2196                .unwrap()
2197                .iter()
2198                .map(|m| &m.body)
2199                .collect::<Vec<_>>(),
2200            &["surrounded by whitespace"]
2201        );
2202    }
2203
2204    #[gpui::test]
        // Verifies chat behaviour across a client disconnect and reconnect:
        //   1. while client B is disconnected its cached channel list and messages stay readable,
        //   2. a message B sends while disconnected fails when awaited but stays in B's local list,
        //   3. after reconnecting, B sees the messages A sent in the meantime followed by its own
        //      previously failed message, and
        //   4. both clients can exchange messages normally afterwards.
2205    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2206        cx_a.foreground().forbid_parking();
2207
2208        // Connect to a server as 2 clients.
2209        let mut server = TestServer::start(cx_a.foreground()).await;
2210        let client_a = server.create_client(&mut cx_a, "user_a").await;
2211        let client_b = server.create_client(&mut cx_b, "user_b").await;
            // Subscribe to B's status updates now so the later transition to
            // `ReconnectionError` can be awaited.
2212        let mut status_b = client_b.status();
2213
2214        // Create an org that includes these 2 users.
2215        let db = &server.app_state.db;
2216        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2217        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2218            .await
2219            .unwrap();
2220        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2221            .await
2222            .unwrap();
2223
2224        // Create a channel that includes all the users.
2225        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2226        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2227            .await
2228            .unwrap();
2229        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2230            .await
2231            .unwrap();
            // Seed one message from user_b directly in the database.
            // NOTE(review): the trailing `2` is presumably a message nonce — confirm against
            // the signature of `create_channel_message`.
2232        db.create_channel_message(
2233            channel_id,
2234            client_b.current_user_id(&cx_b),
2235            "hello A, it's B.",
2236            OffsetDateTime::now_utc(),
2237            2,
2238        )
2239        .await
2240        .unwrap();
2241
2242        let channels_a = cx_a
2243            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2244        channels_a
2245            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2246            .await;
2247
2248        channels_a.read_with(&cx_a, |list, _| {
2249            assert_eq!(
2250                list.available_channels().unwrap(),
2251                &[ChannelDetails {
2252                    id: channel_id.to_proto(),
2253                    name: "test-channel".to_string()
2254                }]
2255            )
2256        });
2257        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2258            this.get_channel(channel_id.to_proto(), cx).unwrap()
2259        });
2260        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2261        channel_a
2262            .condition(&cx_a, |channel, _| {
2263                channel_messages(channel)
2264                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2265            })
2266            .await;
2267
2268        let channels_b = cx_b
2269            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2270        channels_b
2271            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2272            .await;
2273        channels_b.read_with(&cx_b, |list, _| {
2274            assert_eq!(
2275                list.available_channels().unwrap(),
2276                &[ChannelDetails {
2277                    id: channel_id.to_proto(),
2278                    name: "test-channel".to_string()
2279                }]
2280            )
2281        });
2282
2283        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2284            this.get_channel(channel_id.to_proto(), cx).unwrap()
2285        });
2286        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2287        channel_b
2288            .condition(&cx_b, |channel, _| {
2289                channel_messages(channel)
2290                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2291            })
2292            .await;
2293
2294        // Disconnect client B, ensuring we can still access its cached channel data.
            // New connections are forbidden first, then B's status stream is drained until it
            // reports `ReconnectionError`.
            // NOTE(review): if `recv()` ever returned `None` this loop would never exit —
            // confirm the status stream cannot close while the client is alive.
2295        server.forbid_connections();
2296        server.disconnect_client(client_b.current_user_id(&cx_b));
2297        while !matches!(
2298            status_b.recv().await,
2299            Some(client::Status::ReconnectionError { .. })
2300        ) {}
2301
2302        channels_b.read_with(&cx_b, |channels, _| {
2303            assert_eq!(
2304                channels.available_channels().unwrap(),
2305                [ChannelDetails {
2306                    id: channel_id.to_proto(),
2307                    name: "test-channel".to_string()
2308                }]
2309            )
2310        });
2311        channel_b.read_with(&cx_b, |channel, _| {
2312            assert_eq!(
2313                channel_messages(channel),
2314                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2315            )
2316        });
2317
2318        // Send a message from client B while it is disconnected.
            // The message is added to B's local list (flagged `true`) right away, but
            // awaiting the send task yields an error.
2319        channel_b
2320            .update(&mut cx_b, |channel, cx| {
2321                let task = channel
2322                    .send_message("can you see this?".to_string(), cx)
2323                    .unwrap();
2324                assert_eq!(
2325                    channel_messages(channel),
2326                    &[
2327                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2328                        ("user_b".to_string(), "can you see this?".to_string(), true)
2329                    ]
2330                );
2331                task
2332            })
2333            .await
2334            .unwrap_err();
2335
2336        // Send a message from client A while B is disconnected.
            // Two messages are sent: the first send is detached, the second send's task is
            // awaited.
2337        channel_a
2338            .update(&mut cx_a, |channel, cx| {
2339                channel
2340                    .send_message("oh, hi B.".to_string(), cx)
2341                    .unwrap()
2342                    .detach();
2343                let task = channel.send_message("sup".to_string(), cx).unwrap();
2344                assert_eq!(
2345                    channel_messages(channel),
2346                    &[
2347                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2348                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2349                        ("user_a".to_string(), "sup".to_string(), true)
2350                    ]
2351                );
2352                task
2353            })
2354            .await
2355            .unwrap();
2356
2357        // Give client B a chance to reconnect.
            // NOTE(review): advancing the fake clock by 10 seconds presumably fires the
            // client's reconnect timer — confirm against the client's retry logic.
2358        server.allow_connections();
2359        cx_b.foreground().advance_clock(Duration::from_secs(10));
2360
2361        // Verify that B sees the new messages upon reconnection, as well as the message client B
2362        // sent while offline.
            // B's offline message is expected after A's two messages, and no longer flagged
            // `true`.
2363        channel_b
2364            .condition(&cx_b, |channel, _| {
2365                channel_messages(channel)
2366                    == [
2367                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2368                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2369                        ("user_a".to_string(), "sup".to_string(), false),
2370                        ("user_b".to_string(), "can you see this?".to_string(), false),
2371                    ]
2372            })
2373            .await;
2374
2375        // Ensure client A and B can communicate normally after reconnection.
2376        channel_a
2377            .update(&mut cx_a, |channel, cx| {
2378                channel.send_message("you online?".to_string(), cx).unwrap()
2379            })
2380            .await
2381            .unwrap();
2382        channel_b
2383            .condition(&cx_b, |channel, _| {
2384                channel_messages(channel)
2385                    == [
2386                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2387                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2388                        ("user_a".to_string(), "sup".to_string(), false),
2389                        ("user_b".to_string(), "can you see this?".to_string(), false),
2390                        ("user_a".to_string(), "you online?".to_string(), false),
2391                    ]
2392            })
2393            .await;
2394
2395        channel_b
2396            .update(&mut cx_b, |channel, cx| {
2397                channel.send_message("yep".to_string(), cx).unwrap()
2398            })
2399            .await
2400            .unwrap();
            // Client A ends up with the same six-message history, including the message B
            // originally sent while offline.
2401        channel_a
2402            .condition(&cx_a, |channel, _| {
2403                channel_messages(channel)
2404                    == [
2405                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2406                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2407                        ("user_a".to_string(), "sup".to_string(), false),
2408                        ("user_b".to_string(), "can you see this?".to_string(), false),
2409                        ("user_a".to_string(), "you online?".to_string(), false),
2410                        ("user_b".to_string(), "yep".to_string(), false),
2411                    ]
2412            })
2413            .await;
2414    }
2415
    // Exercises contact-list updates across three connected clients.
    //
    // The test walks through three phases and, after each one, waits until the
    // `UserStore` of every client (A, B and C) reports the same contact list:
    //   1. user_a has a local project with worktree "a" and no guests;
    //   2. user_a shares the project and user_b opens it remotely, so user_b
    //      shows up as a guest of "a";
    //   3. user_a drops the project and the contact list becomes empty.
    #[gpui::test]
    async fn test_contacts(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Populate the fake filesystem with a directory "/a" whose `.zed.toml`
        // names user_b and user_c as collaborators, then open it as a local
        // worktree in a project owned by client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        // Wait for the initial scan of the worktree to finish before asserting.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        // Phase 1: every client sees user_a with project "a" and no guests.
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;

        // Obtain the project's remote id, then share the project as client A.
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Open the shared project as client B. The handle is kept alive (but
        // otherwise unused) for the rest of the test.
        let _project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Phase 2: every client sees user_b listed as a guest of project "a".
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;

        // Make sure client A has registered client B as a collaborator before
        // the project is dropped below.
        project_a
            .condition(&cx_a, |project, _| {
                project.collaborators().contains_key(&client_b.peer_id)
            })
            .await;

        // Phase 3: dropping client A's project empties everyone's contact list.
        cx_a.update(move |_| drop(project_a));
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
            .await;

        // Summarizes a user store's contacts as
        // `(contact login, [(first worktree root name, [guest logins])])`.
        // Only the first root name of each project is reported; indexing with
        // `[0]` panics if a project has no worktree root names.
        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
            user_store
                .contacts()
                .iter()
                .map(|contact| {
                    let worktrees = contact
                        .projects
                        .iter()
                        .map(|p| {
                            (
                                p.worktree_root_names[0].as_str(),
                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
                            )
                        })
                        .collect();
                    (contact.user.github_login.as_str(), worktrees)
                })
                .collect()
        }
    }
2555
    /// An in-process `Server` plus the handles tests need to drive it.
    struct TestServer {
        /// Peer handed to the server at construction; reset when this value is dropped.
        peer: Arc<Peer>,
        /// Application state the server was built with; its `db` is used to create users.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor, marked as waiting while `condition` awaits a notification.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender was given to the server.
        notifications: mpsc::Receiver<()>,
        /// Kill handles of the in-memory connections, keyed by user, used by
        /// `disconnect_client`.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// When set, attempts by test clients to establish a connection fail.
        forbid_connections: Arc<AtomicBool>,
        /// Held only to keep the test database alive as long as the server.
        // NOTE(review): presumably `TestDb` cleans up on drop — confirm.
        _test_db: TestDb,
    }
2566
2567    impl TestServer {
2568        async fn start(foreground: Rc<executor::Foreground>) -> Self {
2569            let test_db = TestDb::new();
2570            let app_state = Self::build_app_state(&test_db).await;
2571            let peer = Peer::new();
2572            let notifications = mpsc::channel(128);
2573            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2574            Self {
2575                peer,
2576                app_state,
2577                server,
2578                foreground,
2579                notifications: notifications.1,
2580                connection_killers: Default::default(),
2581                forbid_connections: Default::default(),
2582                _test_db: test_db,
2583            }
2584        }
2585
2586        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2587            let http = FakeHttpClient::with_404_response();
2588            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2589            let client_name = name.to_string();
2590            let mut client = Client::new(http.clone());
2591            let server = self.server.clone();
2592            let connection_killers = self.connection_killers.clone();
2593            let forbid_connections = self.forbid_connections.clone();
2594            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2595
2596            Arc::get_mut(&mut client)
2597                .unwrap()
2598                .override_authenticate(move |cx| {
2599                    cx.spawn(|_| async move {
2600                        let access_token = "the-token".to_string();
2601                        Ok(Credentials {
2602                            user_id: user_id.0 as u64,
2603                            access_token,
2604                        })
2605                    })
2606                })
2607                .override_establish_connection(move |credentials, cx| {
2608                    assert_eq!(credentials.user_id, user_id.0 as u64);
2609                    assert_eq!(credentials.access_token, "the-token");
2610
2611                    let server = server.clone();
2612                    let connection_killers = connection_killers.clone();
2613                    let forbid_connections = forbid_connections.clone();
2614                    let client_name = client_name.clone();
2615                    let connection_id_tx = connection_id_tx.clone();
2616                    cx.spawn(move |cx| async move {
2617                        if forbid_connections.load(SeqCst) {
2618                            Err(EstablishConnectionError::other(anyhow!(
2619                                "server is forbidding connections"
2620                            )))
2621                        } else {
2622                            let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2623                            connection_killers.lock().insert(user_id, kill_conn);
2624                            cx.background()
2625                                .spawn(server.handle_connection(
2626                                    server_conn,
2627                                    client_name,
2628                                    user_id,
2629                                    Some(connection_id_tx),
2630                                ))
2631                                .detach();
2632                            Ok(client_conn)
2633                        }
2634                    })
2635                });
2636
2637            client
2638                .authenticate_and_connect(&cx.to_async())
2639                .await
2640                .unwrap();
2641
2642            let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2643            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2644            let mut authed_user =
2645                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2646            while authed_user.recv().await.unwrap().is_none() {}
2647
2648            TestClient {
2649                client,
2650                peer_id,
2651                user_store,
2652            }
2653        }
2654
2655        fn disconnect_client(&self, user_id: UserId) {
2656            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2657                let _ = kill_conn.try_send(Some(()));
2658            }
2659        }
2660
2661        fn forbid_connections(&self) {
2662            self.forbid_connections.store(true, SeqCst);
2663        }
2664
2665        fn allow_connections(&self) {
2666            self.forbid_connections.store(false, SeqCst);
2667        }
2668
2669        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2670            let mut config = Config::default();
2671            config.session_secret = "a".repeat(32);
2672            config.database_url = test_db.url.clone();
2673            let github_client = github::AppClient::test();
2674            Arc::new(AppState {
2675                db: test_db.db().clone(),
2676                handlebars: Default::default(),
2677                auth_client: auth::build_client("", ""),
2678                repo_client: github::RepoClient::test(&github_client),
2679                github_client,
2680                config,
2681            })
2682        }
2683
2684        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2685            self.server.store.read()
2686        }
2687
2688        async fn condition<F>(&mut self, mut predicate: F)
2689        where
2690            F: FnMut(&Store) -> bool,
2691        {
2692            async_std::future::timeout(Duration::from_millis(500), async {
2693                while !(predicate)(&*self.server.store.read()) {
2694                    self.foreground.start_waiting();
2695                    self.notifications.recv().await;
2696                    self.foreground.finish_waiting();
2697                }
2698            })
2699            .await
2700            .expect("condition timed out");
2701        }
2702    }
2703
2704    impl Drop for TestServer {
2705        fn drop(&mut self) {
2706            task::block_on(self.peer.reset());
2707        }
2708    }
2709
    /// A client connected to a `TestServer`, as returned by
    /// `TestServer::create_client`.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        /// User store created on top of `client`.
        pub user_store: ModelHandle<UserStore>,
    }
2715
    // Dereferences to the wrapped `Arc<Client>`, so client methods (and
    // `Arc::clone`) can be used directly on a `TestClient`.
    impl Deref for TestClient {
        type Target = Arc<Client>;

        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
2723
2724    impl TestClient {
2725        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2726            UserId::from_proto(
2727                self.user_store
2728                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2729            )
2730        }
2731    }
2732
2733    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2734        channel
2735            .messages()
2736            .cursor::<()>()
2737            .map(|m| {
2738                (
2739                    m.sender.github_login.clone(),
2740                    m.body.clone(),
2741                    m.is_pending(),
2742                )
2743            })
2744            .collect()
2745    }
2746
    /// A stateless view for tests; it renders an empty element.
    struct EmptyView;
2748
    // `EmptyView` has no meaningful events, so its event type is the unit type.
    impl gpui::Entity for EmptyView {
        type Event = ();
    }
2752
    impl gpui::View for EmptyView {
        /// Name under which this view identifies itself.
        fn ui_name() -> &'static str {
            "empty view"
        }

        /// Renders nothing: returns a boxed `gpui::elements::Empty`.
        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
            gpui::Element::boxed(gpui::elements::Empty)
        }
    }
2762}