rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updating)
  76            .add_handler(Server::disk_based_diagnostics_updated)
  77            .add_handler(Server::get_definition)
  78            .add_handler(Server::open_buffer)
  79            .add_handler(Server::close_buffer)
  80            .add_handler(Server::update_buffer)
  81            .add_handler(Server::update_buffer_file)
  82            .add_handler(Server::buffer_reloaded)
  83            .add_handler(Server::buffer_saved)
  84            .add_handler(Server::save_buffer)
  85            .add_handler(Server::format_buffer)
  86            .add_handler(Server::get_channels)
  87            .add_handler(Server::get_users)
  88            .add_handler(Server::join_channel)
  89            .add_handler(Server::leave_channel)
  90            .add_handler(Server::send_channel_message)
  91            .add_handler(Server::get_channel_messages);
  92
  93        Arc::new(server)
  94    }
  95
  96    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  97    where
  98        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
  99        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 100        M: EnvelopedMessage,
 101    {
 102        let prev_handler = self.handlers.insert(
 103            TypeId::of::<M>(),
 104            Box::new(move |server, envelope| {
 105                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 106                (handler)(server, *envelope).boxed()
 107            }),
 108        );
 109        if prev_handler.is_some() {
 110            panic!("registered a handler for the same message twice");
 111        }
 112        self
 113    }
 114
 115    pub fn handle_connection(
 116        self: &Arc<Self>,
 117        connection: Connection,
 118        addr: String,
 119        user_id: UserId,
 120        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 121    ) -> impl Future<Output = ()> {
 122        let mut this = self.clone();
 123        async move {
 124            let (connection_id, handle_io, mut incoming_rx) =
 125                this.peer.add_connection(connection).await;
 126
 127            if let Some(send_connection_id) = send_connection_id.as_mut() {
 128                let _ = send_connection_id.send(connection_id).await;
 129            }
 130
 131            this.state_mut().add_connection(connection_id, user_id);
 132            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 133                log::error!("error updating contacts for {:?}: {}", user_id, err);
 134            }
 135
 136            let handle_io = handle_io.fuse();
 137            futures::pin_mut!(handle_io);
 138            loop {
 139                let next_message = incoming_rx.next().fuse();
 140                futures::pin_mut!(next_message);
 141                futures::select_biased! {
 142                    message = next_message => {
 143                        if let Some(message) = message {
 144                            let start_time = Instant::now();
 145                            log::info!("RPC message received: {}", message.payload_type_name());
 146                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 147                                if let Err(err) = (handler)(this.clone(), message).await {
 148                                    log::error!("error handling message: {:?}", err);
 149                                } else {
 150                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 151                                }
 152
 153                                if let Some(mut notifications) = this.notifications.clone() {
 154                                    let _ = notifications.send(()).await;
 155                                }
 156                            } else {
 157                                log::warn!("unhandled message: {}", message.payload_type_name());
 158                            }
 159                        } else {
 160                            log::info!("rpc connection closed {:?}", addr);
 161                            break;
 162                        }
 163                    }
 164                    handle_io = handle_io => {
 165                        if let Err(err) = handle_io {
 166                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 167                        }
 168                        break;
 169                    }
 170                }
 171            }
 172
 173            if let Err(err) = this.sign_out(connection_id).await {
 174                log::error!("error signing out connection {:?} - {:?}", addr, err);
 175            }
 176        }
 177    }
 178
 179    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 180        self.peer.disconnect(connection_id);
 181        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 182
 183        for (project_id, project) in removed_connection.hosted_projects {
 184            if let Some(share) = project.share {
 185                broadcast(
 186                    connection_id,
 187                    share.guests.keys().copied().collect(),
 188                    |conn_id| {
 189                        self.peer
 190                            .send(conn_id, proto::UnshareProject { project_id })
 191                    },
 192                )
 193                .await?;
 194            }
 195        }
 196
 197        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 198            broadcast(connection_id, peer_ids, |conn_id| {
 199                self.peer.send(
 200                    conn_id,
 201                    proto::RemoveProjectCollaborator {
 202                        project_id,
 203                        peer_id: connection_id.0,
 204                    },
 205                )
 206            })
 207            .await?;
 208        }
 209
 210        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 211            .await?;
 212
 213        Ok(())
 214    }
 215
 216    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 217        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 218        Ok(())
 219    }
 220
 221    async fn register_project(
 222        mut self: Arc<Server>,
 223        request: TypedEnvelope<proto::RegisterProject>,
 224    ) -> tide::Result<()> {
 225        let project_id = {
 226            let mut state = self.state_mut();
 227            let user_id = state.user_id_for_connection(request.sender_id)?;
 228            state.register_project(request.sender_id, user_id)
 229        };
 230        self.peer
 231            .respond(
 232                request.receipt(),
 233                proto::RegisterProjectResponse { project_id },
 234            )
 235            .await?;
 236        Ok(())
 237    }
 238
 239    async fn unregister_project(
 240        mut self: Arc<Server>,
 241        request: TypedEnvelope<proto::UnregisterProject>,
 242    ) -> tide::Result<()> {
 243        let project = self
 244            .state_mut()
 245            .unregister_project(request.payload.project_id, request.sender_id)
 246            .ok_or_else(|| anyhow!("no such project"))?;
 247        self.update_contacts_for_users(project.authorized_user_ids().iter())
 248            .await?;
 249        Ok(())
 250    }
 251
 252    async fn share_project(
 253        mut self: Arc<Server>,
 254        request: TypedEnvelope<proto::ShareProject>,
 255    ) -> tide::Result<()> {
 256        self.state_mut()
 257            .share_project(request.payload.project_id, request.sender_id);
 258        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 259        Ok(())
 260    }
 261
 262    async fn unshare_project(
 263        mut self: Arc<Server>,
 264        request: TypedEnvelope<proto::UnshareProject>,
 265    ) -> tide::Result<()> {
 266        let project_id = request.payload.project_id;
 267        let project = self
 268            .state_mut()
 269            .unshare_project(project_id, request.sender_id)?;
 270
 271        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 272            self.peer
 273                .send(conn_id, proto::UnshareProject { project_id })
 274        })
 275        .await?;
 276        self.update_contacts_for_users(&project.authorized_user_ids)
 277            .await?;
 278
 279        Ok(())
 280    }
 281
 282    async fn join_project(
 283        mut self: Arc<Server>,
 284        request: TypedEnvelope<proto::JoinProject>,
 285    ) -> tide::Result<()> {
 286        let project_id = request.payload.project_id;
 287
 288        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 289        let response_data = self
 290            .state_mut()
 291            .join_project(request.sender_id, user_id, project_id)
 292            .and_then(|joined| {
 293                let share = joined.project.share()?;
 294                let peer_count = share.guests.len();
 295                let mut collaborators = Vec::with_capacity(peer_count);
 296                collaborators.push(proto::Collaborator {
 297                    peer_id: joined.project.host_connection_id.0,
 298                    replica_id: 0,
 299                    user_id: joined.project.host_user_id.to_proto(),
 300                });
 301                let worktrees = joined
 302                    .project
 303                    .worktrees
 304                    .iter()
 305                    .filter_map(|(id, worktree)| {
 306                        worktree.share.as_ref().map(|share| proto::Worktree {
 307                            id: *id,
 308                            root_name: worktree.root_name.clone(),
 309                            entries: share.entries.values().cloned().collect(),
 310                            diagnostic_summaries: share
 311                                .diagnostic_summaries
 312                                .values()
 313                                .cloned()
 314                                .collect(),
 315                            weak: worktree.weak,
 316                        })
 317                    })
 318                    .collect();
 319                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 320                    if *peer_conn_id != request.sender_id {
 321                        collaborators.push(proto::Collaborator {
 322                            peer_id: peer_conn_id.0,
 323                            replica_id: *peer_replica_id as u32,
 324                            user_id: peer_user_id.to_proto(),
 325                        });
 326                    }
 327                }
 328                let response = proto::JoinProjectResponse {
 329                    worktrees,
 330                    replica_id: joined.replica_id as u32,
 331                    collaborators,
 332                };
 333                let connection_ids = joined.project.connection_ids();
 334                let contact_user_ids = joined.project.authorized_user_ids();
 335                Ok((response, connection_ids, contact_user_ids))
 336            });
 337
 338        match response_data {
 339            Ok((response, connection_ids, contact_user_ids)) => {
 340                broadcast(request.sender_id, connection_ids, |conn_id| {
 341                    self.peer.send(
 342                        conn_id,
 343                        proto::AddProjectCollaborator {
 344                            project_id: project_id,
 345                            collaborator: Some(proto::Collaborator {
 346                                peer_id: request.sender_id.0,
 347                                replica_id: response.replica_id,
 348                                user_id: user_id.to_proto(),
 349                            }),
 350                        },
 351                    )
 352                })
 353                .await?;
 354                self.peer.respond(request.receipt(), response).await?;
 355                self.update_contacts_for_users(&contact_user_ids).await?;
 356            }
 357            Err(error) => {
 358                self.peer
 359                    .respond_with_error(
 360                        request.receipt(),
 361                        proto::Error {
 362                            message: error.to_string(),
 363                        },
 364                    )
 365                    .await?;
 366            }
 367        }
 368
 369        Ok(())
 370    }
 371
 372    async fn leave_project(
 373        mut self: Arc<Server>,
 374        request: TypedEnvelope<proto::LeaveProject>,
 375    ) -> tide::Result<()> {
 376        let sender_id = request.sender_id;
 377        let project_id = request.payload.project_id;
 378        let worktree = self.state_mut().leave_project(sender_id, project_id);
 379        if let Some(worktree) = worktree {
 380            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 381                self.peer.send(
 382                    conn_id,
 383                    proto::RemoveProjectCollaborator {
 384                        project_id,
 385                        peer_id: sender_id.0,
 386                    },
 387                )
 388            })
 389            .await?;
 390            self.update_contacts_for_users(&worktree.authorized_user_ids)
 391                .await?;
 392        }
 393        Ok(())
 394    }
 395
 396    async fn register_worktree(
 397        mut self: Arc<Server>,
 398        request: TypedEnvelope<proto::RegisterWorktree>,
 399    ) -> tide::Result<()> {
 400        let receipt = request.receipt();
 401        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 402
 403        let mut contact_user_ids = HashSet::default();
 404        contact_user_ids.insert(host_user_id);
 405        for github_login in request.payload.authorized_logins {
 406            match self.app_state.db.create_user(&github_login, false).await {
 407                Ok(contact_user_id) => {
 408                    contact_user_ids.insert(contact_user_id);
 409                }
 410                Err(err) => {
 411                    let message = err.to_string();
 412                    self.peer
 413                        .respond_with_error(receipt, proto::Error { message })
 414                        .await?;
 415                    return Ok(());
 416                }
 417            }
 418        }
 419
 420        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 421        let ok = self.state_mut().register_worktree(
 422            request.payload.project_id,
 423            request.payload.worktree_id,
 424            Worktree {
 425                authorized_user_ids: contact_user_ids.clone(),
 426                root_name: request.payload.root_name,
 427                share: None,
 428                weak: false,
 429            },
 430        );
 431
 432        if ok {
 433            self.peer.respond(receipt, proto::Ack {}).await?;
 434            self.update_contacts_for_users(&contact_user_ids).await?;
 435        } else {
 436            self.peer
 437                .respond_with_error(
 438                    receipt,
 439                    proto::Error {
 440                        message: NO_SUCH_PROJECT.to_string(),
 441                    },
 442                )
 443                .await?;
 444        }
 445
 446        Ok(())
 447    }
 448
 449    async fn unregister_worktree(
 450        mut self: Arc<Server>,
 451        request: TypedEnvelope<proto::UnregisterWorktree>,
 452    ) -> tide::Result<()> {
 453        let project_id = request.payload.project_id;
 454        let worktree_id = request.payload.worktree_id;
 455        let (worktree, guest_connection_ids) =
 456            self.state_mut()
 457                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 458
 459        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 460            self.peer.send(
 461                conn_id,
 462                proto::UnregisterWorktree {
 463                    project_id,
 464                    worktree_id,
 465                },
 466            )
 467        })
 468        .await?;
 469        self.update_contacts_for_users(&worktree.authorized_user_ids)
 470            .await?;
 471        Ok(())
 472    }
 473
 474    async fn share_worktree(
 475        mut self: Arc<Server>,
 476        mut request: TypedEnvelope<proto::ShareWorktree>,
 477    ) -> tide::Result<()> {
 478        let worktree = request
 479            .payload
 480            .worktree
 481            .as_mut()
 482            .ok_or_else(|| anyhow!("missing worktree"))?;
 483        let entries = worktree
 484            .entries
 485            .iter()
 486            .map(|entry| (entry.id, entry.clone()))
 487            .collect();
 488        let diagnostic_summaries = worktree
 489            .diagnostic_summaries
 490            .iter()
 491            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 492            .collect();
 493
 494        let shared_worktree = self.state_mut().share_worktree(
 495            request.payload.project_id,
 496            worktree.id,
 497            request.sender_id,
 498            entries,
 499            diagnostic_summaries,
 500        );
 501        if let Some(shared_worktree) = shared_worktree {
 502            broadcast(
 503                request.sender_id,
 504                shared_worktree.connection_ids,
 505                |connection_id| {
 506                    self.peer.forward_send(
 507                        request.sender_id,
 508                        connection_id,
 509                        request.payload.clone(),
 510                    )
 511                },
 512            )
 513            .await?;
 514            self.peer.respond(request.receipt(), proto::Ack {}).await?;
 515            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)
 516                .await?;
 517        } else {
 518            self.peer
 519                .respond_with_error(
 520                    request.receipt(),
 521                    proto::Error {
 522                        message: "no such worktree".to_string(),
 523                    },
 524                )
 525                .await?;
 526        }
 527        Ok(())
 528    }
 529
 530    async fn update_worktree(
 531        mut self: Arc<Server>,
 532        request: TypedEnvelope<proto::UpdateWorktree>,
 533    ) -> tide::Result<()> {
 534        let connection_ids = self
 535            .state_mut()
 536            .update_worktree(
 537                request.sender_id,
 538                request.payload.project_id,
 539                request.payload.worktree_id,
 540                &request.payload.removed_entries,
 541                &request.payload.updated_entries,
 542            )
 543            .ok_or_else(|| anyhow!("no such worktree"))?;
 544
 545        broadcast(request.sender_id, connection_ids, |connection_id| {
 546            self.peer
 547                .forward_send(request.sender_id, connection_id, request.payload.clone())
 548        })
 549        .await?;
 550
 551        Ok(())
 552    }
 553
 554    async fn update_diagnostic_summary(
 555        mut self: Arc<Server>,
 556        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 557    ) -> tide::Result<()> {
 558        let receiver_ids = request
 559            .payload
 560            .summary
 561            .clone()
 562            .and_then(|summary| {
 563                self.state_mut().update_diagnostic_summary(
 564                    request.payload.project_id,
 565                    request.payload.worktree_id,
 566                    request.sender_id,
 567                    summary,
 568                )
 569            })
 570            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 571
 572        broadcast(request.sender_id, receiver_ids, |connection_id| {
 573            self.peer
 574                .forward_send(request.sender_id, connection_id, request.payload.clone())
 575        })
 576        .await?;
 577        Ok(())
 578    }
 579
 580    async fn disk_based_diagnostics_updating(
 581        self: Arc<Server>,
 582        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 583    ) -> tide::Result<()> {
 584        let receiver_ids = self
 585            .state()
 586            .project_connection_ids(request.payload.project_id, request.sender_id)
 587            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 588        broadcast(request.sender_id, receiver_ids, |connection_id| {
 589            self.peer
 590                .forward_send(request.sender_id, connection_id, request.payload.clone())
 591        })
 592        .await?;
 593        Ok(())
 594    }
 595
 596    async fn disk_based_diagnostics_updated(
 597        self: Arc<Server>,
 598        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 599    ) -> tide::Result<()> {
 600        let receiver_ids = self
 601            .state()
 602            .project_connection_ids(request.payload.project_id, request.sender_id)
 603            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 604        broadcast(request.sender_id, receiver_ids, |connection_id| {
 605            self.peer
 606                .forward_send(request.sender_id, connection_id, request.payload.clone())
 607        })
 608        .await?;
 609        Ok(())
 610    }
 611
 612    async fn get_definition(
 613        self: Arc<Server>,
 614        request: TypedEnvelope<proto::GetDefinition>,
 615    ) -> tide::Result<()> {
 616        let receipt = request.receipt();
 617        let host_connection_id = self
 618            .state()
 619            .read_project(request.payload.project_id, request.sender_id)
 620            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 621            .host_connection_id;
 622        let response = self
 623            .peer
 624            .forward_request(request.sender_id, host_connection_id, request.payload)
 625            .await?;
 626        self.peer.respond(receipt, response).await?;
 627        Ok(())
 628    }
 629
 630    async fn open_buffer(
 631        self: Arc<Server>,
 632        request: TypedEnvelope<proto::OpenBuffer>,
 633    ) -> tide::Result<()> {
 634        let receipt = request.receipt();
 635        let host_connection_id = self
 636            .state()
 637            .read_project(request.payload.project_id, request.sender_id)
 638            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 639            .host_connection_id;
 640        let response = self
 641            .peer
 642            .forward_request(request.sender_id, host_connection_id, request.payload)
 643            .await?;
 644        self.peer.respond(receipt, response).await?;
 645        Ok(())
 646    }
 647
 648    async fn close_buffer(
 649        self: Arc<Server>,
 650        request: TypedEnvelope<proto::CloseBuffer>,
 651    ) -> tide::Result<()> {
 652        let host_connection_id = self
 653            .state()
 654            .read_project(request.payload.project_id, request.sender_id)
 655            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 656            .host_connection_id;
 657        self.peer
 658            .forward_send(request.sender_id, host_connection_id, request.payload)
 659            .await?;
 660        Ok(())
 661    }
 662
 663    async fn save_buffer(
 664        self: Arc<Server>,
 665        request: TypedEnvelope<proto::SaveBuffer>,
 666    ) -> tide::Result<()> {
 667        let host;
 668        let guests;
 669        {
 670            let state = self.state();
 671            let project = state
 672                .read_project(request.payload.project_id, request.sender_id)
 673                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 674            host = project.host_connection_id;
 675            guests = project.guest_connection_ids()
 676        }
 677
 678        let sender = request.sender_id;
 679        let receipt = request.receipt();
 680        let response = self
 681            .peer
 682            .forward_request(sender, host, request.payload.clone())
 683            .await?;
 684
 685        broadcast(host, guests, |conn_id| {
 686            let response = response.clone();
 687            let peer = &self.peer;
 688            async move {
 689                if conn_id == sender {
 690                    peer.respond(receipt, response).await
 691                } else {
 692                    peer.forward_send(host, conn_id, response).await
 693                }
 694            }
 695        })
 696        .await?;
 697
 698        Ok(())
 699    }
 700
 701    async fn format_buffer(
 702        self: Arc<Server>,
 703        request: TypedEnvelope<proto::FormatBuffer>,
 704    ) -> tide::Result<()> {
 705        let host;
 706        {
 707            let state = self.state();
 708            let project = state
 709                .read_project(request.payload.project_id, request.sender_id)
 710                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 711            host = project.host_connection_id;
 712        }
 713
 714        let sender = request.sender_id;
 715        let receipt = request.receipt();
 716        let response = self
 717            .peer
 718            .forward_request(sender, host, request.payload.clone())
 719            .await?;
 720        self.peer.respond(receipt, response).await?;
 721
 722        Ok(())
 723    }
 724
 725    async fn update_buffer(
 726        self: Arc<Server>,
 727        request: TypedEnvelope<proto::UpdateBuffer>,
 728    ) -> tide::Result<()> {
 729        let receiver_ids = self
 730            .state()
 731            .project_connection_ids(request.payload.project_id, request.sender_id)
 732            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 733        broadcast(request.sender_id, receiver_ids, |connection_id| {
 734            self.peer
 735                .forward_send(request.sender_id, connection_id, request.payload.clone())
 736        })
 737        .await?;
 738        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 739        Ok(())
 740    }
 741
 742    async fn update_buffer_file(
 743        self: Arc<Server>,
 744        request: TypedEnvelope<proto::UpdateBufferFile>,
 745    ) -> tide::Result<()> {
 746        let receiver_ids = self
 747            .state()
 748            .project_connection_ids(request.payload.project_id, request.sender_id)
 749            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 750        broadcast(request.sender_id, receiver_ids, |connection_id| {
 751            self.peer
 752                .forward_send(request.sender_id, connection_id, request.payload.clone())
 753        })
 754        .await?;
 755        Ok(())
 756    }
 757
 758    async fn buffer_reloaded(
 759        self: Arc<Server>,
 760        request: TypedEnvelope<proto::BufferReloaded>,
 761    ) -> tide::Result<()> {
 762        let receiver_ids = self
 763            .state()
 764            .project_connection_ids(request.payload.project_id, request.sender_id)
 765            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 766        broadcast(request.sender_id, receiver_ids, |connection_id| {
 767            self.peer
 768                .forward_send(request.sender_id, connection_id, request.payload.clone())
 769        })
 770        .await?;
 771        Ok(())
 772    }
 773
 774    async fn buffer_saved(
 775        self: Arc<Server>,
 776        request: TypedEnvelope<proto::BufferSaved>,
 777    ) -> tide::Result<()> {
 778        let receiver_ids = self
 779            .state()
 780            .project_connection_ids(request.payload.project_id, request.sender_id)
 781            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 782        broadcast(request.sender_id, receiver_ids, |connection_id| {
 783            self.peer
 784                .forward_send(request.sender_id, connection_id, request.payload.clone())
 785        })
 786        .await?;
 787        Ok(())
 788    }
 789
 790    async fn get_channels(
 791        self: Arc<Server>,
 792        request: TypedEnvelope<proto::GetChannels>,
 793    ) -> tide::Result<()> {
 794        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 795        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 796        self.peer
 797            .respond(
 798                request.receipt(),
 799                proto::GetChannelsResponse {
 800                    channels: channels
 801                        .into_iter()
 802                        .map(|chan| proto::Channel {
 803                            id: chan.id.to_proto(),
 804                            name: chan.name,
 805                        })
 806                        .collect(),
 807                },
 808            )
 809            .await?;
 810        Ok(())
 811    }
 812
 813    async fn get_users(
 814        self: Arc<Server>,
 815        request: TypedEnvelope<proto::GetUsers>,
 816    ) -> tide::Result<()> {
 817        let receipt = request.receipt();
 818        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 819        let users = self
 820            .app_state
 821            .db
 822            .get_users_by_ids(user_ids)
 823            .await?
 824            .into_iter()
 825            .map(|user| proto::User {
 826                id: user.id.to_proto(),
 827                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 828                github_login: user.github_login,
 829            })
 830            .collect();
 831        self.peer
 832            .respond(receipt, proto::GetUsersResponse { users })
 833            .await?;
 834        Ok(())
 835    }
 836
 837    async fn update_contacts_for_users<'a>(
 838        self: &Arc<Server>,
 839        user_ids: impl IntoIterator<Item = &'a UserId>,
 840    ) -> tide::Result<()> {
 841        let mut send_futures = Vec::new();
 842
 843        {
 844            let state = self.state();
 845            for user_id in user_ids {
 846                let contacts = state.contacts_for_user(*user_id);
 847                for connection_id in state.connection_ids_for_user(*user_id) {
 848                    send_futures.push(self.peer.send(
 849                        connection_id,
 850                        proto::UpdateContacts {
 851                            contacts: contacts.clone(),
 852                        },
 853                    ));
 854                }
 855            }
 856        }
 857        futures::future::try_join_all(send_futures).await?;
 858
 859        Ok(())
 860    }
 861
 862    async fn join_channel(
 863        mut self: Arc<Self>,
 864        request: TypedEnvelope<proto::JoinChannel>,
 865    ) -> tide::Result<()> {
 866        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 867        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 868        if !self
 869            .app_state
 870            .db
 871            .can_user_access_channel(user_id, channel_id)
 872            .await?
 873        {
 874            Err(anyhow!("access denied"))?;
 875        }
 876
 877        self.state_mut().join_channel(request.sender_id, channel_id);
 878        let messages = self
 879            .app_state
 880            .db
 881            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 882            .await?
 883            .into_iter()
 884            .map(|msg| proto::ChannelMessage {
 885                id: msg.id.to_proto(),
 886                body: msg.body,
 887                timestamp: msg.sent_at.unix_timestamp() as u64,
 888                sender_id: msg.sender_id.to_proto(),
 889                nonce: Some(msg.nonce.as_u128().into()),
 890            })
 891            .collect::<Vec<_>>();
 892        self.peer
 893            .respond(
 894                request.receipt(),
 895                proto::JoinChannelResponse {
 896                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 897                    messages,
 898                },
 899            )
 900            .await?;
 901        Ok(())
 902    }
 903
 904    async fn leave_channel(
 905        mut self: Arc<Self>,
 906        request: TypedEnvelope<proto::LeaveChannel>,
 907    ) -> tide::Result<()> {
 908        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 909        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 910        if !self
 911            .app_state
 912            .db
 913            .can_user_access_channel(user_id, channel_id)
 914            .await?
 915        {
 916            Err(anyhow!("access denied"))?;
 917        }
 918
 919        self.state_mut()
 920            .leave_channel(request.sender_id, channel_id);
 921
 922        Ok(())
 923    }
 924
 925    async fn send_channel_message(
 926        self: Arc<Self>,
 927        request: TypedEnvelope<proto::SendChannelMessage>,
 928    ) -> tide::Result<()> {
 929        let receipt = request.receipt();
 930        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 931        let user_id;
 932        let connection_ids;
 933        {
 934            let state = self.state();
 935            user_id = state.user_id_for_connection(request.sender_id)?;
 936            if let Some(ids) = state.channel_connection_ids(channel_id) {
 937                connection_ids = ids;
 938            } else {
 939                return Ok(());
 940            }
 941        }
 942
 943        // Validate the message body.
 944        let body = request.payload.body.trim().to_string();
 945        if body.len() > MAX_MESSAGE_LEN {
 946            self.peer
 947                .respond_with_error(
 948                    receipt,
 949                    proto::Error {
 950                        message: "message is too long".to_string(),
 951                    },
 952                )
 953                .await?;
 954            return Ok(());
 955        }
 956        if body.is_empty() {
 957            self.peer
 958                .respond_with_error(
 959                    receipt,
 960                    proto::Error {
 961                        message: "message can't be blank".to_string(),
 962                    },
 963                )
 964                .await?;
 965            return Ok(());
 966        }
 967
 968        let timestamp = OffsetDateTime::now_utc();
 969        let nonce = if let Some(nonce) = request.payload.nonce {
 970            nonce
 971        } else {
 972            self.peer
 973                .respond_with_error(
 974                    receipt,
 975                    proto::Error {
 976                        message: "nonce can't be blank".to_string(),
 977                    },
 978                )
 979                .await?;
 980            return Ok(());
 981        };
 982
 983        let message_id = self
 984            .app_state
 985            .db
 986            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 987            .await?
 988            .to_proto();
 989        let message = proto::ChannelMessage {
 990            sender_id: user_id.to_proto(),
 991            id: message_id,
 992            body,
 993            timestamp: timestamp.unix_timestamp() as u64,
 994            nonce: Some(nonce),
 995        };
 996        broadcast(request.sender_id, connection_ids, |conn_id| {
 997            self.peer.send(
 998                conn_id,
 999                proto::ChannelMessageSent {
1000                    channel_id: channel_id.to_proto(),
1001                    message: Some(message.clone()),
1002                },
1003            )
1004        })
1005        .await?;
1006        self.peer
1007            .respond(
1008                receipt,
1009                proto::SendChannelMessageResponse {
1010                    message: Some(message),
1011                },
1012            )
1013            .await?;
1014        Ok(())
1015    }
1016
1017    async fn get_channel_messages(
1018        self: Arc<Self>,
1019        request: TypedEnvelope<proto::GetChannelMessages>,
1020    ) -> tide::Result<()> {
1021        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1022        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1023        if !self
1024            .app_state
1025            .db
1026            .can_user_access_channel(user_id, channel_id)
1027            .await?
1028        {
1029            Err(anyhow!("access denied"))?;
1030        }
1031
1032        let messages = self
1033            .app_state
1034            .db
1035            .get_channel_messages(
1036                channel_id,
1037                MESSAGE_COUNT_PER_PAGE,
1038                Some(MessageId::from_proto(request.payload.before_message_id)),
1039            )
1040            .await?
1041            .into_iter()
1042            .map(|msg| proto::ChannelMessage {
1043                id: msg.id.to_proto(),
1044                body: msg.body,
1045                timestamp: msg.sent_at.unix_timestamp() as u64,
1046                sender_id: msg.sender_id.to_proto(),
1047                nonce: Some(msg.nonce.as_u128().into()),
1048            })
1049            .collect::<Vec<_>>();
1050        self.peer
1051            .respond(
1052                request.receipt(),
1053                proto::GetChannelMessagesResponse {
1054                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1055                    messages,
1056                },
1057            )
1058            .await?;
1059        Ok(())
1060    }
1061
1062    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1063        self.store.read()
1064    }
1065
1066    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1067        self.store.write()
1068    }
1069}
1070
1071pub async fn broadcast<F, T>(
1072    sender_id: ConnectionId,
1073    receiver_ids: Vec<ConnectionId>,
1074    mut f: F,
1075) -> anyhow::Result<()>
1076where
1077    F: FnMut(ConnectionId) -> T,
1078    T: Future<Output = anyhow::Result<()>>,
1079{
1080    let futures = receiver_ids
1081        .into_iter()
1082        .filter(|id| *id != sender_id)
1083        .map(|id| f(id));
1084    futures::future::try_join_all(futures).await?;
1085    Ok(())
1086}
1087
1088pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1089    let server = Server::new(app.state().clone(), rpc.clone(), None);
1090    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1091        let server = server.clone();
1092        async move {
1093            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1094
1095            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1096            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1097            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1098            let client_protocol_version: Option<u32> = request
1099                .header("X-Zed-Protocol-Version")
1100                .and_then(|v| v.as_str().parse().ok());
1101
1102            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1103                return Ok(Response::new(StatusCode::UpgradeRequired));
1104            }
1105
1106            let header = match request.header("Sec-Websocket-Key") {
1107                Some(h) => h.as_str(),
1108                None => return Err(anyhow!("expected sec-websocket-key"))?,
1109            };
1110
1111            let user_id = process_auth_header(&request).await?;
1112
1113            let mut response = Response::new(StatusCode::SwitchingProtocols);
1114            response.insert_header(UPGRADE, "websocket");
1115            response.insert_header(CONNECTION, "Upgrade");
1116            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1117            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1118            response.insert_header("Sec-Websocket-Version", "13");
1119
1120            let http_res: &mut tide::http::Response = response.as_mut();
1121            let upgrade_receiver = http_res.recv_upgrade().await;
1122            let addr = request.remote().unwrap_or("unknown").to_string();
1123            task::spawn(async move {
1124                if let Some(stream) = upgrade_receiver.await {
1125                    server
1126                        .handle_connection(
1127                            Connection::new(
1128                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1129                            ),
1130                            addr,
1131                            user_id,
1132                            None,
1133                        )
1134                        .await;
1135                }
1136            });
1137
1138            Ok(response)
1139        }
1140    });
1141}
1142
1143fn header_contains_ignore_case<T>(
1144    request: &tide::Request<T>,
1145    header_name: HeaderName,
1146    value: &str,
1147) -> bool {
1148    request
1149        .header(header_name)
1150        .map(|h| {
1151            h.as_str()
1152                .split(',')
1153                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1154        })
1155        .unwrap_or(false)
1156}
1157
1158#[cfg(test)]
1159mod tests {
1160    use super::*;
1161    use crate::{
1162        auth,
1163        db::{tests::TestDb, UserId},
1164        github, AppState, Config,
1165    };
1166    use ::rpc::Peer;
1167    use async_std::task;
1168    use gpui::{executor, ModelHandle, TestAppContext};
1169    use parking_lot::Mutex;
1170    use postage::{mpsc, watch};
1171    use rand::prelude::*;
1172    use rpc::PeerId;
1173    use serde_json::json;
1174    use sqlx::types::time::OffsetDateTime;
1175    use std::{
1176        ops::Deref,
1177        path::Path,
1178        rc::Rc,
1179        sync::{
1180            atomic::{AtomicBool, Ordering::SeqCst},
1181            Arc,
1182        },
1183        time::Duration,
1184    };
1185    use zed::{
1186        client::{
1187            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1188            EstablishConnectionError, UserStore,
1189        },
1190        editor::{Editor, EditorSettings, Input, MultiBuffer},
1191        fs::{FakeFs, Fs as _},
1192        language::{
1193            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1194            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1195        },
1196        lsp,
1197        project::{DiagnosticSummary, Project, ProjectPath},
1198    };
1199
1200    #[gpui::test]
1201    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1202        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1203        let lang_registry = Arc::new(LanguageRegistry::new());
1204        let fs = Arc::new(FakeFs::new(cx_a.background()));
1205        cx_a.foreground().forbid_parking();
1206
1207        // Connect to a server as 2 clients.
1208        let mut server = TestServer::start(cx_a.foreground()).await;
1209        let client_a = server.create_client(&mut cx_a, "user_a").await;
1210        let client_b = server.create_client(&mut cx_b, "user_b").await;
1211
1212        // Share a project as client A
1213        fs.insert_tree(
1214            "/a",
1215            json!({
1216                ".zed.toml": r#"collaborators = ["user_b"]"#,
1217                "a.txt": "a-contents",
1218                "b.txt": "b-contents",
1219            }),
1220        )
1221        .await;
1222        let project_a = cx_a.update(|cx| {
1223            Project::local(
1224                client_a.clone(),
1225                client_a.user_store.clone(),
1226                lang_registry.clone(),
1227                fs.clone(),
1228                cx,
1229            )
1230        });
1231        let (worktree_a, _) = project_a
1232            .update(&mut cx_a, |p, cx| {
1233                p.find_or_create_local_worktree("/a", false, cx)
1234            })
1235            .await
1236            .unwrap();
1237        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1238        worktree_a
1239            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1240            .await;
1241        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1242        project_a
1243            .update(&mut cx_a, |p, cx| p.share(cx))
1244            .await
1245            .unwrap();
1246
1247        // Join that project as client B
1248        let project_b = Project::remote(
1249            project_id,
1250            client_b.clone(),
1251            client_b.user_store.clone(),
1252            lang_registry.clone(),
1253            fs.clone(),
1254            &mut cx_b.to_async(),
1255        )
1256        .await
1257        .unwrap();
1258
1259        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1260            assert_eq!(
1261                project
1262                    .collaborators()
1263                    .get(&client_a.peer_id)
1264                    .unwrap()
1265                    .user
1266                    .github_login,
1267                "user_a"
1268            );
1269            project.replica_id()
1270        });
1271        project_a
1272            .condition(&cx_a, |tree, _| {
1273                tree.collaborators()
1274                    .get(&client_b.peer_id)
1275                    .map_or(false, |collaborator| {
1276                        collaborator.replica_id == replica_id_b
1277                            && collaborator.user.github_login == "user_b"
1278                    })
1279            })
1280            .await;
1281
1282        // Open the same file as client B and client A.
1283        let buffer_b = project_b
1284            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1285            .await
1286            .unwrap();
1287        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1288        buffer_b.read_with(&cx_b, |buf, cx| {
1289            assert_eq!(buf.read(cx).text(), "b-contents")
1290        });
1291        project_a.read_with(&cx_a, |project, cx| {
1292            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1293        });
1294        let buffer_a = project_a
1295            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1296            .await
1297            .unwrap();
1298
1299        let editor_b = cx_b.add_view(window_b, |cx| {
1300            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1301        });
1302
1303        // TODO
1304        // // Create a selection set as client B and see that selection set as client A.
1305        // buffer_a
1306        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1307        //     .await;
1308
1309        // Edit the buffer as client B and see that edit as client A.
1310        editor_b.update(&mut cx_b, |editor, cx| {
1311            editor.handle_input(&Input("ok, ".into()), cx)
1312        });
1313        buffer_a
1314            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1315            .await;
1316
1317        // TODO
1318        // // Remove the selection set as client B, see those selections disappear as client A.
1319        cx_b.update(move |_| drop(editor_b));
1320        // buffer_a
1321        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1322        //     .await;
1323
1324        // Close the buffer as client A, see that the buffer is closed.
1325        cx_a.update(move |_| drop(buffer_a));
1326        project_a
1327            .condition(&cx_a, |project, cx| {
1328                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1329            })
1330            .await;
1331
1332        // Dropping the client B's project removes client B from client A's collaborators.
1333        cx_b.update(move |_| drop(project_b));
1334        project_a
1335            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1336            .await;
1337    }
1338
1339    #[gpui::test]
1340    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1341        let lang_registry = Arc::new(LanguageRegistry::new());
1342        let fs = Arc::new(FakeFs::new(cx_a.background()));
1343        cx_a.foreground().forbid_parking();
1344
1345        // Connect to a server as 2 clients.
1346        let mut server = TestServer::start(cx_a.foreground()).await;
1347        let client_a = server.create_client(&mut cx_a, "user_a").await;
1348        let client_b = server.create_client(&mut cx_b, "user_b").await;
1349
1350        // Share a project as client A
1351        fs.insert_tree(
1352            "/a",
1353            json!({
1354                ".zed.toml": r#"collaborators = ["user_b"]"#,
1355                "a.txt": "a-contents",
1356                "b.txt": "b-contents",
1357            }),
1358        )
1359        .await;
1360        let project_a = cx_a.update(|cx| {
1361            Project::local(
1362                client_a.clone(),
1363                client_a.user_store.clone(),
1364                lang_registry.clone(),
1365                fs.clone(),
1366                cx,
1367            )
1368        });
1369        let (worktree_a, _) = project_a
1370            .update(&mut cx_a, |p, cx| {
1371                p.find_or_create_local_worktree("/a", false, cx)
1372            })
1373            .await
1374            .unwrap();
1375        worktree_a
1376            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1377            .await;
1378        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1379        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1380        project_a
1381            .update(&mut cx_a, |p, cx| p.share(cx))
1382            .await
1383            .unwrap();
1384        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1385
1386        // Join that project as client B
1387        let project_b = Project::remote(
1388            project_id,
1389            client_b.clone(),
1390            client_b.user_store.clone(),
1391            lang_registry.clone(),
1392            fs.clone(),
1393            &mut cx_b.to_async(),
1394        )
1395        .await
1396        .unwrap();
1397        project_b
1398            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1399            .await
1400            .unwrap();
1401
1402        // Unshare the project as client A
1403        project_a
1404            .update(&mut cx_a, |project, cx| project.unshare(cx))
1405            .await
1406            .unwrap();
1407        project_b
1408            .condition(&mut cx_b, |project, _| project.is_read_only())
1409            .await;
1410        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1411        drop(project_b);
1412
1413        // Share the project again and ensure guests can still join.
1414        project_a
1415            .update(&mut cx_a, |project, cx| project.share(cx))
1416            .await
1417            .unwrap();
1418        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1419
1420        let project_c = Project::remote(
1421            project_id,
1422            client_b.clone(),
1423            client_b.user_store.clone(),
1424            lang_registry.clone(),
1425            fs.clone(),
1426            &mut cx_b.to_async(),
1427        )
1428        .await
1429        .unwrap();
1430        project_c
1431            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1432            .await
1433            .unwrap();
1434    }
1435
1436    #[gpui::test]
1437    async fn test_propagate_saves_and_fs_changes(
1438        mut cx_a: TestAppContext,
1439        mut cx_b: TestAppContext,
1440        mut cx_c: TestAppContext,
1441    ) {
1442        let lang_registry = Arc::new(LanguageRegistry::new());
1443        let fs = Arc::new(FakeFs::new(cx_a.background()));
1444        cx_a.foreground().forbid_parking();
1445
1446        // Connect to a server as 3 clients.
1447        let mut server = TestServer::start(cx_a.foreground()).await;
1448        let client_a = server.create_client(&mut cx_a, "user_a").await;
1449        let client_b = server.create_client(&mut cx_b, "user_b").await;
1450        let client_c = server.create_client(&mut cx_c, "user_c").await;
1451
1452        // Share a worktree as client A.
1453        fs.insert_tree(
1454            "/a",
1455            json!({
1456                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1457                "file1": "",
1458                "file2": ""
1459            }),
1460        )
1461        .await;
1462        let project_a = cx_a.update(|cx| {
1463            Project::local(
1464                client_a.clone(),
1465                client_a.user_store.clone(),
1466                lang_registry.clone(),
1467                fs.clone(),
1468                cx,
1469            )
1470        });
1471        let (worktree_a, _) = project_a
1472            .update(&mut cx_a, |p, cx| {
1473                p.find_or_create_local_worktree("/a", false, cx)
1474            })
1475            .await
1476            .unwrap();
1477        worktree_a
1478            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1479            .await;
1480        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1481        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1482        project_a
1483            .update(&mut cx_a, |p, cx| p.share(cx))
1484            .await
1485            .unwrap();
1486
1487        // Join that worktree as clients B and C.
1488        let project_b = Project::remote(
1489            project_id,
1490            client_b.clone(),
1491            client_b.user_store.clone(),
1492            lang_registry.clone(),
1493            fs.clone(),
1494            &mut cx_b.to_async(),
1495        )
1496        .await
1497        .unwrap();
1498        let project_c = Project::remote(
1499            project_id,
1500            client_c.clone(),
1501            client_c.user_store.clone(),
1502            lang_registry.clone(),
1503            fs.clone(),
1504            &mut cx_c.to_async(),
1505        )
1506        .await
1507        .unwrap();
1508        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1509        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1510
1511        // Open and edit a buffer as both guests B and C.
1512        let buffer_b = project_b
1513            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1514            .await
1515            .unwrap();
1516        let buffer_c = project_c
1517            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1518            .await
1519            .unwrap();
1520        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1521        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1522
1523        // Open and edit that buffer as the host.
1524        let buffer_a = project_a
1525            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1526            .await
1527            .unwrap();
1528
1529        buffer_a
1530            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1531            .await;
1532        buffer_a.update(&mut cx_a, |buf, cx| {
1533            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1534        });
1535
1536        // Wait for edits to propagate
1537        buffer_a
1538            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1539            .await;
1540        buffer_b
1541            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1542            .await;
1543        buffer_c
1544            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1545            .await;
1546
1547        // Edit the buffer as the host and concurrently save as guest B.
1548        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1549        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1550        save_b.await.unwrap();
1551        assert_eq!(
1552            fs.load("/a/file1".as_ref()).await.unwrap(),
1553            "hi-a, i-am-c, i-am-b, i-am-a"
1554        );
1555        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1556        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1557        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1558
1559        // Make changes on host's file system, see those changes on guest worktrees.
1560        fs.rename("/a/file1".as_ref(), "/a/file1-renamed".as_ref())
1561            .await
1562            .unwrap();
1563        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1564            .await
1565            .unwrap();
1566        fs.insert_file(Path::new("/a/file4"), "4".into())
1567            .await
1568            .unwrap();
1569
1570        worktree_a
1571            .condition(&cx_a, |tree, _| tree.file_count() == 4)
1572            .await;
1573        worktree_b
1574            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1575            .await;
1576        worktree_c
1577            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1578            .await;
1579        worktree_a.read_with(&cx_a, |tree, _| {
1580            assert_eq!(
1581                tree.paths()
1582                    .map(|p| p.to_string_lossy())
1583                    .collect::<Vec<_>>(),
1584                &[".zed.toml", "file1-renamed", "file3", "file4"]
1585            )
1586        });
1587        worktree_b.read_with(&cx_b, |tree, _| {
1588            assert_eq!(
1589                tree.paths()
1590                    .map(|p| p.to_string_lossy())
1591                    .collect::<Vec<_>>(),
1592                &[".zed.toml", "file1-renamed", "file3", "file4"]
1593            )
1594        });
1595        worktree_c.read_with(&cx_c, |tree, _| {
1596            assert_eq!(
1597                tree.paths()
1598                    .map(|p| p.to_string_lossy())
1599                    .collect::<Vec<_>>(),
1600                &[".zed.toml", "file1-renamed", "file3", "file4"]
1601            )
1602        });
1603
1604        // Ensure buffer files are updated as well.
1605        buffer_a
1606            .condition(&cx_a, |buf, _| {
1607                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1608            })
1609            .await;
1610        buffer_b
1611            .condition(&cx_b, |buf, _| {
1612                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1613            })
1614            .await;
1615        buffer_c
1616            .condition(&cx_c, |buf, _| {
1617                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1618            })
1619            .await;
1620    }
1621
1622    #[gpui::test]
1623    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1624        cx_a.foreground().forbid_parking();
1625        let lang_registry = Arc::new(LanguageRegistry::new());
1626        let fs = Arc::new(FakeFs::new(cx_a.background()));
1627
1628        // Connect to a server as 2 clients.
1629        let mut server = TestServer::start(cx_a.foreground()).await;
1630        let client_a = server.create_client(&mut cx_a, "user_a").await;
1631        let client_b = server.create_client(&mut cx_b, "user_b").await;
1632
1633        // Share a project as client A
1634        fs.insert_tree(
1635            "/dir",
1636            json!({
1637                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1638                "a.txt": "a-contents",
1639            }),
1640        )
1641        .await;
1642
1643        let project_a = cx_a.update(|cx| {
1644            Project::local(
1645                client_a.clone(),
1646                client_a.user_store.clone(),
1647                lang_registry.clone(),
1648                fs.clone(),
1649                cx,
1650            )
1651        });
1652        let (worktree_a, _) = project_a
1653            .update(&mut cx_a, |p, cx| {
1654                p.find_or_create_local_worktree("/dir", false, cx)
1655            })
1656            .await
1657            .unwrap();
1658        worktree_a
1659            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1660            .await;
1661        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1662        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1663        project_a
1664            .update(&mut cx_a, |p, cx| p.share(cx))
1665            .await
1666            .unwrap();
1667
1668        // Join that project as client B
1669        let project_b = Project::remote(
1670            project_id,
1671            client_b.clone(),
1672            client_b.user_store.clone(),
1673            lang_registry.clone(),
1674            fs.clone(),
1675            &mut cx_b.to_async(),
1676        )
1677        .await
1678        .unwrap();
1679        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1680
1681        // Open a buffer as client B
1682        let buffer_b = project_b
1683            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1684            .await
1685            .unwrap();
1686        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1687
1688        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1689        buffer_b.read_with(&cx_b, |buf, _| {
1690            assert!(buf.is_dirty());
1691            assert!(!buf.has_conflict());
1692        });
1693
1694        buffer_b
1695            .update(&mut cx_b, |buf, cx| buf.save(cx))
1696            .await
1697            .unwrap();
1698        worktree_b
1699            .condition(&cx_b, |_, cx| {
1700                buffer_b.read(cx).file().unwrap().mtime() != mtime
1701            })
1702            .await;
1703        buffer_b.read_with(&cx_b, |buf, _| {
1704            assert!(!buf.is_dirty());
1705            assert!(!buf.has_conflict());
1706        });
1707
1708        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1709        buffer_b.read_with(&cx_b, |buf, _| {
1710            assert!(buf.is_dirty());
1711            assert!(!buf.has_conflict());
1712        });
1713    }
1714
1715    #[gpui::test]
1716    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1717        cx_a.foreground().forbid_parking();
1718        let lang_registry = Arc::new(LanguageRegistry::new());
1719        let fs = Arc::new(FakeFs::new(cx_a.background()));
1720
1721        // Connect to a server as 2 clients.
1722        let mut server = TestServer::start(cx_a.foreground()).await;
1723        let client_a = server.create_client(&mut cx_a, "user_a").await;
1724        let client_b = server.create_client(&mut cx_b, "user_b").await;
1725
1726        // Share a project as client A
1727        fs.insert_tree(
1728            "/dir",
1729            json!({
1730                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1731                "a.txt": "a-contents",
1732            }),
1733        )
1734        .await;
1735
1736        let project_a = cx_a.update(|cx| {
1737            Project::local(
1738                client_a.clone(),
1739                client_a.user_store.clone(),
1740                lang_registry.clone(),
1741                fs.clone(),
1742                cx,
1743            )
1744        });
1745        let (worktree_a, _) = project_a
1746            .update(&mut cx_a, |p, cx| {
1747                p.find_or_create_local_worktree("/dir", false, cx)
1748            })
1749            .await
1750            .unwrap();
1751        worktree_a
1752            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1753            .await;
1754        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1755        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1756        project_a
1757            .update(&mut cx_a, |p, cx| p.share(cx))
1758            .await
1759            .unwrap();
1760
1761        // Join that project as client B
1762        let project_b = Project::remote(
1763            project_id,
1764            client_b.clone(),
1765            client_b.user_store.clone(),
1766            lang_registry.clone(),
1767            fs.clone(),
1768            &mut cx_b.to_async(),
1769        )
1770        .await
1771        .unwrap();
1772        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1773
1774        // Open a buffer as client B
1775        let buffer_b = project_b
1776            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1777            .await
1778            .unwrap();
1779        buffer_b.read_with(&cx_b, |buf, _| {
1780            assert!(!buf.is_dirty());
1781            assert!(!buf.has_conflict());
1782        });
1783
1784        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1785            .await
1786            .unwrap();
1787        buffer_b
1788            .condition(&cx_b, |buf, _| {
1789                buf.text() == "new contents" && !buf.is_dirty()
1790            })
1791            .await;
1792        buffer_b.read_with(&cx_b, |buf, _| {
1793            assert!(!buf.has_conflict());
1794        });
1795    }
1796
1797    #[gpui::test(iterations = 100)]
1798    async fn test_editing_while_guest_opens_buffer(
1799        mut cx_a: TestAppContext,
1800        mut cx_b: TestAppContext,
1801    ) {
1802        cx_a.foreground().forbid_parking();
1803        let lang_registry = Arc::new(LanguageRegistry::new());
1804        let fs = Arc::new(FakeFs::new(cx_a.background()));
1805
1806        // Connect to a server as 2 clients.
1807        let mut server = TestServer::start(cx_a.foreground()).await;
1808        let client_a = server.create_client(&mut cx_a, "user_a").await;
1809        let client_b = server.create_client(&mut cx_b, "user_b").await;
1810
1811        // Share a project as client A
1812        fs.insert_tree(
1813            "/dir",
1814            json!({
1815                ".zed.toml": r#"collaborators = ["user_b"]"#,
1816                "a.txt": "a-contents",
1817            }),
1818        )
1819        .await;
1820        let project_a = cx_a.update(|cx| {
1821            Project::local(
1822                client_a.clone(),
1823                client_a.user_store.clone(),
1824                lang_registry.clone(),
1825                fs.clone(),
1826                cx,
1827            )
1828        });
1829        let (worktree_a, _) = project_a
1830            .update(&mut cx_a, |p, cx| {
1831                p.find_or_create_local_worktree("/dir", false, cx)
1832            })
1833            .await
1834            .unwrap();
1835        worktree_a
1836            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1837            .await;
1838        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1839        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1840        project_a
1841            .update(&mut cx_a, |p, cx| p.share(cx))
1842            .await
1843            .unwrap();
1844
1845        // Join that project as client B
1846        let project_b = Project::remote(
1847            project_id,
1848            client_b.clone(),
1849            client_b.user_store.clone(),
1850            lang_registry.clone(),
1851            fs.clone(),
1852            &mut cx_b.to_async(),
1853        )
1854        .await
1855        .unwrap();
1856
1857        // Open a buffer as client A
1858        let buffer_a = project_a
1859            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1860            .await
1861            .unwrap();
1862
1863        // Start opening the same buffer as client B
1864        let buffer_b = cx_b
1865            .background()
1866            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1867        task::yield_now().await;
1868
1869        // Edit the buffer as client A while client B is still opening it.
1870        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1871
1872        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1873        let buffer_b = buffer_b.await.unwrap();
1874        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1875    }
1876
1877    #[gpui::test]
1878    async fn test_leaving_worktree_while_opening_buffer(
1879        mut cx_a: TestAppContext,
1880        mut cx_b: TestAppContext,
1881    ) {
1882        cx_a.foreground().forbid_parking();
1883        let lang_registry = Arc::new(LanguageRegistry::new());
1884        let fs = Arc::new(FakeFs::new(cx_a.background()));
1885
1886        // Connect to a server as 2 clients.
1887        let mut server = TestServer::start(cx_a.foreground()).await;
1888        let client_a = server.create_client(&mut cx_a, "user_a").await;
1889        let client_b = server.create_client(&mut cx_b, "user_b").await;
1890
1891        // Share a project as client A
1892        fs.insert_tree(
1893            "/dir",
1894            json!({
1895                ".zed.toml": r#"collaborators = ["user_b"]"#,
1896                "a.txt": "a-contents",
1897            }),
1898        )
1899        .await;
1900        let project_a = cx_a.update(|cx| {
1901            Project::local(
1902                client_a.clone(),
1903                client_a.user_store.clone(),
1904                lang_registry.clone(),
1905                fs.clone(),
1906                cx,
1907            )
1908        });
1909        let (worktree_a, _) = project_a
1910            .update(&mut cx_a, |p, cx| {
1911                p.find_or_create_local_worktree("/dir", false, cx)
1912            })
1913            .await
1914            .unwrap();
1915        worktree_a
1916            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1917            .await;
1918        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1919        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1920        project_a
1921            .update(&mut cx_a, |p, cx| p.share(cx))
1922            .await
1923            .unwrap();
1924
1925        // Join that project as client B
1926        let project_b = Project::remote(
1927            project_id,
1928            client_b.clone(),
1929            client_b.user_store.clone(),
1930            lang_registry.clone(),
1931            fs.clone(),
1932            &mut cx_b.to_async(),
1933        )
1934        .await
1935        .unwrap();
1936
1937        // See that a guest has joined as client A.
1938        project_a
1939            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1940            .await;
1941
1942        // Begin opening a buffer as client B, but leave the project before the open completes.
1943        let buffer_b = cx_b
1944            .background()
1945            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1946        cx_b.update(|_| drop(project_b));
1947        drop(buffer_b);
1948
1949        // See that the guest has left.
1950        project_a
1951            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1952            .await;
1953    }
1954
1955    #[gpui::test]
1956    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1957        cx_a.foreground().forbid_parking();
1958        let lang_registry = Arc::new(LanguageRegistry::new());
1959        let fs = Arc::new(FakeFs::new(cx_a.background()));
1960
1961        // Connect to a server as 2 clients.
1962        let mut server = TestServer::start(cx_a.foreground()).await;
1963        let client_a = server.create_client(&mut cx_a, "user_a").await;
1964        let client_b = server.create_client(&mut cx_b, "user_b").await;
1965
1966        // Share a project as client A
1967        fs.insert_tree(
1968            "/a",
1969            json!({
1970                ".zed.toml": r#"collaborators = ["user_b"]"#,
1971                "a.txt": "a-contents",
1972                "b.txt": "b-contents",
1973            }),
1974        )
1975        .await;
1976        let project_a = cx_a.update(|cx| {
1977            Project::local(
1978                client_a.clone(),
1979                client_a.user_store.clone(),
1980                lang_registry.clone(),
1981                fs.clone(),
1982                cx,
1983            )
1984        });
1985        let (worktree_a, _) = project_a
1986            .update(&mut cx_a, |p, cx| {
1987                p.find_or_create_local_worktree("/a", false, cx)
1988            })
1989            .await
1990            .unwrap();
1991        worktree_a
1992            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1993            .await;
1994        let project_id = project_a
1995            .update(&mut cx_a, |project, _| project.next_remote_id())
1996            .await;
1997        project_a
1998            .update(&mut cx_a, |project, cx| project.share(cx))
1999            .await
2000            .unwrap();
2001
2002        // Join that project as client B
2003        let _project_b = Project::remote(
2004            project_id,
2005            client_b.clone(),
2006            client_b.user_store.clone(),
2007            lang_registry.clone(),
2008            fs.clone(),
2009            &mut cx_b.to_async(),
2010        )
2011        .await
2012        .unwrap();
2013
2014        // See that a guest has joined as client A.
2015        project_a
2016            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2017            .await;
2018
2019        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2020        client_b.disconnect(&cx_b.to_async()).unwrap();
2021        project_a
2022            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2023            .await;
2024    }
2025
2026    #[gpui::test]
2027    async fn test_collaborating_with_diagnostics(
2028        mut cx_a: TestAppContext,
2029        mut cx_b: TestAppContext,
2030    ) {
2031        cx_a.foreground().forbid_parking();
2032        let mut lang_registry = Arc::new(LanguageRegistry::new());
2033        let fs = Arc::new(FakeFs::new(cx_a.background()));
2034
2035        // Set up a fake language server.
2036        let (language_server_config, mut fake_language_server) =
2037            LanguageServerConfig::fake(cx_a.background()).await;
2038        Arc::get_mut(&mut lang_registry)
2039            .unwrap()
2040            .add(Arc::new(Language::new(
2041                LanguageConfig {
2042                    name: "Rust".to_string(),
2043                    path_suffixes: vec!["rs".to_string()],
2044                    language_server: Some(language_server_config),
2045                    ..Default::default()
2046                },
2047                Some(tree_sitter_rust::language()),
2048            )));
2049
2050        // Connect to a server as 2 clients.
2051        let mut server = TestServer::start(cx_a.foreground()).await;
2052        let client_a = server.create_client(&mut cx_a, "user_a").await;
2053        let client_b = server.create_client(&mut cx_b, "user_b").await;
2054
2055        // Share a project as client A
2056        fs.insert_tree(
2057            "/a",
2058            json!({
2059                ".zed.toml": r#"collaborators = ["user_b"]"#,
2060                "a.rs": "let one = two",
2061                "other.rs": "",
2062            }),
2063        )
2064        .await;
2065        let project_a = cx_a.update(|cx| {
2066            Project::local(
2067                client_a.clone(),
2068                client_a.user_store.clone(),
2069                lang_registry.clone(),
2070                fs.clone(),
2071                cx,
2072            )
2073        });
2074        let (worktree_a, _) = project_a
2075            .update(&mut cx_a, |p, cx| {
2076                p.find_or_create_local_worktree("/a", false, cx)
2077            })
2078            .await
2079            .unwrap();
2080        worktree_a
2081            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2082            .await;
2083        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2084        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2085        project_a
2086            .update(&mut cx_a, |p, cx| p.share(cx))
2087            .await
2088            .unwrap();
2089
2090        // Cause the language server to start.
2091        let _ = cx_a
2092            .background()
2093            .spawn(project_a.update(&mut cx_a, |project, cx| {
2094                project.open_buffer(
2095                    ProjectPath {
2096                        worktree_id,
2097                        path: Path::new("other.rs").into(),
2098                    },
2099                    cx,
2100                )
2101            }))
2102            .await
2103            .unwrap();
2104
2105        // Simulate a language server reporting errors for a file.
2106        fake_language_server
2107            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2108                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2109                version: None,
2110                diagnostics: vec![lsp::Diagnostic {
2111                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2112                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2113                    message: "message 1".to_string(),
2114                    ..Default::default()
2115                }],
2116            })
2117            .await;
2118
2119        // Wait for server to see the diagnostics update.
2120        server
2121            .condition(|store| {
2122                let worktree = store
2123                    .project(project_id)
2124                    .unwrap()
2125                    .worktrees
2126                    .get(&worktree_id.to_proto())
2127                    .unwrap();
2128
2129                !worktree
2130                    .share
2131                    .as_ref()
2132                    .unwrap()
2133                    .diagnostic_summaries
2134                    .is_empty()
2135            })
2136            .await;
2137
2138        // Join the worktree as client B.
2139        let project_b = Project::remote(
2140            project_id,
2141            client_b.clone(),
2142            client_b.user_store.clone(),
2143            lang_registry.clone(),
2144            fs.clone(),
2145            &mut cx_b.to_async(),
2146        )
2147        .await
2148        .unwrap();
2149
2150        project_b.read_with(&cx_b, |project, cx| {
2151            assert_eq!(
2152                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2153                &[(
2154                    ProjectPath {
2155                        worktree_id,
2156                        path: Arc::from(Path::new("a.rs")),
2157                    },
2158                    DiagnosticSummary {
2159                        error_count: 1,
2160                        warning_count: 0,
2161                        ..Default::default()
2162                    },
2163                )]
2164            )
2165        });
2166
2167        // Simulate a language server reporting more errors for a file.
2168        fake_language_server
2169            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2170                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2171                version: None,
2172                diagnostics: vec![
2173                    lsp::Diagnostic {
2174                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2175                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2176                        message: "message 1".to_string(),
2177                        ..Default::default()
2178                    },
2179                    lsp::Diagnostic {
2180                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2181                        range: lsp::Range::new(
2182                            lsp::Position::new(0, 10),
2183                            lsp::Position::new(0, 13),
2184                        ),
2185                        message: "message 2".to_string(),
2186                        ..Default::default()
2187                    },
2188                ],
2189            })
2190            .await;
2191
2192        // Client b gets the updated summaries
2193        project_b
2194            .condition(&cx_b, |project, cx| {
2195                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2196                    == &[(
2197                        ProjectPath {
2198                            worktree_id,
2199                            path: Arc::from(Path::new("a.rs")),
2200                        },
2201                        DiagnosticSummary {
2202                            error_count: 1,
2203                            warning_count: 1,
2204                            ..Default::default()
2205                        },
2206                    )]
2207            })
2208            .await;
2209
2210        // Open the file with the errors on client B. They should be present.
2211        let buffer_b = cx_b
2212            .background()
2213            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2214            .await
2215            .unwrap();
2216
2217        buffer_b.read_with(&cx_b, |buffer, _| {
2218            assert_eq!(
2219                buffer
2220                    .snapshot()
2221                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2222                    .map(|entry| entry)
2223                    .collect::<Vec<_>>(),
2224                &[
2225                    DiagnosticEntry {
2226                        range: Point::new(0, 4)..Point::new(0, 7),
2227                        diagnostic: Diagnostic {
2228                            group_id: 0,
2229                            message: "message 1".to_string(),
2230                            severity: lsp::DiagnosticSeverity::ERROR,
2231                            is_primary: true,
2232                            ..Default::default()
2233                        }
2234                    },
2235                    DiagnosticEntry {
2236                        range: Point::new(0, 10)..Point::new(0, 13),
2237                        diagnostic: Diagnostic {
2238                            group_id: 1,
2239                            severity: lsp::DiagnosticSeverity::WARNING,
2240                            message: "message 2".to_string(),
2241                            is_primary: true,
2242                            ..Default::default()
2243                        }
2244                    }
2245                ]
2246            );
2247        });
2248    }
2249
2250    #[gpui::test]
2251    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2252        cx_a.foreground().forbid_parking();
2253        let mut lang_registry = Arc::new(LanguageRegistry::new());
2254        let fs = Arc::new(FakeFs::new(cx_a.background()));
2255
2256        // Set up a fake language server.
2257        let (language_server_config, mut fake_language_server) =
2258            LanguageServerConfig::fake(cx_a.background()).await;
2259        Arc::get_mut(&mut lang_registry)
2260            .unwrap()
2261            .add(Arc::new(Language::new(
2262                LanguageConfig {
2263                    name: "Rust".to_string(),
2264                    path_suffixes: vec!["rs".to_string()],
2265                    language_server: Some(language_server_config),
2266                    ..Default::default()
2267                },
2268                Some(tree_sitter_rust::language()),
2269            )));
2270
2271        // Connect to a server as 2 clients.
2272        let mut server = TestServer::start(cx_a.foreground()).await;
2273        let client_a = server.create_client(&mut cx_a, "user_a").await;
2274        let client_b = server.create_client(&mut cx_b, "user_b").await;
2275
2276        // Share a project as client A
2277        fs.insert_tree(
2278            "/a",
2279            json!({
2280                ".zed.toml": r#"collaborators = ["user_b"]"#,
2281                "a.rs": "let one = two",
2282            }),
2283        )
2284        .await;
2285        let project_a = cx_a.update(|cx| {
2286            Project::local(
2287                client_a.clone(),
2288                client_a.user_store.clone(),
2289                lang_registry.clone(),
2290                fs.clone(),
2291                cx,
2292            )
2293        });
2294        let (worktree_a, _) = project_a
2295            .update(&mut cx_a, |p, cx| {
2296                p.find_or_create_local_worktree("/a", false, cx)
2297            })
2298            .await
2299            .unwrap();
2300        worktree_a
2301            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2302            .await;
2303        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2304        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2305        project_a
2306            .update(&mut cx_a, |p, cx| p.share(cx))
2307            .await
2308            .unwrap();
2309
2310        // Join the worktree as client B.
2311        let project_b = Project::remote(
2312            project_id,
2313            client_b.clone(),
2314            client_b.user_store.clone(),
2315            lang_registry.clone(),
2316            fs.clone(),
2317            &mut cx_b.to_async(),
2318        )
2319        .await
2320        .unwrap();
2321
2322        let buffer_b = cx_b
2323            .background()
2324            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2325            .await
2326            .unwrap();
2327
2328        let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2329        let (request_id, _) = fake_language_server
2330            .receive_request::<lsp::request::Formatting>()
2331            .await;
2332        fake_language_server
2333            .respond(
2334                request_id,
2335                Some(vec![
2336                    lsp::TextEdit {
2337                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2338                        new_text: "h".to_string(),
2339                    },
2340                    lsp::TextEdit {
2341                        range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2342                        new_text: "y".to_string(),
2343                    },
2344                ]),
2345            )
2346            .await;
2347        format.await.unwrap();
2348        assert_eq!(
2349            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2350            "let honey = two"
2351        );
2352    }
2353
2354    #[gpui::test]
2355    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2356        cx_a.foreground().forbid_parking();
2357        let mut lang_registry = Arc::new(LanguageRegistry::new());
2358        let fs = Arc::new(FakeFs::new(cx_a.background()));
2359        fs.insert_tree(
2360            "/root-1",
2361            json!({
2362                ".zed.toml": r#"collaborators = ["user_b"]"#,
2363                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2364            }),
2365        )
2366        .await;
2367        fs.insert_tree(
2368            "/root-2",
2369            json!({
2370                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2371            }),
2372        )
2373        .await;
2374
2375        // Set up a fake language server.
2376        let (language_server_config, mut fake_language_server) =
2377            LanguageServerConfig::fake(cx_a.background()).await;
2378        Arc::get_mut(&mut lang_registry)
2379            .unwrap()
2380            .add(Arc::new(Language::new(
2381                LanguageConfig {
2382                    name: "Rust".to_string(),
2383                    path_suffixes: vec!["rs".to_string()],
2384                    language_server: Some(language_server_config),
2385                    ..Default::default()
2386                },
2387                Some(tree_sitter_rust::language()),
2388            )));
2389
2390        // Connect to a server as 2 clients.
2391        let mut server = TestServer::start(cx_a.foreground()).await;
2392        let client_a = server.create_client(&mut cx_a, "user_a").await;
2393        let client_b = server.create_client(&mut cx_b, "user_b").await;
2394
2395        // Share a project as client A
2396        let project_a = cx_a.update(|cx| {
2397            Project::local(
2398                client_a.clone(),
2399                client_a.user_store.clone(),
2400                lang_registry.clone(),
2401                fs.clone(),
2402                cx,
2403            )
2404        });
2405        let (worktree_a, _) = project_a
2406            .update(&mut cx_a, |p, cx| {
2407                p.find_or_create_local_worktree("/root-1", false, cx)
2408            })
2409            .await
2410            .unwrap();
2411        worktree_a
2412            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2413            .await;
2414        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2415        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2416        project_a
2417            .update(&mut cx_a, |p, cx| p.share(cx))
2418            .await
2419            .unwrap();
2420
2421        // Join the worktree as client B.
2422        let project_b = Project::remote(
2423            project_id,
2424            client_b.clone(),
2425            client_b.user_store.clone(),
2426            lang_registry.clone(),
2427            fs.clone(),
2428            &mut cx_b.to_async(),
2429        )
2430        .await
2431        .unwrap();
2432
2433        // Open the file to be formatted on client B.
2434        let buffer_b = cx_b
2435            .background()
2436            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2437            .await
2438            .unwrap();
2439
2440        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2441        let (request_id, _) = fake_language_server
2442            .receive_request::<lsp::request::GotoDefinition>()
2443            .await;
2444        fake_language_server
2445            .respond(
2446                request_id,
2447                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2448                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2449                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2450                ))),
2451            )
2452            .await;
2453        let definitions_1 = definitions_1.await.unwrap();
2454        cx_b.read(|cx| {
2455            assert_eq!(definitions_1.len(), 1);
2456            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2457            let target_buffer = definitions_1[0].target_buffer.read(cx);
2458            assert_eq!(
2459                target_buffer.text(),
2460                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2461            );
2462            assert_eq!(
2463                definitions_1[0].target_range.to_point(target_buffer),
2464                Point::new(0, 6)..Point::new(0, 9)
2465            );
2466        });
2467
2468        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2469        // the previous call to `definition`.
2470        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2471        let (request_id, _) = fake_language_server
2472            .receive_request::<lsp::request::GotoDefinition>()
2473            .await;
2474        fake_language_server
2475            .respond(
2476                request_id,
2477                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2478                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2479                    lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2480                ))),
2481            )
2482            .await;
2483        let definitions_2 = definitions_2.await.unwrap();
2484        cx_b.read(|cx| {
2485            assert_eq!(definitions_2.len(), 1);
2486            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2487            let target_buffer = definitions_2[0].target_buffer.read(cx);
2488            assert_eq!(
2489                target_buffer.text(),
2490                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2491            );
2492            assert_eq!(
2493                definitions_2[0].target_range.to_point(target_buffer),
2494                Point::new(1, 6)..Point::new(1, 11)
2495            );
2496        });
2497        assert_eq!(
2498            definitions_1[0].target_buffer,
2499            definitions_2[0].target_buffer
2500        );
2501
2502        cx_b.update(|_| {
2503            drop(definitions_1);
2504            drop(definitions_2);
2505        });
2506        project_b
2507            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2508            .await;
2509    }
2510
2511    #[gpui::test]
2512    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2513        mut cx_a: TestAppContext,
2514        mut cx_b: TestAppContext,
2515        mut rng: StdRng,
2516    ) {
2517        cx_a.foreground().forbid_parking();
2518        let mut lang_registry = Arc::new(LanguageRegistry::new());
2519        let fs = Arc::new(FakeFs::new(cx_a.background()));
2520        fs.insert_tree(
2521            "/root",
2522            json!({
2523                ".zed.toml": r#"collaborators = ["user_b"]"#,
2524                "a.rs": "const ONE: usize = b::TWO;",
2525                "b.rs": "const TWO: usize = 2",
2526            }),
2527        )
2528        .await;
2529
2530        // Set up a fake language server.
2531        let (language_server_config, mut fake_language_server) =
2532            LanguageServerConfig::fake(cx_a.background()).await;
2533        Arc::get_mut(&mut lang_registry)
2534            .unwrap()
2535            .add(Arc::new(Language::new(
2536                LanguageConfig {
2537                    name: "Rust".to_string(),
2538                    path_suffixes: vec!["rs".to_string()],
2539                    language_server: Some(language_server_config),
2540                    ..Default::default()
2541                },
2542                Some(tree_sitter_rust::language()),
2543            )));
2544
2545        // Connect to a server as 2 clients.
2546        let mut server = TestServer::start(cx_a.foreground()).await;
2547        let client_a = server.create_client(&mut cx_a, "user_a").await;
2548        let client_b = server.create_client(&mut cx_b, "user_b").await;
2549
2550        // Share a project as client A
2551        let project_a = cx_a.update(|cx| {
2552            Project::local(
2553                client_a.clone(),
2554                client_a.user_store.clone(),
2555                lang_registry.clone(),
2556                fs.clone(),
2557                cx,
2558            )
2559        });
2560        let (worktree_a, _) = project_a
2561            .update(&mut cx_a, |p, cx| {
2562                p.find_or_create_local_worktree("/root", false, cx)
2563            })
2564            .await
2565            .unwrap();
2566        worktree_a
2567            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2568            .await;
2569        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2570        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2571        project_a
2572            .update(&mut cx_a, |p, cx| p.share(cx))
2573            .await
2574            .unwrap();
2575
2576        // Join the worktree as client B.
2577        let project_b = Project::remote(
2578            project_id,
2579            client_b.clone(),
2580            client_b.user_store.clone(),
2581            lang_registry.clone(),
2582            fs.clone(),
2583            &mut cx_b.to_async(),
2584        )
2585        .await
2586        .unwrap();
2587
2588        let buffer_b1 = cx_b
2589            .background()
2590            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2591            .await
2592            .unwrap();
2593
2594        let definitions;
2595        let buffer_b2;
2596        if rng.gen() {
2597            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2598            buffer_b2 =
2599                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2600        } else {
2601            buffer_b2 =
2602                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2603            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2604        }
2605
2606        let (request_id, _) = fake_language_server
2607            .receive_request::<lsp::request::GotoDefinition>()
2608            .await;
2609        fake_language_server
2610            .respond(
2611                request_id,
2612                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2613                    lsp::Url::from_file_path("/root/b.rs").unwrap(),
2614                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2615                ))),
2616            )
2617            .await;
2618
2619        let buffer_b2 = buffer_b2.await.unwrap();
2620        let definitions = definitions.await.unwrap();
2621        assert_eq!(definitions.len(), 1);
2622        assert_eq!(definitions[0].target_buffer, buffer_b2);
2623    }
2624
2625    #[gpui::test]
2626    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2627        cx_a.foreground().forbid_parking();
2628
2629        // Connect to a server as 2 clients.
2630        let mut server = TestServer::start(cx_a.foreground()).await;
2631        let client_a = server.create_client(&mut cx_a, "user_a").await;
2632        let client_b = server.create_client(&mut cx_b, "user_b").await;
2633
2634        // Create an org that includes these 2 users.
2635        let db = &server.app_state.db;
2636        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2637        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2638            .await
2639            .unwrap();
2640        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2641            .await
2642            .unwrap();
2643
2644        // Create a channel that includes all the users.
2645        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2646        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2647            .await
2648            .unwrap();
2649        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2650            .await
2651            .unwrap();
2652        db.create_channel_message(
2653            channel_id,
2654            client_b.current_user_id(&cx_b),
2655            "hello A, it's B.",
2656            OffsetDateTime::now_utc(),
2657            1,
2658        )
2659        .await
2660        .unwrap();
2661
2662        let channels_a = cx_a
2663            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2664        channels_a
2665            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2666            .await;
2667        channels_a.read_with(&cx_a, |list, _| {
2668            assert_eq!(
2669                list.available_channels().unwrap(),
2670                &[ChannelDetails {
2671                    id: channel_id.to_proto(),
2672                    name: "test-channel".to_string()
2673                }]
2674            )
2675        });
2676        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2677            this.get_channel(channel_id.to_proto(), cx).unwrap()
2678        });
2679        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2680        channel_a
2681            .condition(&cx_a, |channel, _| {
2682                channel_messages(channel)
2683                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2684            })
2685            .await;
2686
2687        let channels_b = cx_b
2688            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2689        channels_b
2690            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2691            .await;
2692        channels_b.read_with(&cx_b, |list, _| {
2693            assert_eq!(
2694                list.available_channels().unwrap(),
2695                &[ChannelDetails {
2696                    id: channel_id.to_proto(),
2697                    name: "test-channel".to_string()
2698                }]
2699            )
2700        });
2701
2702        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2703            this.get_channel(channel_id.to_proto(), cx).unwrap()
2704        });
2705        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2706        channel_b
2707            .condition(&cx_b, |channel, _| {
2708                channel_messages(channel)
2709                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2710            })
2711            .await;
2712
2713        channel_a
2714            .update(&mut cx_a, |channel, cx| {
2715                channel
2716                    .send_message("oh, hi B.".to_string(), cx)
2717                    .unwrap()
2718                    .detach();
2719                let task = channel.send_message("sup".to_string(), cx).unwrap();
2720                assert_eq!(
2721                    channel_messages(channel),
2722                    &[
2723                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2724                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2725                        ("user_a".to_string(), "sup".to_string(), true)
2726                    ]
2727                );
2728                task
2729            })
2730            .await
2731            .unwrap();
2732
2733        channel_b
2734            .condition(&cx_b, |channel, _| {
2735                channel_messages(channel)
2736                    == [
2737                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2738                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2739                        ("user_a".to_string(), "sup".to_string(), false),
2740                    ]
2741            })
2742            .await;
2743
2744        assert_eq!(
2745            server
2746                .state()
2747                .await
2748                .channel(channel_id)
2749                .unwrap()
2750                .connection_ids
2751                .len(),
2752            2
2753        );
2754        cx_b.update(|_| drop(channel_b));
2755        server
2756            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2757            .await;
2758
2759        cx_a.update(|_| drop(channel_a));
2760        server
2761            .condition(|state| state.channel(channel_id).is_none())
2762            .await;
2763    }
2764
2765    #[gpui::test]
2766    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2767        cx_a.foreground().forbid_parking();
2768
2769        let mut server = TestServer::start(cx_a.foreground()).await;
2770        let client_a = server.create_client(&mut cx_a, "user_a").await;
2771
2772        let db = &server.app_state.db;
2773        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2774        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2775        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2776            .await
2777            .unwrap();
2778        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2779            .await
2780            .unwrap();
2781
2782        let channels_a = cx_a
2783            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2784        channels_a
2785            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2786            .await;
2787        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2788            this.get_channel(channel_id.to_proto(), cx).unwrap()
2789        });
2790
2791        // Messages aren't allowed to be too long.
2792        channel_a
2793            .update(&mut cx_a, |channel, cx| {
2794                let long_body = "this is long.\n".repeat(1024);
2795                channel.send_message(long_body, cx).unwrap()
2796            })
2797            .await
2798            .unwrap_err();
2799
2800        // Messages aren't allowed to be blank.
2801        channel_a.update(&mut cx_a, |channel, cx| {
2802            channel.send_message(String::new(), cx).unwrap_err()
2803        });
2804
2805        // Leading and trailing whitespace are trimmed.
2806        channel_a
2807            .update(&mut cx_a, |channel, cx| {
2808                channel
2809                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
2810                    .unwrap()
2811            })
2812            .await
2813            .unwrap();
2814        assert_eq!(
2815            db.get_channel_messages(channel_id, 10, None)
2816                .await
2817                .unwrap()
2818                .iter()
2819                .map(|m| &m.body)
2820                .collect::<Vec<_>>(),
2821            &["surrounded by whitespace"]
2822        );
2823    }
2824
2825    #[gpui::test]
2826    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2827        cx_a.foreground().forbid_parking();
2828
2829        // Connect to a server as 2 clients.
2830        let mut server = TestServer::start(cx_a.foreground()).await;
2831        let client_a = server.create_client(&mut cx_a, "user_a").await;
2832        let client_b = server.create_client(&mut cx_b, "user_b").await;
2833        let mut status_b = client_b.status();
2834
2835        // Create an org that includes these 2 users.
2836        let db = &server.app_state.db;
2837        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2838        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2839            .await
2840            .unwrap();
2841        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2842            .await
2843            .unwrap();
2844
2845        // Create a channel that includes all the users.
2846        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2847        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2848            .await
2849            .unwrap();
2850        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2851            .await
2852            .unwrap();
2853        db.create_channel_message(
2854            channel_id,
2855            client_b.current_user_id(&cx_b),
2856            "hello A, it's B.",
2857            OffsetDateTime::now_utc(),
2858            2,
2859        )
2860        .await
2861        .unwrap();
2862
2863        let channels_a = cx_a
2864            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2865        channels_a
2866            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2867            .await;
2868
2869        channels_a.read_with(&cx_a, |list, _| {
2870            assert_eq!(
2871                list.available_channels().unwrap(),
2872                &[ChannelDetails {
2873                    id: channel_id.to_proto(),
2874                    name: "test-channel".to_string()
2875                }]
2876            )
2877        });
2878        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2879            this.get_channel(channel_id.to_proto(), cx).unwrap()
2880        });
2881        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2882        channel_a
2883            .condition(&cx_a, |channel, _| {
2884                channel_messages(channel)
2885                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2886            })
2887            .await;
2888
2889        let channels_b = cx_b
2890            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2891        channels_b
2892            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2893            .await;
2894        channels_b.read_with(&cx_b, |list, _| {
2895            assert_eq!(
2896                list.available_channels().unwrap(),
2897                &[ChannelDetails {
2898                    id: channel_id.to_proto(),
2899                    name: "test-channel".to_string()
2900                }]
2901            )
2902        });
2903
2904        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2905            this.get_channel(channel_id.to_proto(), cx).unwrap()
2906        });
2907        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2908        channel_b
2909            .condition(&cx_b, |channel, _| {
2910                channel_messages(channel)
2911                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2912            })
2913            .await;
2914
2915        // Disconnect client B, ensuring we can still access its cached channel data.
2916        server.forbid_connections();
2917        server.disconnect_client(client_b.current_user_id(&cx_b));
2918        while !matches!(
2919            status_b.next().await,
2920            Some(client::Status::ReconnectionError { .. })
2921        ) {}
2922
2923        channels_b.read_with(&cx_b, |channels, _| {
2924            assert_eq!(
2925                channels.available_channels().unwrap(),
2926                [ChannelDetails {
2927                    id: channel_id.to_proto(),
2928                    name: "test-channel".to_string()
2929                }]
2930            )
2931        });
2932        channel_b.read_with(&cx_b, |channel, _| {
2933            assert_eq!(
2934                channel_messages(channel),
2935                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2936            )
2937        });
2938
2939        // Send a message from client B while it is disconnected.
2940        channel_b
2941            .update(&mut cx_b, |channel, cx| {
2942                let task = channel
2943                    .send_message("can you see this?".to_string(), cx)
2944                    .unwrap();
2945                assert_eq!(
2946                    channel_messages(channel),
2947                    &[
2948                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2949                        ("user_b".to_string(), "can you see this?".to_string(), true)
2950                    ]
2951                );
2952                task
2953            })
2954            .await
2955            .unwrap_err();
2956
2957        // Send a message from client A while B is disconnected.
2958        channel_a
2959            .update(&mut cx_a, |channel, cx| {
2960                channel
2961                    .send_message("oh, hi B.".to_string(), cx)
2962                    .unwrap()
2963                    .detach();
2964                let task = channel.send_message("sup".to_string(), cx).unwrap();
2965                assert_eq!(
2966                    channel_messages(channel),
2967                    &[
2968                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2969                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2970                        ("user_a".to_string(), "sup".to_string(), true)
2971                    ]
2972                );
2973                task
2974            })
2975            .await
2976            .unwrap();
2977
2978        // Give client B a chance to reconnect.
2979        server.allow_connections();
2980        cx_b.foreground().advance_clock(Duration::from_secs(10));
2981
2982        // Verify that B sees the new messages upon reconnection, as well as the message client B
2983        // sent while offline.
2984        channel_b
2985            .condition(&cx_b, |channel, _| {
2986                channel_messages(channel)
2987                    == [
2988                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2989                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2990                        ("user_a".to_string(), "sup".to_string(), false),
2991                        ("user_b".to_string(), "can you see this?".to_string(), false),
2992                    ]
2993            })
2994            .await;
2995
2996        // Ensure client A and B can communicate normally after reconnection.
2997        channel_a
2998            .update(&mut cx_a, |channel, cx| {
2999                channel.send_message("you online?".to_string(), cx).unwrap()
3000            })
3001            .await
3002            .unwrap();
3003        channel_b
3004            .condition(&cx_b, |channel, _| {
3005                channel_messages(channel)
3006                    == [
3007                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3008                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3009                        ("user_a".to_string(), "sup".to_string(), false),
3010                        ("user_b".to_string(), "can you see this?".to_string(), false),
3011                        ("user_a".to_string(), "you online?".to_string(), false),
3012                    ]
3013            })
3014            .await;
3015
3016        channel_b
3017            .update(&mut cx_b, |channel, cx| {
3018                channel.send_message("yep".to_string(), cx).unwrap()
3019            })
3020            .await
3021            .unwrap();
3022        channel_a
3023            .condition(&cx_a, |channel, _| {
3024                channel_messages(channel)
3025                    == [
3026                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3027                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3028                        ("user_a".to_string(), "sup".to_string(), false),
3029                        ("user_b".to_string(), "can you see this?".to_string(), false),
3030                        ("user_a".to_string(), "you online?".to_string(), false),
3031                        ("user_b".to_string(), "yep".to_string(), false),
3032                    ]
3033            })
3034            .await;
3035    }
3036
3037    #[gpui::test]
3038    async fn test_contacts(
3039        mut cx_a: TestAppContext,
3040        mut cx_b: TestAppContext,
3041        mut cx_c: TestAppContext,
3042    ) {
3043        cx_a.foreground().forbid_parking();
3044        let lang_registry = Arc::new(LanguageRegistry::new());
3045        let fs = Arc::new(FakeFs::new(cx_a.background()));
3046
3047        // Connect to a server as 3 clients.
3048        let mut server = TestServer::start(cx_a.foreground()).await;
3049        let client_a = server.create_client(&mut cx_a, "user_a").await;
3050        let client_b = server.create_client(&mut cx_b, "user_b").await;
3051        let client_c = server.create_client(&mut cx_c, "user_c").await;
3052
3053        // Share a worktree as client A.
3054        fs.insert_tree(
3055            "/a",
3056            json!({
3057                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3058            }),
3059        )
3060        .await;
3061
3062        let project_a = cx_a.update(|cx| {
3063            Project::local(
3064                client_a.clone(),
3065                client_a.user_store.clone(),
3066                lang_registry.clone(),
3067                fs.clone(),
3068                cx,
3069            )
3070        });
3071        let (worktree_a, _) = project_a
3072            .update(&mut cx_a, |p, cx| {
3073                p.find_or_create_local_worktree("/a", false, cx)
3074            })
3075            .await
3076            .unwrap();
3077        worktree_a
3078            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3079            .await;
3080
3081        client_a
3082            .user_store
3083            .condition(&cx_a, |user_store, _| {
3084                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3085            })
3086            .await;
3087        client_b
3088            .user_store
3089            .condition(&cx_b, |user_store, _| {
3090                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3091            })
3092            .await;
3093        client_c
3094            .user_store
3095            .condition(&cx_c, |user_store, _| {
3096                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3097            })
3098            .await;
3099
3100        let project_id = project_a
3101            .update(&mut cx_a, |project, _| project.next_remote_id())
3102            .await;
3103        project_a
3104            .update(&mut cx_a, |project, cx| project.share(cx))
3105            .await
3106            .unwrap();
3107
3108        let _project_b = Project::remote(
3109            project_id,
3110            client_b.clone(),
3111            client_b.user_store.clone(),
3112            lang_registry.clone(),
3113            fs.clone(),
3114            &mut cx_b.to_async(),
3115        )
3116        .await
3117        .unwrap();
3118
3119        client_a
3120            .user_store
3121            .condition(&cx_a, |user_store, _| {
3122                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3123            })
3124            .await;
3125        client_b
3126            .user_store
3127            .condition(&cx_b, |user_store, _| {
3128                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3129            })
3130            .await;
3131        client_c
3132            .user_store
3133            .condition(&cx_c, |user_store, _| {
3134                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3135            })
3136            .await;
3137
3138        project_a
3139            .condition(&cx_a, |project, _| {
3140                project.collaborators().contains_key(&client_b.peer_id)
3141            })
3142            .await;
3143
3144        cx_a.update(move |_| drop(project_a));
3145        client_a
3146            .user_store
3147            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3148            .await;
3149        client_b
3150            .user_store
3151            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3152            .await;
3153        client_c
3154            .user_store
3155            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3156            .await;
3157
3158        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3159            user_store
3160                .contacts()
3161                .iter()
3162                .map(|contact| {
3163                    let worktrees = contact
3164                        .projects
3165                        .iter()
3166                        .map(|p| {
3167                            (
3168                                p.worktree_root_names[0].as_str(),
3169                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3170                            )
3171                        })
3172                        .collect();
3173                    (contact.user.github_login.as_str(), worktrees)
3174                })
3175                .collect()
3176        }
3177    }
3178
3179    struct TestServer {
3180        peer: Arc<Peer>,
3181        app_state: Arc<AppState>,
3182        server: Arc<Server>,
3183        foreground: Rc<executor::Foreground>,
3184        notifications: mpsc::Receiver<()>,
3185        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3186        forbid_connections: Arc<AtomicBool>,
3187        _test_db: TestDb,
3188    }
3189
3190    impl TestServer {
3191        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3192            let test_db = TestDb::new();
3193            let app_state = Self::build_app_state(&test_db).await;
3194            let peer = Peer::new();
3195            let notifications = mpsc::channel(128);
3196            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3197            Self {
3198                peer,
3199                app_state,
3200                server,
3201                foreground,
3202                notifications: notifications.1,
3203                connection_killers: Default::default(),
3204                forbid_connections: Default::default(),
3205                _test_db: test_db,
3206            }
3207        }
3208
3209        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3210            let http = FakeHttpClient::with_404_response();
3211            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3212            let client_name = name.to_string();
3213            let mut client = Client::new(http.clone());
3214            let server = self.server.clone();
3215            let connection_killers = self.connection_killers.clone();
3216            let forbid_connections = self.forbid_connections.clone();
3217            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3218
3219            Arc::get_mut(&mut client)
3220                .unwrap()
3221                .override_authenticate(move |cx| {
3222                    cx.spawn(|_| async move {
3223                        let access_token = "the-token".to_string();
3224                        Ok(Credentials {
3225                            user_id: user_id.0 as u64,
3226                            access_token,
3227                        })
3228                    })
3229                })
3230                .override_establish_connection(move |credentials, cx| {
3231                    assert_eq!(credentials.user_id, user_id.0 as u64);
3232                    assert_eq!(credentials.access_token, "the-token");
3233
3234                    let server = server.clone();
3235                    let connection_killers = connection_killers.clone();
3236                    let forbid_connections = forbid_connections.clone();
3237                    let client_name = client_name.clone();
3238                    let connection_id_tx = connection_id_tx.clone();
3239                    cx.spawn(move |cx| async move {
3240                        if forbid_connections.load(SeqCst) {
3241                            Err(EstablishConnectionError::other(anyhow!(
3242                                "server is forbidding connections"
3243                            )))
3244                        } else {
3245                            let (client_conn, server_conn, kill_conn) =
3246                                Connection::in_memory(cx.background());
3247                            connection_killers.lock().insert(user_id, kill_conn);
3248                            cx.background()
3249                                .spawn(server.handle_connection(
3250                                    server_conn,
3251                                    client_name,
3252                                    user_id,
3253                                    Some(connection_id_tx),
3254                                ))
3255                                .detach();
3256                            Ok(client_conn)
3257                        }
3258                    })
3259                });
3260
3261            client
3262                .authenticate_and_connect(&cx.to_async())
3263                .await
3264                .unwrap();
3265
3266            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3267            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3268            let mut authed_user =
3269                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3270            while authed_user.next().await.unwrap().is_none() {}
3271
3272            TestClient {
3273                client,
3274                peer_id,
3275                user_store,
3276            }
3277        }
3278
3279        fn disconnect_client(&self, user_id: UserId) {
3280            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3281                let _ = kill_conn.try_send(Some(()));
3282            }
3283        }
3284
3285        fn forbid_connections(&self) {
3286            self.forbid_connections.store(true, SeqCst);
3287        }
3288
3289        fn allow_connections(&self) {
3290            self.forbid_connections.store(false, SeqCst);
3291        }
3292
3293        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3294            let mut config = Config::default();
3295            config.session_secret = "a".repeat(32);
3296            config.database_url = test_db.url.clone();
3297            let github_client = github::AppClient::test();
3298            Arc::new(AppState {
3299                db: test_db.db().clone(),
3300                handlebars: Default::default(),
3301                auth_client: auth::build_client("", ""),
3302                repo_client: github::RepoClient::test(&github_client),
3303                github_client,
3304                config,
3305            })
3306        }
3307
3308        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3309            self.server.store.read()
3310        }
3311
3312        async fn condition<F>(&mut self, mut predicate: F)
3313        where
3314            F: FnMut(&Store) -> bool,
3315        {
3316            async_std::future::timeout(Duration::from_millis(500), async {
3317                while !(predicate)(&*self.server.store.read()) {
3318                    self.foreground.start_waiting();
3319                    self.notifications.next().await;
3320                    self.foreground.finish_waiting();
3321                }
3322            })
3323            .await
3324            .expect("condition timed out");
3325        }
3326    }
3327
3328    impl Drop for TestServer {
3329        fn drop(&mut self) {
3330            self.peer.reset();
3331        }
3332    }
3333
3334    struct TestClient {
3335        client: Arc<Client>,
3336        pub peer_id: PeerId,
3337        pub user_store: ModelHandle<UserStore>,
3338    }
3339
3340    impl Deref for TestClient {
3341        type Target = Arc<Client>;
3342
3343        fn deref(&self) -> &Self::Target {
3344            &self.client
3345        }
3346    }
3347
3348    impl TestClient {
3349        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3350            UserId::from_proto(
3351                self.user_store
3352                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3353            )
3354        }
3355    }
3356
3357    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3358        channel
3359            .messages()
3360            .cursor::<()>()
3361            .map(|m| {
3362                (
3363                    m.sender.github_login.clone(),
3364                    m.body.clone(),
3365                    m.is_pending(),
3366                )
3367            })
3368            .collect()
3369    }
3370
3371    struct EmptyView;
3372
3373    impl gpui::Entity for EmptyView {
3374        type Event = ();
3375    }
3376
3377    impl gpui::View for EmptyView {
3378        fn ui_name() -> &'static str {
3379            "empty view"
3380        }
3381
3382        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3383            gpui::Element::boxed(gpui::elements::Empty)
3384        }
3385    }
3386}