rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updating)
  76            .add_handler(Server::disk_based_diagnostics_updated)
  77            .add_handler(Server::get_definition)
  78            .add_handler(Server::open_buffer)
  79            .add_handler(Server::close_buffer)
  80            .add_handler(Server::update_buffer)
  81            .add_handler(Server::update_buffer_file)
  82            .add_handler(Server::buffer_reloaded)
  83            .add_handler(Server::buffer_saved)
  84            .add_handler(Server::save_buffer)
  85            .add_handler(Server::format_buffer)
  86            .add_handler(Server::get_completions)
  87            .add_handler(Server::apply_additional_edits_for_completion)
  88            .add_handler(Server::get_channels)
  89            .add_handler(Server::get_users)
  90            .add_handler(Server::join_channel)
  91            .add_handler(Server::leave_channel)
  92            .add_handler(Server::send_channel_message)
  93            .add_handler(Server::get_channel_messages);
  94
  95        Arc::new(server)
  96    }
  97
  98    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  99    where
 100        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 101        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 102        M: EnvelopedMessage,
 103    {
 104        let prev_handler = self.handlers.insert(
 105            TypeId::of::<M>(),
 106            Box::new(move |server, envelope| {
 107                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 108                (handler)(server, *envelope).boxed()
 109            }),
 110        );
 111        if prev_handler.is_some() {
 112            panic!("registered a handler for the same message twice");
 113        }
 114        self
 115    }
 116
 117    pub fn handle_connection(
 118        self: &Arc<Self>,
 119        connection: Connection,
 120        addr: String,
 121        user_id: UserId,
 122        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 123    ) -> impl Future<Output = ()> {
 124        let mut this = self.clone();
 125        async move {
 126            let (connection_id, handle_io, mut incoming_rx) =
 127                this.peer.add_connection(connection).await;
 128
 129            if let Some(send_connection_id) = send_connection_id.as_mut() {
 130                let _ = send_connection_id.send(connection_id).await;
 131            }
 132
 133            this.state_mut().add_connection(connection_id, user_id);
 134            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 135                log::error!("error updating contacts for {:?}: {}", user_id, err);
 136            }
 137
 138            let handle_io = handle_io.fuse();
 139            futures::pin_mut!(handle_io);
 140            loop {
 141                let next_message = incoming_rx.next().fuse();
 142                futures::pin_mut!(next_message);
 143                futures::select_biased! {
 144                    message = next_message => {
 145                        if let Some(message) = message {
 146                            let start_time = Instant::now();
 147                            log::info!("RPC message received: {}", message.payload_type_name());
 148                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 149                                if let Err(err) = (handler)(this.clone(), message).await {
 150                                    log::error!("error handling message: {:?}", err);
 151                                } else {
 152                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 153                                }
 154
 155                                if let Some(mut notifications) = this.notifications.clone() {
 156                                    let _ = notifications.send(()).await;
 157                                }
 158                            } else {
 159                                log::warn!("unhandled message: {}", message.payload_type_name());
 160                            }
 161                        } else {
 162                            log::info!("rpc connection closed {:?}", addr);
 163                            break;
 164                        }
 165                    }
 166                    handle_io = handle_io => {
 167                        if let Err(err) = handle_io {
 168                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 169                        }
 170                        break;
 171                    }
 172                }
 173            }
 174
 175            if let Err(err) = this.sign_out(connection_id).await {
 176                log::error!("error signing out connection {:?} - {:?}", addr, err);
 177            }
 178        }
 179    }
 180
 181    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 182        self.peer.disconnect(connection_id);
 183        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 184
 185        for (project_id, project) in removed_connection.hosted_projects {
 186            if let Some(share) = project.share {
 187                broadcast(
 188                    connection_id,
 189                    share.guests.keys().copied().collect(),
 190                    |conn_id| {
 191                        self.peer
 192                            .send(conn_id, proto::UnshareProject { project_id })
 193                    },
 194                )
 195                .await?;
 196            }
 197        }
 198
 199        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 200            broadcast(connection_id, peer_ids, |conn_id| {
 201                self.peer.send(
 202                    conn_id,
 203                    proto::RemoveProjectCollaborator {
 204                        project_id,
 205                        peer_id: connection_id.0,
 206                    },
 207                )
 208            })
 209            .await?;
 210        }
 211
 212        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 213            .await?;
 214
 215        Ok(())
 216    }
 217
 218    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 219        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 220        Ok(())
 221    }
 222
 223    async fn register_project(
 224        mut self: Arc<Server>,
 225        request: TypedEnvelope<proto::RegisterProject>,
 226    ) -> tide::Result<()> {
 227        let project_id = {
 228            let mut state = self.state_mut();
 229            let user_id = state.user_id_for_connection(request.sender_id)?;
 230            state.register_project(request.sender_id, user_id)
 231        };
 232        self.peer
 233            .respond(
 234                request.receipt(),
 235                proto::RegisterProjectResponse { project_id },
 236            )
 237            .await?;
 238        Ok(())
 239    }
 240
 241    async fn unregister_project(
 242        mut self: Arc<Server>,
 243        request: TypedEnvelope<proto::UnregisterProject>,
 244    ) -> tide::Result<()> {
 245        let project = self
 246            .state_mut()
 247            .unregister_project(request.payload.project_id, request.sender_id)
 248            .ok_or_else(|| anyhow!("no such project"))?;
 249        self.update_contacts_for_users(project.authorized_user_ids().iter())
 250            .await?;
 251        Ok(())
 252    }
 253
 254    async fn share_project(
 255        mut self: Arc<Server>,
 256        request: TypedEnvelope<proto::ShareProject>,
 257    ) -> tide::Result<()> {
 258        self.state_mut()
 259            .share_project(request.payload.project_id, request.sender_id);
 260        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 261        Ok(())
 262    }
 263
 264    async fn unshare_project(
 265        mut self: Arc<Server>,
 266        request: TypedEnvelope<proto::UnshareProject>,
 267    ) -> tide::Result<()> {
 268        let project_id = request.payload.project_id;
 269        let project = self
 270            .state_mut()
 271            .unshare_project(project_id, request.sender_id)?;
 272
 273        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 274            self.peer
 275                .send(conn_id, proto::UnshareProject { project_id })
 276        })
 277        .await?;
 278        self.update_contacts_for_users(&project.authorized_user_ids)
 279            .await?;
 280
 281        Ok(())
 282    }
 283
 284    async fn join_project(
 285        mut self: Arc<Server>,
 286        request: TypedEnvelope<proto::JoinProject>,
 287    ) -> tide::Result<()> {
 288        let project_id = request.payload.project_id;
 289
 290        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 291        let response_data = self
 292            .state_mut()
 293            .join_project(request.sender_id, user_id, project_id)
 294            .and_then(|joined| {
 295                let share = joined.project.share()?;
 296                let peer_count = share.guests.len();
 297                let mut collaborators = Vec::with_capacity(peer_count);
 298                collaborators.push(proto::Collaborator {
 299                    peer_id: joined.project.host_connection_id.0,
 300                    replica_id: 0,
 301                    user_id: joined.project.host_user_id.to_proto(),
 302                });
 303                let worktrees = joined
 304                    .project
 305                    .worktrees
 306                    .iter()
 307                    .filter_map(|(id, worktree)| {
 308                        worktree.share.as_ref().map(|share| proto::Worktree {
 309                            id: *id,
 310                            root_name: worktree.root_name.clone(),
 311                            entries: share.entries.values().cloned().collect(),
 312                            diagnostic_summaries: share
 313                                .diagnostic_summaries
 314                                .values()
 315                                .cloned()
 316                                .collect(),
 317                            weak: worktree.weak,
 318                        })
 319                    })
 320                    .collect();
 321                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 322                    if *peer_conn_id != request.sender_id {
 323                        collaborators.push(proto::Collaborator {
 324                            peer_id: peer_conn_id.0,
 325                            replica_id: *peer_replica_id as u32,
 326                            user_id: peer_user_id.to_proto(),
 327                        });
 328                    }
 329                }
 330                let response = proto::JoinProjectResponse {
 331                    worktrees,
 332                    replica_id: joined.replica_id as u32,
 333                    collaborators,
 334                };
 335                let connection_ids = joined.project.connection_ids();
 336                let contact_user_ids = joined.project.authorized_user_ids();
 337                Ok((response, connection_ids, contact_user_ids))
 338            });
 339
 340        match response_data {
 341            Ok((response, connection_ids, contact_user_ids)) => {
 342                broadcast(request.sender_id, connection_ids, |conn_id| {
 343                    self.peer.send(
 344                        conn_id,
 345                        proto::AddProjectCollaborator {
 346                            project_id: project_id,
 347                            collaborator: Some(proto::Collaborator {
 348                                peer_id: request.sender_id.0,
 349                                replica_id: response.replica_id,
 350                                user_id: user_id.to_proto(),
 351                            }),
 352                        },
 353                    )
 354                })
 355                .await?;
 356                self.peer.respond(request.receipt(), response).await?;
 357                self.update_contacts_for_users(&contact_user_ids).await?;
 358            }
 359            Err(error) => {
 360                self.peer
 361                    .respond_with_error(
 362                        request.receipt(),
 363                        proto::Error {
 364                            message: error.to_string(),
 365                        },
 366                    )
 367                    .await?;
 368            }
 369        }
 370
 371        Ok(())
 372    }
 373
 374    async fn leave_project(
 375        mut self: Arc<Server>,
 376        request: TypedEnvelope<proto::LeaveProject>,
 377    ) -> tide::Result<()> {
 378        let sender_id = request.sender_id;
 379        let project_id = request.payload.project_id;
 380        let worktree = self.state_mut().leave_project(sender_id, project_id);
 381        if let Some(worktree) = worktree {
 382            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 383                self.peer.send(
 384                    conn_id,
 385                    proto::RemoveProjectCollaborator {
 386                        project_id,
 387                        peer_id: sender_id.0,
 388                    },
 389                )
 390            })
 391            .await?;
 392            self.update_contacts_for_users(&worktree.authorized_user_ids)
 393                .await?;
 394        }
 395        Ok(())
 396    }
 397
 398    async fn register_worktree(
 399        mut self: Arc<Server>,
 400        request: TypedEnvelope<proto::RegisterWorktree>,
 401    ) -> tide::Result<()> {
 402        let receipt = request.receipt();
 403        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 404
 405        let mut contact_user_ids = HashSet::default();
 406        contact_user_ids.insert(host_user_id);
 407        for github_login in request.payload.authorized_logins {
 408            match self.app_state.db.create_user(&github_login, false).await {
 409                Ok(contact_user_id) => {
 410                    contact_user_ids.insert(contact_user_id);
 411                }
 412                Err(err) => {
 413                    let message = err.to_string();
 414                    self.peer
 415                        .respond_with_error(receipt, proto::Error { message })
 416                        .await?;
 417                    return Ok(());
 418                }
 419            }
 420        }
 421
 422        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 423        let ok = self.state_mut().register_worktree(
 424            request.payload.project_id,
 425            request.payload.worktree_id,
 426            Worktree {
 427                authorized_user_ids: contact_user_ids.clone(),
 428                root_name: request.payload.root_name,
 429                share: None,
 430                weak: false,
 431            },
 432        );
 433
 434        if ok {
 435            self.peer.respond(receipt, proto::Ack {}).await?;
 436            self.update_contacts_for_users(&contact_user_ids).await?;
 437        } else {
 438            self.peer
 439                .respond_with_error(
 440                    receipt,
 441                    proto::Error {
 442                        message: NO_SUCH_PROJECT.to_string(),
 443                    },
 444                )
 445                .await?;
 446        }
 447
 448        Ok(())
 449    }
 450
 451    async fn unregister_worktree(
 452        mut self: Arc<Server>,
 453        request: TypedEnvelope<proto::UnregisterWorktree>,
 454    ) -> tide::Result<()> {
 455        let project_id = request.payload.project_id;
 456        let worktree_id = request.payload.worktree_id;
 457        let (worktree, guest_connection_ids) =
 458            self.state_mut()
 459                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 460
 461        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 462            self.peer.send(
 463                conn_id,
 464                proto::UnregisterWorktree {
 465                    project_id,
 466                    worktree_id,
 467                },
 468            )
 469        })
 470        .await?;
 471        self.update_contacts_for_users(&worktree.authorized_user_ids)
 472            .await?;
 473        Ok(())
 474    }
 475
 476    async fn share_worktree(
 477        mut self: Arc<Server>,
 478        mut request: TypedEnvelope<proto::ShareWorktree>,
 479    ) -> tide::Result<()> {
 480        let worktree = request
 481            .payload
 482            .worktree
 483            .as_mut()
 484            .ok_or_else(|| anyhow!("missing worktree"))?;
 485        let entries = worktree
 486            .entries
 487            .iter()
 488            .map(|entry| (entry.id, entry.clone()))
 489            .collect();
 490        let diagnostic_summaries = worktree
 491            .diagnostic_summaries
 492            .iter()
 493            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 494            .collect();
 495
 496        let shared_worktree = self.state_mut().share_worktree(
 497            request.payload.project_id,
 498            worktree.id,
 499            request.sender_id,
 500            entries,
 501            diagnostic_summaries,
 502        );
 503        if let Some(shared_worktree) = shared_worktree {
 504            broadcast(
 505                request.sender_id,
 506                shared_worktree.connection_ids,
 507                |connection_id| {
 508                    self.peer.forward_send(
 509                        request.sender_id,
 510                        connection_id,
 511                        request.payload.clone(),
 512                    )
 513                },
 514            )
 515            .await?;
 516            self.peer.respond(request.receipt(), proto::Ack {}).await?;
 517            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)
 518                .await?;
 519        } else {
 520            self.peer
 521                .respond_with_error(
 522                    request.receipt(),
 523                    proto::Error {
 524                        message: "no such worktree".to_string(),
 525                    },
 526                )
 527                .await?;
 528        }
 529        Ok(())
 530    }
 531
 532    async fn update_worktree(
 533        mut self: Arc<Server>,
 534        request: TypedEnvelope<proto::UpdateWorktree>,
 535    ) -> tide::Result<()> {
 536        let connection_ids = self
 537            .state_mut()
 538            .update_worktree(
 539                request.sender_id,
 540                request.payload.project_id,
 541                request.payload.worktree_id,
 542                &request.payload.removed_entries,
 543                &request.payload.updated_entries,
 544            )
 545            .ok_or_else(|| anyhow!("no such worktree"))?;
 546
 547        broadcast(request.sender_id, connection_ids, |connection_id| {
 548            self.peer
 549                .forward_send(request.sender_id, connection_id, request.payload.clone())
 550        })
 551        .await?;
 552
 553        Ok(())
 554    }
 555
 556    async fn update_diagnostic_summary(
 557        mut self: Arc<Server>,
 558        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 559    ) -> tide::Result<()> {
 560        let receiver_ids = request
 561            .payload
 562            .summary
 563            .clone()
 564            .and_then(|summary| {
 565                self.state_mut().update_diagnostic_summary(
 566                    request.payload.project_id,
 567                    request.payload.worktree_id,
 568                    request.sender_id,
 569                    summary,
 570                )
 571            })
 572            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 573
 574        broadcast(request.sender_id, receiver_ids, |connection_id| {
 575            self.peer
 576                .forward_send(request.sender_id, connection_id, request.payload.clone())
 577        })
 578        .await?;
 579        Ok(())
 580    }
 581
 582    async fn disk_based_diagnostics_updating(
 583        self: Arc<Server>,
 584        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 585    ) -> tide::Result<()> {
 586        let receiver_ids = self
 587            .state()
 588            .project_connection_ids(request.payload.project_id, request.sender_id)
 589            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 590        broadcast(request.sender_id, receiver_ids, |connection_id| {
 591            self.peer
 592                .forward_send(request.sender_id, connection_id, request.payload.clone())
 593        })
 594        .await?;
 595        Ok(())
 596    }
 597
 598    async fn disk_based_diagnostics_updated(
 599        self: Arc<Server>,
 600        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 601    ) -> tide::Result<()> {
 602        let receiver_ids = self
 603            .state()
 604            .project_connection_ids(request.payload.project_id, request.sender_id)
 605            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 606        broadcast(request.sender_id, receiver_ids, |connection_id| {
 607            self.peer
 608                .forward_send(request.sender_id, connection_id, request.payload.clone())
 609        })
 610        .await?;
 611        Ok(())
 612    }
 613
 614    async fn get_definition(
 615        self: Arc<Server>,
 616        request: TypedEnvelope<proto::GetDefinition>,
 617    ) -> tide::Result<()> {
 618        let receipt = request.receipt();
 619        let host_connection_id = self
 620            .state()
 621            .read_project(request.payload.project_id, request.sender_id)
 622            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 623            .host_connection_id;
 624        let response = self
 625            .peer
 626            .forward_request(request.sender_id, host_connection_id, request.payload)
 627            .await?;
 628        self.peer.respond(receipt, response).await?;
 629        Ok(())
 630    }
 631
 632    async fn open_buffer(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::OpenBuffer>,
 635    ) -> tide::Result<()> {
 636        let receipt = request.receipt();
 637        let host_connection_id = self
 638            .state()
 639            .read_project(request.payload.project_id, request.sender_id)
 640            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 641            .host_connection_id;
 642        let response = self
 643            .peer
 644            .forward_request(request.sender_id, host_connection_id, request.payload)
 645            .await?;
 646        self.peer.respond(receipt, response).await?;
 647        Ok(())
 648    }
 649
 650    async fn close_buffer(
 651        self: Arc<Server>,
 652        request: TypedEnvelope<proto::CloseBuffer>,
 653    ) -> tide::Result<()> {
 654        let host_connection_id = self
 655            .state()
 656            .read_project(request.payload.project_id, request.sender_id)
 657            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 658            .host_connection_id;
 659        self.peer
 660            .forward_send(request.sender_id, host_connection_id, request.payload)
 661            .await?;
 662        Ok(())
 663    }
 664
 665    async fn save_buffer(
 666        self: Arc<Server>,
 667        request: TypedEnvelope<proto::SaveBuffer>,
 668    ) -> tide::Result<()> {
 669        let host;
 670        let guests;
 671        {
 672            let state = self.state();
 673            let project = state
 674                .read_project(request.payload.project_id, request.sender_id)
 675                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 676            host = project.host_connection_id;
 677            guests = project.guest_connection_ids()
 678        }
 679
 680        let sender = request.sender_id;
 681        let receipt = request.receipt();
 682        let response = self
 683            .peer
 684            .forward_request(sender, host, request.payload.clone())
 685            .await?;
 686
 687        broadcast(host, guests, |conn_id| {
 688            let response = response.clone();
 689            let peer = &self.peer;
 690            async move {
 691                if conn_id == sender {
 692                    peer.respond(receipt, response).await
 693                } else {
 694                    peer.forward_send(host, conn_id, response).await
 695                }
 696            }
 697        })
 698        .await?;
 699
 700        Ok(())
 701    }
 702
 703    async fn format_buffer(
 704        self: Arc<Server>,
 705        request: TypedEnvelope<proto::FormatBuffer>,
 706    ) -> tide::Result<()> {
 707        let host;
 708        {
 709            let state = self.state();
 710            let project = state
 711                .read_project(request.payload.project_id, request.sender_id)
 712                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 713            host = project.host_connection_id;
 714        }
 715
 716        let sender = request.sender_id;
 717        let receipt = request.receipt();
 718        let response = self
 719            .peer
 720            .forward_request(sender, host, request.payload.clone())
 721            .await?;
 722        self.peer.respond(receipt, response).await?;
 723
 724        Ok(())
 725    }
 726
 727    async fn get_completions(
 728        self: Arc<Server>,
 729        request: TypedEnvelope<proto::GetCompletions>,
 730    ) -> tide::Result<()> {
 731        let host;
 732        {
 733            let state = self.state();
 734            let project = state
 735                .read_project(request.payload.project_id, request.sender_id)
 736                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 737            host = project.host_connection_id;
 738        }
 739
 740        let sender = request.sender_id;
 741        let receipt = request.receipt();
 742        let response = self
 743            .peer
 744            .forward_request(sender, host, request.payload.clone())
 745            .await?;
 746        self.peer.respond(receipt, response).await?;
 747
 748        Ok(())
 749    }
 750
 751    async fn apply_additional_edits_for_completion(
 752        self: Arc<Server>,
 753        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 754    ) -> tide::Result<()> {
 755        let host;
 756        {
 757            let state = self.state();
 758            let project = state
 759                .read_project(request.payload.project_id, request.sender_id)
 760                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 761            host = project.host_connection_id;
 762        }
 763
 764        let sender = request.sender_id;
 765        let receipt = request.receipt();
 766        let response = self
 767            .peer
 768            .forward_request(sender, host, request.payload.clone())
 769            .await?;
 770        self.peer.respond(receipt, response).await?;
 771
 772        Ok(())
 773    }
 774
 775    async fn update_buffer(
 776        self: Arc<Server>,
 777        request: TypedEnvelope<proto::UpdateBuffer>,
 778    ) -> tide::Result<()> {
 779        let receiver_ids = self
 780            .state()
 781            .project_connection_ids(request.payload.project_id, request.sender_id)
 782            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 783        broadcast(request.sender_id, receiver_ids, |connection_id| {
 784            self.peer
 785                .forward_send(request.sender_id, connection_id, request.payload.clone())
 786        })
 787        .await?;
 788        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 789        Ok(())
 790    }
 791
 792    async fn update_buffer_file(
 793        self: Arc<Server>,
 794        request: TypedEnvelope<proto::UpdateBufferFile>,
 795    ) -> tide::Result<()> {
 796        let receiver_ids = self
 797            .state()
 798            .project_connection_ids(request.payload.project_id, request.sender_id)
 799            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 800        broadcast(request.sender_id, receiver_ids, |connection_id| {
 801            self.peer
 802                .forward_send(request.sender_id, connection_id, request.payload.clone())
 803        })
 804        .await?;
 805        Ok(())
 806    }
 807
 808    async fn buffer_reloaded(
 809        self: Arc<Server>,
 810        request: TypedEnvelope<proto::BufferReloaded>,
 811    ) -> tide::Result<()> {
 812        let receiver_ids = self
 813            .state()
 814            .project_connection_ids(request.payload.project_id, request.sender_id)
 815            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 816        broadcast(request.sender_id, receiver_ids, |connection_id| {
 817            self.peer
 818                .forward_send(request.sender_id, connection_id, request.payload.clone())
 819        })
 820        .await?;
 821        Ok(())
 822    }
 823
 824    async fn buffer_saved(
 825        self: Arc<Server>,
 826        request: TypedEnvelope<proto::BufferSaved>,
 827    ) -> tide::Result<()> {
 828        let receiver_ids = self
 829            .state()
 830            .project_connection_ids(request.payload.project_id, request.sender_id)
 831            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 832        broadcast(request.sender_id, receiver_ids, |connection_id| {
 833            self.peer
 834                .forward_send(request.sender_id, connection_id, request.payload.clone())
 835        })
 836        .await?;
 837        Ok(())
 838    }
 839
 840    async fn get_channels(
 841        self: Arc<Server>,
 842        request: TypedEnvelope<proto::GetChannels>,
 843    ) -> tide::Result<()> {
 844        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 845        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 846        self.peer
 847            .respond(
 848                request.receipt(),
 849                proto::GetChannelsResponse {
 850                    channels: channels
 851                        .into_iter()
 852                        .map(|chan| proto::Channel {
 853                            id: chan.id.to_proto(),
 854                            name: chan.name,
 855                        })
 856                        .collect(),
 857                },
 858            )
 859            .await?;
 860        Ok(())
 861    }
 862
 863    async fn get_users(
 864        self: Arc<Server>,
 865        request: TypedEnvelope<proto::GetUsers>,
 866    ) -> tide::Result<()> {
 867        let receipt = request.receipt();
 868        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 869        let users = self
 870            .app_state
 871            .db
 872            .get_users_by_ids(user_ids)
 873            .await?
 874            .into_iter()
 875            .map(|user| proto::User {
 876                id: user.id.to_proto(),
 877                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 878                github_login: user.github_login,
 879            })
 880            .collect();
 881        self.peer
 882            .respond(receipt, proto::GetUsersResponse { users })
 883            .await?;
 884        Ok(())
 885    }
 886
 887    async fn update_contacts_for_users<'a>(
 888        self: &Arc<Server>,
 889        user_ids: impl IntoIterator<Item = &'a UserId>,
 890    ) -> tide::Result<()> {
 891        let mut send_futures = Vec::new();
 892
 893        {
 894            let state = self.state();
 895            for user_id in user_ids {
 896                let contacts = state.contacts_for_user(*user_id);
 897                for connection_id in state.connection_ids_for_user(*user_id) {
 898                    send_futures.push(self.peer.send(
 899                        connection_id,
 900                        proto::UpdateContacts {
 901                            contacts: contacts.clone(),
 902                        },
 903                    ));
 904                }
 905            }
 906        }
 907        futures::future::try_join_all(send_futures).await?;
 908
 909        Ok(())
 910    }
 911
 912    async fn join_channel(
 913        mut self: Arc<Self>,
 914        request: TypedEnvelope<proto::JoinChannel>,
 915    ) -> tide::Result<()> {
 916        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 917        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 918        if !self
 919            .app_state
 920            .db
 921            .can_user_access_channel(user_id, channel_id)
 922            .await?
 923        {
 924            Err(anyhow!("access denied"))?;
 925        }
 926
 927        self.state_mut().join_channel(request.sender_id, channel_id);
 928        let messages = self
 929            .app_state
 930            .db
 931            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 932            .await?
 933            .into_iter()
 934            .map(|msg| proto::ChannelMessage {
 935                id: msg.id.to_proto(),
 936                body: msg.body,
 937                timestamp: msg.sent_at.unix_timestamp() as u64,
 938                sender_id: msg.sender_id.to_proto(),
 939                nonce: Some(msg.nonce.as_u128().into()),
 940            })
 941            .collect::<Vec<_>>();
 942        self.peer
 943            .respond(
 944                request.receipt(),
 945                proto::JoinChannelResponse {
 946                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 947                    messages,
 948                },
 949            )
 950            .await?;
 951        Ok(())
 952    }
 953
 954    async fn leave_channel(
 955        mut self: Arc<Self>,
 956        request: TypedEnvelope<proto::LeaveChannel>,
 957    ) -> tide::Result<()> {
 958        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 959        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 960        if !self
 961            .app_state
 962            .db
 963            .can_user_access_channel(user_id, channel_id)
 964            .await?
 965        {
 966            Err(anyhow!("access denied"))?;
 967        }
 968
 969        self.state_mut()
 970            .leave_channel(request.sender_id, channel_id);
 971
 972        Ok(())
 973    }
 974
 975    async fn send_channel_message(
 976        self: Arc<Self>,
 977        request: TypedEnvelope<proto::SendChannelMessage>,
 978    ) -> tide::Result<()> {
 979        let receipt = request.receipt();
 980        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 981        let user_id;
 982        let connection_ids;
 983        {
 984            let state = self.state();
 985            user_id = state.user_id_for_connection(request.sender_id)?;
 986            if let Some(ids) = state.channel_connection_ids(channel_id) {
 987                connection_ids = ids;
 988            } else {
 989                return Ok(());
 990            }
 991        }
 992
 993        // Validate the message body.
 994        let body = request.payload.body.trim().to_string();
 995        if body.len() > MAX_MESSAGE_LEN {
 996            self.peer
 997                .respond_with_error(
 998                    receipt,
 999                    proto::Error {
1000                        message: "message is too long".to_string(),
1001                    },
1002                )
1003                .await?;
1004            return Ok(());
1005        }
1006        if body.is_empty() {
1007            self.peer
1008                .respond_with_error(
1009                    receipt,
1010                    proto::Error {
1011                        message: "message can't be blank".to_string(),
1012                    },
1013                )
1014                .await?;
1015            return Ok(());
1016        }
1017
1018        let timestamp = OffsetDateTime::now_utc();
1019        let nonce = if let Some(nonce) = request.payload.nonce {
1020            nonce
1021        } else {
1022            self.peer
1023                .respond_with_error(
1024                    receipt,
1025                    proto::Error {
1026                        message: "nonce can't be blank".to_string(),
1027                    },
1028                )
1029                .await?;
1030            return Ok(());
1031        };
1032
1033        let message_id = self
1034            .app_state
1035            .db
1036            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1037            .await?
1038            .to_proto();
1039        let message = proto::ChannelMessage {
1040            sender_id: user_id.to_proto(),
1041            id: message_id,
1042            body,
1043            timestamp: timestamp.unix_timestamp() as u64,
1044            nonce: Some(nonce),
1045        };
1046        broadcast(request.sender_id, connection_ids, |conn_id| {
1047            self.peer.send(
1048                conn_id,
1049                proto::ChannelMessageSent {
1050                    channel_id: channel_id.to_proto(),
1051                    message: Some(message.clone()),
1052                },
1053            )
1054        })
1055        .await?;
1056        self.peer
1057            .respond(
1058                receipt,
1059                proto::SendChannelMessageResponse {
1060                    message: Some(message),
1061                },
1062            )
1063            .await?;
1064        Ok(())
1065    }
1066
1067    async fn get_channel_messages(
1068        self: Arc<Self>,
1069        request: TypedEnvelope<proto::GetChannelMessages>,
1070    ) -> tide::Result<()> {
1071        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1072        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1073        if !self
1074            .app_state
1075            .db
1076            .can_user_access_channel(user_id, channel_id)
1077            .await?
1078        {
1079            Err(anyhow!("access denied"))?;
1080        }
1081
1082        let messages = self
1083            .app_state
1084            .db
1085            .get_channel_messages(
1086                channel_id,
1087                MESSAGE_COUNT_PER_PAGE,
1088                Some(MessageId::from_proto(request.payload.before_message_id)),
1089            )
1090            .await?
1091            .into_iter()
1092            .map(|msg| proto::ChannelMessage {
1093                id: msg.id.to_proto(),
1094                body: msg.body,
1095                timestamp: msg.sent_at.unix_timestamp() as u64,
1096                sender_id: msg.sender_id.to_proto(),
1097                nonce: Some(msg.nonce.as_u128().into()),
1098            })
1099            .collect::<Vec<_>>();
1100        self.peer
1101            .respond(
1102                request.receipt(),
1103                proto::GetChannelMessagesResponse {
1104                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1105                    messages,
1106                },
1107            )
1108            .await?;
1109        Ok(())
1110    }
1111
1112    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1113        self.store.read()
1114    }
1115
1116    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1117        self.store.write()
1118    }
1119}
1120
1121pub async fn broadcast<F, T>(
1122    sender_id: ConnectionId,
1123    receiver_ids: Vec<ConnectionId>,
1124    mut f: F,
1125) -> anyhow::Result<()>
1126where
1127    F: FnMut(ConnectionId) -> T,
1128    T: Future<Output = anyhow::Result<()>>,
1129{
1130    let futures = receiver_ids
1131        .into_iter()
1132        .filter(|id| *id != sender_id)
1133        .map(|id| f(id));
1134    futures::future::try_join_all(futures).await?;
1135    Ok(())
1136}
1137
1138pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1139    let server = Server::new(app.state().clone(), rpc.clone(), None);
1140    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1141        let server = server.clone();
1142        async move {
1143            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1144
1145            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1146            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1147            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1148            let client_protocol_version: Option<u32> = request
1149                .header("X-Zed-Protocol-Version")
1150                .and_then(|v| v.as_str().parse().ok());
1151
1152            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1153                return Ok(Response::new(StatusCode::UpgradeRequired));
1154            }
1155
1156            let header = match request.header("Sec-Websocket-Key") {
1157                Some(h) => h.as_str(),
1158                None => return Err(anyhow!("expected sec-websocket-key"))?,
1159            };
1160
1161            let user_id = process_auth_header(&request).await?;
1162
1163            let mut response = Response::new(StatusCode::SwitchingProtocols);
1164            response.insert_header(UPGRADE, "websocket");
1165            response.insert_header(CONNECTION, "Upgrade");
1166            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1167            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1168            response.insert_header("Sec-Websocket-Version", "13");
1169
1170            let http_res: &mut tide::http::Response = response.as_mut();
1171            let upgrade_receiver = http_res.recv_upgrade().await;
1172            let addr = request.remote().unwrap_or("unknown").to_string();
1173            task::spawn(async move {
1174                if let Some(stream) = upgrade_receiver.await {
1175                    server
1176                        .handle_connection(
1177                            Connection::new(
1178                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1179                            ),
1180                            addr,
1181                            user_id,
1182                            None,
1183                        )
1184                        .await;
1185                }
1186            });
1187
1188            Ok(response)
1189        }
1190    });
1191}
1192
1193fn header_contains_ignore_case<T>(
1194    request: &tide::Request<T>,
1195    header_name: HeaderName,
1196    value: &str,
1197) -> bool {
1198    request
1199        .header(header_name)
1200        .map(|h| {
1201            h.as_str()
1202                .split(',')
1203                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1204        })
1205        .unwrap_or(false)
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210    use super::*;
1211    use crate::{
1212        auth,
1213        db::{tests::TestDb, UserId},
1214        github, AppState, Config,
1215    };
1216    use ::rpc::Peer;
1217    use async_std::task;
1218    use gpui::{executor, ModelHandle, TestAppContext};
1219    use parking_lot::Mutex;
1220    use postage::{mpsc, watch};
1221    use rand::prelude::*;
1222    use rpc::PeerId;
1223    use serde_json::json;
1224    use sqlx::types::time::OffsetDateTime;
1225    use std::{
1226        ops::Deref,
1227        path::Path,
1228        rc::Rc,
1229        sync::{
1230            atomic::{AtomicBool, Ordering::SeqCst},
1231            Arc,
1232        },
1233        time::Duration,
1234    };
1235    use zed::{
1236        client::{
1237            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1238            EstablishConnectionError, UserStore,
1239        },
1240        editor::{Editor, EditorSettings, Input, MultiBuffer},
1241        fs::{FakeFs, Fs as _},
1242        language::{
1243            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1244            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1245        },
1246        lsp,
1247        project::{DiagnosticSummary, Project, ProjectPath},
1248    };
1249
1250    #[gpui::test]
1251    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1252        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1253        let lang_registry = Arc::new(LanguageRegistry::new());
1254        let fs = Arc::new(FakeFs::new(cx_a.background()));
1255        cx_a.foreground().forbid_parking();
1256
1257        // Connect to a server as 2 clients.
1258        let mut server = TestServer::start(cx_a.foreground()).await;
1259        let client_a = server.create_client(&mut cx_a, "user_a").await;
1260        let client_b = server.create_client(&mut cx_b, "user_b").await;
1261
1262        // Share a project as client A
1263        fs.insert_tree(
1264            "/a",
1265            json!({
1266                ".zed.toml": r#"collaborators = ["user_b"]"#,
1267                "a.txt": "a-contents",
1268                "b.txt": "b-contents",
1269            }),
1270        )
1271        .await;
1272        let project_a = cx_a.update(|cx| {
1273            Project::local(
1274                client_a.clone(),
1275                client_a.user_store.clone(),
1276                lang_registry.clone(),
1277                fs.clone(),
1278                cx,
1279            )
1280        });
1281        let (worktree_a, _) = project_a
1282            .update(&mut cx_a, |p, cx| {
1283                p.find_or_create_local_worktree("/a", false, cx)
1284            })
1285            .await
1286            .unwrap();
1287        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1288        worktree_a
1289            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1290            .await;
1291        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1292        project_a
1293            .update(&mut cx_a, |p, cx| p.share(cx))
1294            .await
1295            .unwrap();
1296
1297        // Join that project as client B
1298        let project_b = Project::remote(
1299            project_id,
1300            client_b.clone(),
1301            client_b.user_store.clone(),
1302            lang_registry.clone(),
1303            fs.clone(),
1304            &mut cx_b.to_async(),
1305        )
1306        .await
1307        .unwrap();
1308
1309        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1310            assert_eq!(
1311                project
1312                    .collaborators()
1313                    .get(&client_a.peer_id)
1314                    .unwrap()
1315                    .user
1316                    .github_login,
1317                "user_a"
1318            );
1319            project.replica_id()
1320        });
1321        project_a
1322            .condition(&cx_a, |tree, _| {
1323                tree.collaborators()
1324                    .get(&client_b.peer_id)
1325                    .map_or(false, |collaborator| {
1326                        collaborator.replica_id == replica_id_b
1327                            && collaborator.user.github_login == "user_b"
1328                    })
1329            })
1330            .await;
1331
1332        // Open the same file as client B and client A.
1333        let buffer_b = project_b
1334            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1335            .await
1336            .unwrap();
1337        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1338        buffer_b.read_with(&cx_b, |buf, cx| {
1339            assert_eq!(buf.read(cx).text(), "b-contents")
1340        });
1341        project_a.read_with(&cx_a, |project, cx| {
1342            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1343        });
1344        let buffer_a = project_a
1345            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1346            .await
1347            .unwrap();
1348
1349        let editor_b = cx_b.add_view(window_b, |cx| {
1350            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1351        });
1352
1353        // TODO
1354        // // Create a selection set as client B and see that selection set as client A.
1355        // buffer_a
1356        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1357        //     .await;
1358
1359        // Edit the buffer as client B and see that edit as client A.
1360        editor_b.update(&mut cx_b, |editor, cx| {
1361            editor.handle_input(&Input("ok, ".into()), cx)
1362        });
1363        buffer_a
1364            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1365            .await;
1366
1367        // TODO
1368        // // Remove the selection set as client B, see those selections disappear as client A.
1369        cx_b.update(move |_| drop(editor_b));
1370        // buffer_a
1371        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1372        //     .await;
1373
1374        // Close the buffer as client A, see that the buffer is closed.
1375        cx_a.update(move |_| drop(buffer_a));
1376        project_a
1377            .condition(&cx_a, |project, cx| {
1378                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1379            })
1380            .await;
1381
1382        // Dropping the client B's project removes client B from client A's collaborators.
1383        cx_b.update(move |_| drop(project_b));
1384        project_a
1385            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1386            .await;
1387    }
1388
1389    #[gpui::test]
1390    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1391        let lang_registry = Arc::new(LanguageRegistry::new());
1392        let fs = Arc::new(FakeFs::new(cx_a.background()));
1393        cx_a.foreground().forbid_parking();
1394
1395        // Connect to a server as 2 clients.
1396        let mut server = TestServer::start(cx_a.foreground()).await;
1397        let client_a = server.create_client(&mut cx_a, "user_a").await;
1398        let client_b = server.create_client(&mut cx_b, "user_b").await;
1399
1400        // Share a project as client A
1401        fs.insert_tree(
1402            "/a",
1403            json!({
1404                ".zed.toml": r#"collaborators = ["user_b"]"#,
1405                "a.txt": "a-contents",
1406                "b.txt": "b-contents",
1407            }),
1408        )
1409        .await;
1410        let project_a = cx_a.update(|cx| {
1411            Project::local(
1412                client_a.clone(),
1413                client_a.user_store.clone(),
1414                lang_registry.clone(),
1415                fs.clone(),
1416                cx,
1417            )
1418        });
1419        let (worktree_a, _) = project_a
1420            .update(&mut cx_a, |p, cx| {
1421                p.find_or_create_local_worktree("/a", false, cx)
1422            })
1423            .await
1424            .unwrap();
1425        worktree_a
1426            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1427            .await;
1428        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1429        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1430        project_a
1431            .update(&mut cx_a, |p, cx| p.share(cx))
1432            .await
1433            .unwrap();
1434        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1435
1436        // Join that project as client B
1437        let project_b = Project::remote(
1438            project_id,
1439            client_b.clone(),
1440            client_b.user_store.clone(),
1441            lang_registry.clone(),
1442            fs.clone(),
1443            &mut cx_b.to_async(),
1444        )
1445        .await
1446        .unwrap();
1447        project_b
1448            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1449            .await
1450            .unwrap();
1451
1452        // Unshare the project as client A
1453        project_a
1454            .update(&mut cx_a, |project, cx| project.unshare(cx))
1455            .await
1456            .unwrap();
1457        project_b
1458            .condition(&mut cx_b, |project, _| project.is_read_only())
1459            .await;
1460        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1461        drop(project_b);
1462
1463        // Share the project again and ensure guests can still join.
1464        project_a
1465            .update(&mut cx_a, |project, cx| project.share(cx))
1466            .await
1467            .unwrap();
1468        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1469
1470        let project_c = Project::remote(
1471            project_id,
1472            client_b.clone(),
1473            client_b.user_store.clone(),
1474            lang_registry.clone(),
1475            fs.clone(),
1476            &mut cx_b.to_async(),
1477        )
1478        .await
1479        .unwrap();
1480        project_c
1481            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1482            .await
1483            .unwrap();
1484    }
1485
1486    #[gpui::test]
1487    async fn test_propagate_saves_and_fs_changes(
1488        mut cx_a: TestAppContext,
1489        mut cx_b: TestAppContext,
1490        mut cx_c: TestAppContext,
1491    ) {
1492        let lang_registry = Arc::new(LanguageRegistry::new());
1493        let fs = Arc::new(FakeFs::new(cx_a.background()));
1494        cx_a.foreground().forbid_parking();
1495
1496        // Connect to a server as 3 clients.
1497        let mut server = TestServer::start(cx_a.foreground()).await;
1498        let client_a = server.create_client(&mut cx_a, "user_a").await;
1499        let client_b = server.create_client(&mut cx_b, "user_b").await;
1500        let client_c = server.create_client(&mut cx_c, "user_c").await;
1501
1502        // Share a worktree as client A.
1503        fs.insert_tree(
1504            "/a",
1505            json!({
1506                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1507                "file1": "",
1508                "file2": ""
1509            }),
1510        )
1511        .await;
1512        let project_a = cx_a.update(|cx| {
1513            Project::local(
1514                client_a.clone(),
1515                client_a.user_store.clone(),
1516                lang_registry.clone(),
1517                fs.clone(),
1518                cx,
1519            )
1520        });
1521        let (worktree_a, _) = project_a
1522            .update(&mut cx_a, |p, cx| {
1523                p.find_or_create_local_worktree("/a", false, cx)
1524            })
1525            .await
1526            .unwrap();
1527        worktree_a
1528            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1529            .await;
1530        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1531        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1532        project_a
1533            .update(&mut cx_a, |p, cx| p.share(cx))
1534            .await
1535            .unwrap();
1536
1537        // Join that worktree as clients B and C.
1538        let project_b = Project::remote(
1539            project_id,
1540            client_b.clone(),
1541            client_b.user_store.clone(),
1542            lang_registry.clone(),
1543            fs.clone(),
1544            &mut cx_b.to_async(),
1545        )
1546        .await
1547        .unwrap();
1548        let project_c = Project::remote(
1549            project_id,
1550            client_c.clone(),
1551            client_c.user_store.clone(),
1552            lang_registry.clone(),
1553            fs.clone(),
1554            &mut cx_c.to_async(),
1555        )
1556        .await
1557        .unwrap();
1558        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1559        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1560
1561        // Open and edit a buffer as both guests B and C.
1562        let buffer_b = project_b
1563            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1564            .await
1565            .unwrap();
1566        let buffer_c = project_c
1567            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1568            .await
1569            .unwrap();
1570        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1571        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1572
1573        // Open and edit that buffer as the host.
1574        let buffer_a = project_a
1575            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1576            .await
1577            .unwrap();
1578
1579        buffer_a
1580            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1581            .await;
1582        buffer_a.update(&mut cx_a, |buf, cx| {
1583            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1584        });
1585
1586        // Wait for edits to propagate
1587        buffer_a
1588            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1589            .await;
1590        buffer_b
1591            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1592            .await;
1593        buffer_c
1594            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1595            .await;
1596
1597        // Edit the buffer as the host and concurrently save as guest B.
1598        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1599        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1600        save_b.await.unwrap();
1601        assert_eq!(
1602            fs.load("/a/file1".as_ref()).await.unwrap(),
1603            "hi-a, i-am-c, i-am-b, i-am-a"
1604        );
1605        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1606        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1607        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1608
1609        // Make changes on host's file system, see those changes on guest worktrees.
1610        fs.rename("/a/file1".as_ref(), "/a/file1-renamed".as_ref())
1611            .await
1612            .unwrap();
1613        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1614            .await
1615            .unwrap();
1616        fs.insert_file(Path::new("/a/file4"), "4".into())
1617            .await
1618            .unwrap();
1619
1620        worktree_a
1621            .condition(&cx_a, |tree, _| tree.file_count() == 4)
1622            .await;
1623        worktree_b
1624            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1625            .await;
1626        worktree_c
1627            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1628            .await;
1629        worktree_a.read_with(&cx_a, |tree, _| {
1630            assert_eq!(
1631                tree.paths()
1632                    .map(|p| p.to_string_lossy())
1633                    .collect::<Vec<_>>(),
1634                &[".zed.toml", "file1-renamed", "file3", "file4"]
1635            )
1636        });
1637        worktree_b.read_with(&cx_b, |tree, _| {
1638            assert_eq!(
1639                tree.paths()
1640                    .map(|p| p.to_string_lossy())
1641                    .collect::<Vec<_>>(),
1642                &[".zed.toml", "file1-renamed", "file3", "file4"]
1643            )
1644        });
1645        worktree_c.read_with(&cx_c, |tree, _| {
1646            assert_eq!(
1647                tree.paths()
1648                    .map(|p| p.to_string_lossy())
1649                    .collect::<Vec<_>>(),
1650                &[".zed.toml", "file1-renamed", "file3", "file4"]
1651            )
1652        });
1653
1654        // Ensure buffer files are updated as well.
1655        buffer_a
1656            .condition(&cx_a, |buf, _| {
1657                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1658            })
1659            .await;
1660        buffer_b
1661            .condition(&cx_b, |buf, _| {
1662                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1663            })
1664            .await;
1665        buffer_c
1666            .condition(&cx_c, |buf, _| {
1667                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1668            })
1669            .await;
1670    }
1671
1672    #[gpui::test]
1673    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1674        cx_a.foreground().forbid_parking();
1675        let lang_registry = Arc::new(LanguageRegistry::new());
1676        let fs = Arc::new(FakeFs::new(cx_a.background()));
1677
1678        // Connect to a server as 2 clients.
1679        let mut server = TestServer::start(cx_a.foreground()).await;
1680        let client_a = server.create_client(&mut cx_a, "user_a").await;
1681        let client_b = server.create_client(&mut cx_b, "user_b").await;
1682
1683        // Share a project as client A
1684        fs.insert_tree(
1685            "/dir",
1686            json!({
1687                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1688                "a.txt": "a-contents",
1689            }),
1690        )
1691        .await;
1692
1693        let project_a = cx_a.update(|cx| {
1694            Project::local(
1695                client_a.clone(),
1696                client_a.user_store.clone(),
1697                lang_registry.clone(),
1698                fs.clone(),
1699                cx,
1700            )
1701        });
1702        let (worktree_a, _) = project_a
1703            .update(&mut cx_a, |p, cx| {
1704                p.find_or_create_local_worktree("/dir", false, cx)
1705            })
1706            .await
1707            .unwrap();
1708        worktree_a
1709            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1710            .await;
1711        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1712        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1713        project_a
1714            .update(&mut cx_a, |p, cx| p.share(cx))
1715            .await
1716            .unwrap();
1717
1718        // Join that project as client B
1719        let project_b = Project::remote(
1720            project_id,
1721            client_b.clone(),
1722            client_b.user_store.clone(),
1723            lang_registry.clone(),
1724            fs.clone(),
1725            &mut cx_b.to_async(),
1726        )
1727        .await
1728        .unwrap();
1729        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1730
1731        // Open a buffer as client B
1732        let buffer_b = project_b
1733            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1734            .await
1735            .unwrap();
1736        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1737
1738        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1739        buffer_b.read_with(&cx_b, |buf, _| {
1740            assert!(buf.is_dirty());
1741            assert!(!buf.has_conflict());
1742        });
1743
1744        buffer_b
1745            .update(&mut cx_b, |buf, cx| buf.save(cx))
1746            .await
1747            .unwrap();
1748        worktree_b
1749            .condition(&cx_b, |_, cx| {
1750                buffer_b.read(cx).file().unwrap().mtime() != mtime
1751            })
1752            .await;
1753        buffer_b.read_with(&cx_b, |buf, _| {
1754            assert!(!buf.is_dirty());
1755            assert!(!buf.has_conflict());
1756        });
1757
1758        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1759        buffer_b.read_with(&cx_b, |buf, _| {
1760            assert!(buf.is_dirty());
1761            assert!(!buf.has_conflict());
1762        });
1763    }
1764
1765    #[gpui::test]
1766    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1767        cx_a.foreground().forbid_parking();
1768        let lang_registry = Arc::new(LanguageRegistry::new());
1769        let fs = Arc::new(FakeFs::new(cx_a.background()));
1770
1771        // Connect to a server as 2 clients.
1772        let mut server = TestServer::start(cx_a.foreground()).await;
1773        let client_a = server.create_client(&mut cx_a, "user_a").await;
1774        let client_b = server.create_client(&mut cx_b, "user_b").await;
1775
1776        // Share a project as client A
1777        fs.insert_tree(
1778            "/dir",
1779            json!({
1780                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1781                "a.txt": "a-contents",
1782            }),
1783        )
1784        .await;
1785
1786        let project_a = cx_a.update(|cx| {
1787            Project::local(
1788                client_a.clone(),
1789                client_a.user_store.clone(),
1790                lang_registry.clone(),
1791                fs.clone(),
1792                cx,
1793            )
1794        });
1795        let (worktree_a, _) = project_a
1796            .update(&mut cx_a, |p, cx| {
1797                p.find_or_create_local_worktree("/dir", false, cx)
1798            })
1799            .await
1800            .unwrap();
1801        worktree_a
1802            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1803            .await;
1804        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1805        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1806        project_a
1807            .update(&mut cx_a, |p, cx| p.share(cx))
1808            .await
1809            .unwrap();
1810
1811        // Join that project as client B
1812        let project_b = Project::remote(
1813            project_id,
1814            client_b.clone(),
1815            client_b.user_store.clone(),
1816            lang_registry.clone(),
1817            fs.clone(),
1818            &mut cx_b.to_async(),
1819        )
1820        .await
1821        .unwrap();
1822        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1823
1824        // Open a buffer as client B
1825        let buffer_b = project_b
1826            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1827            .await
1828            .unwrap();
1829        buffer_b.read_with(&cx_b, |buf, _| {
1830            assert!(!buf.is_dirty());
1831            assert!(!buf.has_conflict());
1832        });
1833
1834        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1835            .await
1836            .unwrap();
1837        buffer_b
1838            .condition(&cx_b, |buf, _| {
1839                buf.text() == "new contents" && !buf.is_dirty()
1840            })
1841            .await;
1842        buffer_b.read_with(&cx_b, |buf, _| {
1843            assert!(!buf.has_conflict());
1844        });
1845    }
1846
1847    #[gpui::test(iterations = 100)]
1848    async fn test_editing_while_guest_opens_buffer(
1849        mut cx_a: TestAppContext,
1850        mut cx_b: TestAppContext,
1851    ) {
1852        cx_a.foreground().forbid_parking();
1853        let lang_registry = Arc::new(LanguageRegistry::new());
1854        let fs = Arc::new(FakeFs::new(cx_a.background()));
1855
1856        // Connect to a server as 2 clients.
1857        let mut server = TestServer::start(cx_a.foreground()).await;
1858        let client_a = server.create_client(&mut cx_a, "user_a").await;
1859        let client_b = server.create_client(&mut cx_b, "user_b").await;
1860
1861        // Share a project as client A
1862        fs.insert_tree(
1863            "/dir",
1864            json!({
1865                ".zed.toml": r#"collaborators = ["user_b"]"#,
1866                "a.txt": "a-contents",
1867            }),
1868        )
1869        .await;
1870        let project_a = cx_a.update(|cx| {
1871            Project::local(
1872                client_a.clone(),
1873                client_a.user_store.clone(),
1874                lang_registry.clone(),
1875                fs.clone(),
1876                cx,
1877            )
1878        });
1879        let (worktree_a, _) = project_a
1880            .update(&mut cx_a, |p, cx| {
1881                p.find_or_create_local_worktree("/dir", false, cx)
1882            })
1883            .await
1884            .unwrap();
1885        worktree_a
1886            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1887            .await;
1888        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1889        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1890        project_a
1891            .update(&mut cx_a, |p, cx| p.share(cx))
1892            .await
1893            .unwrap();
1894
1895        // Join that project as client B
1896        let project_b = Project::remote(
1897            project_id,
1898            client_b.clone(),
1899            client_b.user_store.clone(),
1900            lang_registry.clone(),
1901            fs.clone(),
1902            &mut cx_b.to_async(),
1903        )
1904        .await
1905        .unwrap();
1906
1907        // Open a buffer as client A
1908        let buffer_a = project_a
1909            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1910            .await
1911            .unwrap();
1912
1913        // Start opening the same buffer as client B
1914        let buffer_b = cx_b
1915            .background()
1916            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1917        task::yield_now().await;
1918
1919        // Edit the buffer as client A while client B is still opening it.
1920        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1921
1922        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1923        let buffer_b = buffer_b.await.unwrap();
1924        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1925    }
1926
1927    #[gpui::test]
1928    async fn test_leaving_worktree_while_opening_buffer(
1929        mut cx_a: TestAppContext,
1930        mut cx_b: TestAppContext,
1931    ) {
1932        cx_a.foreground().forbid_parking();
1933        let lang_registry = Arc::new(LanguageRegistry::new());
1934        let fs = Arc::new(FakeFs::new(cx_a.background()));
1935
1936        // Connect to a server as 2 clients.
1937        let mut server = TestServer::start(cx_a.foreground()).await;
1938        let client_a = server.create_client(&mut cx_a, "user_a").await;
1939        let client_b = server.create_client(&mut cx_b, "user_b").await;
1940
1941        // Share a project as client A
1942        fs.insert_tree(
1943            "/dir",
1944            json!({
1945                ".zed.toml": r#"collaborators = ["user_b"]"#,
1946                "a.txt": "a-contents",
1947            }),
1948        )
1949        .await;
1950        let project_a = cx_a.update(|cx| {
1951            Project::local(
1952                client_a.clone(),
1953                client_a.user_store.clone(),
1954                lang_registry.clone(),
1955                fs.clone(),
1956                cx,
1957            )
1958        });
1959        let (worktree_a, _) = project_a
1960            .update(&mut cx_a, |p, cx| {
1961                p.find_or_create_local_worktree("/dir", false, cx)
1962            })
1963            .await
1964            .unwrap();
1965        worktree_a
1966            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1967            .await;
1968        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1969        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1970        project_a
1971            .update(&mut cx_a, |p, cx| p.share(cx))
1972            .await
1973            .unwrap();
1974
1975        // Join that project as client B
1976        let project_b = Project::remote(
1977            project_id,
1978            client_b.clone(),
1979            client_b.user_store.clone(),
1980            lang_registry.clone(),
1981            fs.clone(),
1982            &mut cx_b.to_async(),
1983        )
1984        .await
1985        .unwrap();
1986
1987        // See that a guest has joined as client A.
1988        project_a
1989            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1990            .await;
1991
1992        // Begin opening a buffer as client B, but leave the project before the open completes.
1993        let buffer_b = cx_b
1994            .background()
1995            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1996        cx_b.update(|_| drop(project_b));
1997        drop(buffer_b);
1998
1999        // See that the guest has left.
2000        project_a
2001            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2002            .await;
2003    }
2004
2005    #[gpui::test]
2006    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2007        cx_a.foreground().forbid_parking();
2008        let lang_registry = Arc::new(LanguageRegistry::new());
2009        let fs = Arc::new(FakeFs::new(cx_a.background()));
2010
2011        // Connect to a server as 2 clients.
2012        let mut server = TestServer::start(cx_a.foreground()).await;
2013        let client_a = server.create_client(&mut cx_a, "user_a").await;
2014        let client_b = server.create_client(&mut cx_b, "user_b").await;
2015
2016        // Share a project as client A
2017        fs.insert_tree(
2018            "/a",
2019            json!({
2020                ".zed.toml": r#"collaborators = ["user_b"]"#,
2021                "a.txt": "a-contents",
2022                "b.txt": "b-contents",
2023            }),
2024        )
2025        .await;
2026        let project_a = cx_a.update(|cx| {
2027            Project::local(
2028                client_a.clone(),
2029                client_a.user_store.clone(),
2030                lang_registry.clone(),
2031                fs.clone(),
2032                cx,
2033            )
2034        });
2035        let (worktree_a, _) = project_a
2036            .update(&mut cx_a, |p, cx| {
2037                p.find_or_create_local_worktree("/a", false, cx)
2038            })
2039            .await
2040            .unwrap();
2041        worktree_a
2042            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2043            .await;
2044        let project_id = project_a
2045            .update(&mut cx_a, |project, _| project.next_remote_id())
2046            .await;
2047        project_a
2048            .update(&mut cx_a, |project, cx| project.share(cx))
2049            .await
2050            .unwrap();
2051
2052        // Join that project as client B
2053        let _project_b = Project::remote(
2054            project_id,
2055            client_b.clone(),
2056            client_b.user_store.clone(),
2057            lang_registry.clone(),
2058            fs.clone(),
2059            &mut cx_b.to_async(),
2060        )
2061        .await
2062        .unwrap();
2063
2064        // See that a guest has joined as client A.
2065        project_a
2066            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2067            .await;
2068
2069        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2070        client_b.disconnect(&cx_b.to_async()).unwrap();
2071        project_a
2072            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2073            .await;
2074    }
2075
2076    #[gpui::test]
2077    async fn test_collaborating_with_diagnostics(
2078        mut cx_a: TestAppContext,
2079        mut cx_b: TestAppContext,
2080    ) {
2081        cx_a.foreground().forbid_parking();
2082        let mut lang_registry = Arc::new(LanguageRegistry::new());
2083        let fs = Arc::new(FakeFs::new(cx_a.background()));
2084
2085        // Set up a fake language server.
2086        let (language_server_config, mut fake_language_server) =
2087            LanguageServerConfig::fake(cx_a.background()).await;
2088        Arc::get_mut(&mut lang_registry)
2089            .unwrap()
2090            .add(Arc::new(Language::new(
2091                LanguageConfig {
2092                    name: "Rust".to_string(),
2093                    path_suffixes: vec!["rs".to_string()],
2094                    language_server: Some(language_server_config),
2095                    ..Default::default()
2096                },
2097                Some(tree_sitter_rust::language()),
2098            )));
2099
2100        // Connect to a server as 2 clients.
2101        let mut server = TestServer::start(cx_a.foreground()).await;
2102        let client_a = server.create_client(&mut cx_a, "user_a").await;
2103        let client_b = server.create_client(&mut cx_b, "user_b").await;
2104
2105        // Share a project as client A
2106        fs.insert_tree(
2107            "/a",
2108            json!({
2109                ".zed.toml": r#"collaborators = ["user_b"]"#,
2110                "a.rs": "let one = two",
2111                "other.rs": "",
2112            }),
2113        )
2114        .await;
2115        let project_a = cx_a.update(|cx| {
2116            Project::local(
2117                client_a.clone(),
2118                client_a.user_store.clone(),
2119                lang_registry.clone(),
2120                fs.clone(),
2121                cx,
2122            )
2123        });
2124        let (worktree_a, _) = project_a
2125            .update(&mut cx_a, |p, cx| {
2126                p.find_or_create_local_worktree("/a", false, cx)
2127            })
2128            .await
2129            .unwrap();
2130        worktree_a
2131            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2132            .await;
2133        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2134        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2135        project_a
2136            .update(&mut cx_a, |p, cx| p.share(cx))
2137            .await
2138            .unwrap();
2139
2140        // Cause the language server to start.
2141        let _ = cx_a
2142            .background()
2143            .spawn(project_a.update(&mut cx_a, |project, cx| {
2144                project.open_buffer(
2145                    ProjectPath {
2146                        worktree_id,
2147                        path: Path::new("other.rs").into(),
2148                    },
2149                    cx,
2150                )
2151            }))
2152            .await
2153            .unwrap();
2154
2155        // Simulate a language server reporting errors for a file.
2156        fake_language_server
2157            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2158                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2159                version: None,
2160                diagnostics: vec![lsp::Diagnostic {
2161                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2162                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2163                    message: "message 1".to_string(),
2164                    ..Default::default()
2165                }],
2166            })
2167            .await;
2168
2169        // Wait for server to see the diagnostics update.
2170        server
2171            .condition(|store| {
2172                let worktree = store
2173                    .project(project_id)
2174                    .unwrap()
2175                    .worktrees
2176                    .get(&worktree_id.to_proto())
2177                    .unwrap();
2178
2179                !worktree
2180                    .share
2181                    .as_ref()
2182                    .unwrap()
2183                    .diagnostic_summaries
2184                    .is_empty()
2185            })
2186            .await;
2187
2188        // Join the worktree as client B.
2189        let project_b = Project::remote(
2190            project_id,
2191            client_b.clone(),
2192            client_b.user_store.clone(),
2193            lang_registry.clone(),
2194            fs.clone(),
2195            &mut cx_b.to_async(),
2196        )
2197        .await
2198        .unwrap();
2199
2200        project_b.read_with(&cx_b, |project, cx| {
2201            assert_eq!(
2202                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2203                &[(
2204                    ProjectPath {
2205                        worktree_id,
2206                        path: Arc::from(Path::new("a.rs")),
2207                    },
2208                    DiagnosticSummary {
2209                        error_count: 1,
2210                        warning_count: 0,
2211                        ..Default::default()
2212                    },
2213                )]
2214            )
2215        });
2216
2217        // Simulate a language server reporting more errors for a file.
2218        fake_language_server
2219            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2220                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2221                version: None,
2222                diagnostics: vec![
2223                    lsp::Diagnostic {
2224                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2225                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2226                        message: "message 1".to_string(),
2227                        ..Default::default()
2228                    },
2229                    lsp::Diagnostic {
2230                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2231                        range: lsp::Range::new(
2232                            lsp::Position::new(0, 10),
2233                            lsp::Position::new(0, 13),
2234                        ),
2235                        message: "message 2".to_string(),
2236                        ..Default::default()
2237                    },
2238                ],
2239            })
2240            .await;
2241
2242        // Client b gets the updated summaries
2243        project_b
2244            .condition(&cx_b, |project, cx| {
2245                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2246                    == &[(
2247                        ProjectPath {
2248                            worktree_id,
2249                            path: Arc::from(Path::new("a.rs")),
2250                        },
2251                        DiagnosticSummary {
2252                            error_count: 1,
2253                            warning_count: 1,
2254                            ..Default::default()
2255                        },
2256                    )]
2257            })
2258            .await;
2259
2260        // Open the file with the errors on client B. They should be present.
2261        let buffer_b = cx_b
2262            .background()
2263            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2264            .await
2265            .unwrap();
2266
2267        buffer_b.read_with(&cx_b, |buffer, _| {
2268            assert_eq!(
2269                buffer
2270                    .snapshot()
2271                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2272                    .map(|entry| entry)
2273                    .collect::<Vec<_>>(),
2274                &[
2275                    DiagnosticEntry {
2276                        range: Point::new(0, 4)..Point::new(0, 7),
2277                        diagnostic: Diagnostic {
2278                            group_id: 0,
2279                            message: "message 1".to_string(),
2280                            severity: lsp::DiagnosticSeverity::ERROR,
2281                            is_primary: true,
2282                            ..Default::default()
2283                        }
2284                    },
2285                    DiagnosticEntry {
2286                        range: Point::new(0, 10)..Point::new(0, 13),
2287                        diagnostic: Diagnostic {
2288                            group_id: 1,
2289                            severity: lsp::DiagnosticSeverity::WARNING,
2290                            message: "message 2".to_string(),
2291                            is_primary: true,
2292                            ..Default::default()
2293                        }
2294                    }
2295                ]
2296            );
2297        });
2298    }
2299
2300    #[gpui::test]
2301    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2302        cx_a.foreground().forbid_parking();
2303        let mut lang_registry = Arc::new(LanguageRegistry::new());
2304        let fs = Arc::new(FakeFs::new(cx_a.background()));
2305
2306        // Set up a fake language server.
2307        let (language_server_config, mut fake_language_server) =
2308            LanguageServerConfig::fake(cx_a.background()).await;
2309        Arc::get_mut(&mut lang_registry)
2310            .unwrap()
2311            .add(Arc::new(Language::new(
2312                LanguageConfig {
2313                    name: "Rust".to_string(),
2314                    path_suffixes: vec!["rs".to_string()],
2315                    language_server: Some(language_server_config),
2316                    ..Default::default()
2317                },
2318                Some(tree_sitter_rust::language()),
2319            )));
2320
2321        // Connect to a server as 2 clients.
2322        let mut server = TestServer::start(cx_a.foreground()).await;
2323        let client_a = server.create_client(&mut cx_a, "user_a").await;
2324        let client_b = server.create_client(&mut cx_b, "user_b").await;
2325
2326        // Share a project as client A
2327        fs.insert_tree(
2328            "/a",
2329            json!({
2330                ".zed.toml": r#"collaborators = ["user_b"]"#,
2331                "a.rs": "let one = two",
2332            }),
2333        )
2334        .await;
2335        let project_a = cx_a.update(|cx| {
2336            Project::local(
2337                client_a.clone(),
2338                client_a.user_store.clone(),
2339                lang_registry.clone(),
2340                fs.clone(),
2341                cx,
2342            )
2343        });
2344        let (worktree_a, _) = project_a
2345            .update(&mut cx_a, |p, cx| {
2346                p.find_or_create_local_worktree("/a", false, cx)
2347            })
2348            .await
2349            .unwrap();
2350        worktree_a
2351            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2352            .await;
2353        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2354        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2355        project_a
2356            .update(&mut cx_a, |p, cx| p.share(cx))
2357            .await
2358            .unwrap();
2359
2360        // Join the worktree as client B.
2361        let project_b = Project::remote(
2362            project_id,
2363            client_b.clone(),
2364            client_b.user_store.clone(),
2365            lang_registry.clone(),
2366            fs.clone(),
2367            &mut cx_b.to_async(),
2368        )
2369        .await
2370        .unwrap();
2371
2372        let buffer_b = cx_b
2373            .background()
2374            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2375            .await
2376            .unwrap();
2377
2378        let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2379        let (request_id, _) = fake_language_server
2380            .receive_request::<lsp::request::Formatting>()
2381            .await;
2382        fake_language_server
2383            .respond(
2384                request_id,
2385                Some(vec![
2386                    lsp::TextEdit {
2387                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2388                        new_text: "h".to_string(),
2389                    },
2390                    lsp::TextEdit {
2391                        range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2392                        new_text: "y".to_string(),
2393                    },
2394                ]),
2395            )
2396            .await;
2397        format.await.unwrap();
2398        assert_eq!(
2399            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2400            "let honey = two"
2401        );
2402    }
2403
2404    #[gpui::test]
2405    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2406        cx_a.foreground().forbid_parking();
2407        let mut lang_registry = Arc::new(LanguageRegistry::new());
2408        let fs = Arc::new(FakeFs::new(cx_a.background()));
2409        fs.insert_tree(
2410            "/root-1",
2411            json!({
2412                ".zed.toml": r#"collaborators = ["user_b"]"#,
2413                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2414            }),
2415        )
2416        .await;
2417        fs.insert_tree(
2418            "/root-2",
2419            json!({
2420                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2421            }),
2422        )
2423        .await;
2424
2425        // Set up a fake language server.
2426        let (language_server_config, mut fake_language_server) =
2427            LanguageServerConfig::fake(cx_a.background()).await;
2428        Arc::get_mut(&mut lang_registry)
2429            .unwrap()
2430            .add(Arc::new(Language::new(
2431                LanguageConfig {
2432                    name: "Rust".to_string(),
2433                    path_suffixes: vec!["rs".to_string()],
2434                    language_server: Some(language_server_config),
2435                    ..Default::default()
2436                },
2437                Some(tree_sitter_rust::language()),
2438            )));
2439
2440        // Connect to a server as 2 clients.
2441        let mut server = TestServer::start(cx_a.foreground()).await;
2442        let client_a = server.create_client(&mut cx_a, "user_a").await;
2443        let client_b = server.create_client(&mut cx_b, "user_b").await;
2444
2445        // Share a project as client A
2446        let project_a = cx_a.update(|cx| {
2447            Project::local(
2448                client_a.clone(),
2449                client_a.user_store.clone(),
2450                lang_registry.clone(),
2451                fs.clone(),
2452                cx,
2453            )
2454        });
2455        let (worktree_a, _) = project_a
2456            .update(&mut cx_a, |p, cx| {
2457                p.find_or_create_local_worktree("/root-1", false, cx)
2458            })
2459            .await
2460            .unwrap();
2461        worktree_a
2462            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2463            .await;
2464        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2465        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2466        project_a
2467            .update(&mut cx_a, |p, cx| p.share(cx))
2468            .await
2469            .unwrap();
2470
2471        // Join the worktree as client B.
2472        let project_b = Project::remote(
2473            project_id,
2474            client_b.clone(),
2475            client_b.user_store.clone(),
2476            lang_registry.clone(),
2477            fs.clone(),
2478            &mut cx_b.to_async(),
2479        )
2480        .await
2481        .unwrap();
2482
2483        // Open the file to be formatted on client B.
2484        let buffer_b = cx_b
2485            .background()
2486            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2487            .await
2488            .unwrap();
2489
2490        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2491        let (request_id, _) = fake_language_server
2492            .receive_request::<lsp::request::GotoDefinition>()
2493            .await;
2494        fake_language_server
2495            .respond(
2496                request_id,
2497                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2498                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2499                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2500                ))),
2501            )
2502            .await;
2503        let definitions_1 = definitions_1.await.unwrap();
2504        cx_b.read(|cx| {
2505            assert_eq!(definitions_1.len(), 1);
2506            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2507            let target_buffer = definitions_1[0].target_buffer.read(cx);
2508            assert_eq!(
2509                target_buffer.text(),
2510                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2511            );
2512            assert_eq!(
2513                definitions_1[0].target_range.to_point(target_buffer),
2514                Point::new(0, 6)..Point::new(0, 9)
2515            );
2516        });
2517
2518        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2519        // the previous call to `definition`.
2520        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2521        let (request_id, _) = fake_language_server
2522            .receive_request::<lsp::request::GotoDefinition>()
2523            .await;
2524        fake_language_server
2525            .respond(
2526                request_id,
2527                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2528                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2529                    lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2530                ))),
2531            )
2532            .await;
2533        let definitions_2 = definitions_2.await.unwrap();
2534        cx_b.read(|cx| {
2535            assert_eq!(definitions_2.len(), 1);
2536            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2537            let target_buffer = definitions_2[0].target_buffer.read(cx);
2538            assert_eq!(
2539                target_buffer.text(),
2540                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2541            );
2542            assert_eq!(
2543                definitions_2[0].target_range.to_point(target_buffer),
2544                Point::new(1, 6)..Point::new(1, 11)
2545            );
2546        });
2547        assert_eq!(
2548            definitions_1[0].target_buffer,
2549            definitions_2[0].target_buffer
2550        );
2551
2552        cx_b.update(|_| {
2553            drop(definitions_1);
2554            drop(definitions_2);
2555        });
2556        project_b
2557            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2558            .await;
2559    }
2560
2561    #[gpui::test]
2562    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2563        mut cx_a: TestAppContext,
2564        mut cx_b: TestAppContext,
2565        mut rng: StdRng,
2566    ) {
2567        cx_a.foreground().forbid_parking();
2568        let mut lang_registry = Arc::new(LanguageRegistry::new());
2569        let fs = Arc::new(FakeFs::new(cx_a.background()));
2570        fs.insert_tree(
2571            "/root",
2572            json!({
2573                ".zed.toml": r#"collaborators = ["user_b"]"#,
2574                "a.rs": "const ONE: usize = b::TWO;",
2575                "b.rs": "const TWO: usize = 2",
2576            }),
2577        )
2578        .await;
2579
2580        // Set up a fake language server.
2581        let (language_server_config, mut fake_language_server) =
2582            LanguageServerConfig::fake(cx_a.background()).await;
2583        Arc::get_mut(&mut lang_registry)
2584            .unwrap()
2585            .add(Arc::new(Language::new(
2586                LanguageConfig {
2587                    name: "Rust".to_string(),
2588                    path_suffixes: vec!["rs".to_string()],
2589                    language_server: Some(language_server_config),
2590                    ..Default::default()
2591                },
2592                Some(tree_sitter_rust::language()),
2593            )));
2594
2595        // Connect to a server as 2 clients.
2596        let mut server = TestServer::start(cx_a.foreground()).await;
2597        let client_a = server.create_client(&mut cx_a, "user_a").await;
2598        let client_b = server.create_client(&mut cx_b, "user_b").await;
2599
2600        // Share a project as client A
2601        let project_a = cx_a.update(|cx| {
2602            Project::local(
2603                client_a.clone(),
2604                client_a.user_store.clone(),
2605                lang_registry.clone(),
2606                fs.clone(),
2607                cx,
2608            )
2609        });
2610        let (worktree_a, _) = project_a
2611            .update(&mut cx_a, |p, cx| {
2612                p.find_or_create_local_worktree("/root", false, cx)
2613            })
2614            .await
2615            .unwrap();
2616        worktree_a
2617            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2618            .await;
2619        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2620        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2621        project_a
2622            .update(&mut cx_a, |p, cx| p.share(cx))
2623            .await
2624            .unwrap();
2625
2626        // Join the worktree as client B.
2627        let project_b = Project::remote(
2628            project_id,
2629            client_b.clone(),
2630            client_b.user_store.clone(),
2631            lang_registry.clone(),
2632            fs.clone(),
2633            &mut cx_b.to_async(),
2634        )
2635        .await
2636        .unwrap();
2637
2638        let buffer_b1 = cx_b
2639            .background()
2640            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2641            .await
2642            .unwrap();
2643
2644        let definitions;
2645        let buffer_b2;
2646        if rng.gen() {
2647            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2648            buffer_b2 =
2649                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2650        } else {
2651            buffer_b2 =
2652                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2653            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2654        }
2655
2656        let (request_id, _) = fake_language_server
2657            .receive_request::<lsp::request::GotoDefinition>()
2658            .await;
2659        fake_language_server
2660            .respond(
2661                request_id,
2662                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2663                    lsp::Url::from_file_path("/root/b.rs").unwrap(),
2664                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2665                ))),
2666            )
2667            .await;
2668
2669        let buffer_b2 = buffer_b2.await.unwrap();
2670        let definitions = definitions.await.unwrap();
2671        assert_eq!(definitions.len(), 1);
2672        assert_eq!(definitions[0].target_buffer, buffer_b2);
2673    }
2674
2675    #[gpui::test]
2676    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2677        cx_a.foreground().forbid_parking();
2678
2679        // Connect to a server as 2 clients.
2680        let mut server = TestServer::start(cx_a.foreground()).await;
2681        let client_a = server.create_client(&mut cx_a, "user_a").await;
2682        let client_b = server.create_client(&mut cx_b, "user_b").await;
2683
2684        // Create an org that includes these 2 users.
2685        let db = &server.app_state.db;
2686        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2687        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2688            .await
2689            .unwrap();
2690        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2691            .await
2692            .unwrap();
2693
2694        // Create a channel that includes all the users.
2695        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2696        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2697            .await
2698            .unwrap();
2699        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2700            .await
2701            .unwrap();
2702        db.create_channel_message(
2703            channel_id,
2704            client_b.current_user_id(&cx_b),
2705            "hello A, it's B.",
2706            OffsetDateTime::now_utc(),
2707            1,
2708        )
2709        .await
2710        .unwrap();
2711
2712        let channels_a = cx_a
2713            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2714        channels_a
2715            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2716            .await;
2717        channels_a.read_with(&cx_a, |list, _| {
2718            assert_eq!(
2719                list.available_channels().unwrap(),
2720                &[ChannelDetails {
2721                    id: channel_id.to_proto(),
2722                    name: "test-channel".to_string()
2723                }]
2724            )
2725        });
2726        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2727            this.get_channel(channel_id.to_proto(), cx).unwrap()
2728        });
2729        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2730        channel_a
2731            .condition(&cx_a, |channel, _| {
2732                channel_messages(channel)
2733                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2734            })
2735            .await;
2736
2737        let channels_b = cx_b
2738            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2739        channels_b
2740            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2741            .await;
2742        channels_b.read_with(&cx_b, |list, _| {
2743            assert_eq!(
2744                list.available_channels().unwrap(),
2745                &[ChannelDetails {
2746                    id: channel_id.to_proto(),
2747                    name: "test-channel".to_string()
2748                }]
2749            )
2750        });
2751
2752        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2753            this.get_channel(channel_id.to_proto(), cx).unwrap()
2754        });
2755        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2756        channel_b
2757            .condition(&cx_b, |channel, _| {
2758                channel_messages(channel)
2759                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2760            })
2761            .await;
2762
2763        channel_a
2764            .update(&mut cx_a, |channel, cx| {
2765                channel
2766                    .send_message("oh, hi B.".to_string(), cx)
2767                    .unwrap()
2768                    .detach();
2769                let task = channel.send_message("sup".to_string(), cx).unwrap();
2770                assert_eq!(
2771                    channel_messages(channel),
2772                    &[
2773                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2774                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2775                        ("user_a".to_string(), "sup".to_string(), true)
2776                    ]
2777                );
2778                task
2779            })
2780            .await
2781            .unwrap();
2782
2783        channel_b
2784            .condition(&cx_b, |channel, _| {
2785                channel_messages(channel)
2786                    == [
2787                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2788                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2789                        ("user_a".to_string(), "sup".to_string(), false),
2790                    ]
2791            })
2792            .await;
2793
2794        assert_eq!(
2795            server
2796                .state()
2797                .await
2798                .channel(channel_id)
2799                .unwrap()
2800                .connection_ids
2801                .len(),
2802            2
2803        );
2804        cx_b.update(|_| drop(channel_b));
2805        server
2806            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2807            .await;
2808
2809        cx_a.update(|_| drop(channel_a));
2810        server
2811            .condition(|state| state.channel(channel_id).is_none())
2812            .await;
2813    }
2814
2815    #[gpui::test]
2816    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2817        cx_a.foreground().forbid_parking();
2818
2819        let mut server = TestServer::start(cx_a.foreground()).await;
2820        let client_a = server.create_client(&mut cx_a, "user_a").await;
2821
2822        let db = &server.app_state.db;
2823        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2824        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2825        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2826            .await
2827            .unwrap();
2828        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2829            .await
2830            .unwrap();
2831
2832        let channels_a = cx_a
2833            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2834        channels_a
2835            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2836            .await;
2837        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2838            this.get_channel(channel_id.to_proto(), cx).unwrap()
2839        });
2840
2841        // Messages aren't allowed to be too long.
2842        channel_a
2843            .update(&mut cx_a, |channel, cx| {
2844                let long_body = "this is long.\n".repeat(1024);
2845                channel.send_message(long_body, cx).unwrap()
2846            })
2847            .await
2848            .unwrap_err();
2849
2850        // Messages aren't allowed to be blank.
2851        channel_a.update(&mut cx_a, |channel, cx| {
2852            channel.send_message(String::new(), cx).unwrap_err()
2853        });
2854
2855        // Leading and trailing whitespace are trimmed.
2856        channel_a
2857            .update(&mut cx_a, |channel, cx| {
2858                channel
2859                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
2860                    .unwrap()
2861            })
2862            .await
2863            .unwrap();
2864        assert_eq!(
2865            db.get_channel_messages(channel_id, 10, None)
2866                .await
2867                .unwrap()
2868                .iter()
2869                .map(|m| &m.body)
2870                .collect::<Vec<_>>(),
2871            &["surrounded by whitespace"]
2872        );
2873    }
2874
2875    #[gpui::test]
2876    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2877        cx_a.foreground().forbid_parking();
2878
2879        // Connect to a server as 2 clients.
2880        let mut server = TestServer::start(cx_a.foreground()).await;
2881        let client_a = server.create_client(&mut cx_a, "user_a").await;
2882        let client_b = server.create_client(&mut cx_b, "user_b").await;
2883        let mut status_b = client_b.status();
2884
2885        // Create an org that includes these 2 users.
2886        let db = &server.app_state.db;
2887        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2888        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2889            .await
2890            .unwrap();
2891        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2892            .await
2893            .unwrap();
2894
2895        // Create a channel that includes all the users.
2896        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2897        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2898            .await
2899            .unwrap();
2900        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2901            .await
2902            .unwrap();
2903        db.create_channel_message(
2904            channel_id,
2905            client_b.current_user_id(&cx_b),
2906            "hello A, it's B.",
2907            OffsetDateTime::now_utc(),
2908            2,
2909        )
2910        .await
2911        .unwrap();
2912
2913        let channels_a = cx_a
2914            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2915        channels_a
2916            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2917            .await;
2918
2919        channels_a.read_with(&cx_a, |list, _| {
2920            assert_eq!(
2921                list.available_channels().unwrap(),
2922                &[ChannelDetails {
2923                    id: channel_id.to_proto(),
2924                    name: "test-channel".to_string()
2925                }]
2926            )
2927        });
2928        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2929            this.get_channel(channel_id.to_proto(), cx).unwrap()
2930        });
2931        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2932        channel_a
2933            .condition(&cx_a, |channel, _| {
2934                channel_messages(channel)
2935                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2936            })
2937            .await;
2938
2939        let channels_b = cx_b
2940            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2941        channels_b
2942            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2943            .await;
2944        channels_b.read_with(&cx_b, |list, _| {
2945            assert_eq!(
2946                list.available_channels().unwrap(),
2947                &[ChannelDetails {
2948                    id: channel_id.to_proto(),
2949                    name: "test-channel".to_string()
2950                }]
2951            )
2952        });
2953
2954        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2955            this.get_channel(channel_id.to_proto(), cx).unwrap()
2956        });
2957        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2958        channel_b
2959            .condition(&cx_b, |channel, _| {
2960                channel_messages(channel)
2961                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2962            })
2963            .await;
2964
2965        // Disconnect client B, ensuring we can still access its cached channel data.
2966        server.forbid_connections();
2967        server.disconnect_client(client_b.current_user_id(&cx_b));
2968        while !matches!(
2969            status_b.next().await,
2970            Some(client::Status::ReconnectionError { .. })
2971        ) {}
2972
2973        channels_b.read_with(&cx_b, |channels, _| {
2974            assert_eq!(
2975                channels.available_channels().unwrap(),
2976                [ChannelDetails {
2977                    id: channel_id.to_proto(),
2978                    name: "test-channel".to_string()
2979                }]
2980            )
2981        });
2982        channel_b.read_with(&cx_b, |channel, _| {
2983            assert_eq!(
2984                channel_messages(channel),
2985                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2986            )
2987        });
2988
2989        // Send a message from client B while it is disconnected.
2990        channel_b
2991            .update(&mut cx_b, |channel, cx| {
2992                let task = channel
2993                    .send_message("can you see this?".to_string(), cx)
2994                    .unwrap();
2995                assert_eq!(
2996                    channel_messages(channel),
2997                    &[
2998                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2999                        ("user_b".to_string(), "can you see this?".to_string(), true)
3000                    ]
3001                );
3002                task
3003            })
3004            .await
3005            .unwrap_err();
3006
3007        // Send a message from client A while B is disconnected.
3008        channel_a
3009            .update(&mut cx_a, |channel, cx| {
3010                channel
3011                    .send_message("oh, hi B.".to_string(), cx)
3012                    .unwrap()
3013                    .detach();
3014                let task = channel.send_message("sup".to_string(), cx).unwrap();
3015                assert_eq!(
3016                    channel_messages(channel),
3017                    &[
3018                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3019                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3020                        ("user_a".to_string(), "sup".to_string(), true)
3021                    ]
3022                );
3023                task
3024            })
3025            .await
3026            .unwrap();
3027
3028        // Give client B a chance to reconnect.
3029        server.allow_connections();
3030        cx_b.foreground().advance_clock(Duration::from_secs(10));
3031
3032        // Verify that B sees the new messages upon reconnection, as well as the message client B
3033        // sent while offline.
3034        channel_b
3035            .condition(&cx_b, |channel, _| {
3036                channel_messages(channel)
3037                    == [
3038                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3039                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3040                        ("user_a".to_string(), "sup".to_string(), false),
3041                        ("user_b".to_string(), "can you see this?".to_string(), false),
3042                    ]
3043            })
3044            .await;
3045
3046        // Ensure client A and B can communicate normally after reconnection.
3047        channel_a
3048            .update(&mut cx_a, |channel, cx| {
3049                channel.send_message("you online?".to_string(), cx).unwrap()
3050            })
3051            .await
3052            .unwrap();
3053        channel_b
3054            .condition(&cx_b, |channel, _| {
3055                channel_messages(channel)
3056                    == [
3057                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3058                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3059                        ("user_a".to_string(), "sup".to_string(), false),
3060                        ("user_b".to_string(), "can you see this?".to_string(), false),
3061                        ("user_a".to_string(), "you online?".to_string(), false),
3062                    ]
3063            })
3064            .await;
3065
3066        channel_b
3067            .update(&mut cx_b, |channel, cx| {
3068                channel.send_message("yep".to_string(), cx).unwrap()
3069            })
3070            .await
3071            .unwrap();
3072        channel_a
3073            .condition(&cx_a, |channel, _| {
3074                channel_messages(channel)
3075                    == [
3076                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3077                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3078                        ("user_a".to_string(), "sup".to_string(), false),
3079                        ("user_b".to_string(), "can you see this?".to_string(), false),
3080                        ("user_a".to_string(), "you online?".to_string(), false),
3081                        ("user_b".to_string(), "yep".to_string(), false),
3082                    ]
3083            })
3084            .await;
3085    }
3086
3087    #[gpui::test]
3088    async fn test_contacts(
3089        mut cx_a: TestAppContext,
3090        mut cx_b: TestAppContext,
3091        mut cx_c: TestAppContext,
3092    ) {
3093        cx_a.foreground().forbid_parking();
3094        let lang_registry = Arc::new(LanguageRegistry::new());
3095        let fs = Arc::new(FakeFs::new(cx_a.background()));
3096
3097        // Connect to a server as 3 clients.
3098        let mut server = TestServer::start(cx_a.foreground()).await;
3099        let client_a = server.create_client(&mut cx_a, "user_a").await;
3100        let client_b = server.create_client(&mut cx_b, "user_b").await;
3101        let client_c = server.create_client(&mut cx_c, "user_c").await;
3102
3103        // Share a worktree as client A.
3104        fs.insert_tree(
3105            "/a",
3106            json!({
3107                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3108            }),
3109        )
3110        .await;
3111
3112        let project_a = cx_a.update(|cx| {
3113            Project::local(
3114                client_a.clone(),
3115                client_a.user_store.clone(),
3116                lang_registry.clone(),
3117                fs.clone(),
3118                cx,
3119            )
3120        });
3121        let (worktree_a, _) = project_a
3122            .update(&mut cx_a, |p, cx| {
3123                p.find_or_create_local_worktree("/a", false, cx)
3124            })
3125            .await
3126            .unwrap();
3127        worktree_a
3128            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3129            .await;
3130
3131        client_a
3132            .user_store
3133            .condition(&cx_a, |user_store, _| {
3134                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3135            })
3136            .await;
3137        client_b
3138            .user_store
3139            .condition(&cx_b, |user_store, _| {
3140                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3141            })
3142            .await;
3143        client_c
3144            .user_store
3145            .condition(&cx_c, |user_store, _| {
3146                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3147            })
3148            .await;
3149
3150        let project_id = project_a
3151            .update(&mut cx_a, |project, _| project.next_remote_id())
3152            .await;
3153        project_a
3154            .update(&mut cx_a, |project, cx| project.share(cx))
3155            .await
3156            .unwrap();
3157
3158        let _project_b = Project::remote(
3159            project_id,
3160            client_b.clone(),
3161            client_b.user_store.clone(),
3162            lang_registry.clone(),
3163            fs.clone(),
3164            &mut cx_b.to_async(),
3165        )
3166        .await
3167        .unwrap();
3168
3169        client_a
3170            .user_store
3171            .condition(&cx_a, |user_store, _| {
3172                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3173            })
3174            .await;
3175        client_b
3176            .user_store
3177            .condition(&cx_b, |user_store, _| {
3178                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3179            })
3180            .await;
3181        client_c
3182            .user_store
3183            .condition(&cx_c, |user_store, _| {
3184                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3185            })
3186            .await;
3187
3188        project_a
3189            .condition(&cx_a, |project, _| {
3190                project.collaborators().contains_key(&client_b.peer_id)
3191            })
3192            .await;
3193
3194        cx_a.update(move |_| drop(project_a));
3195        client_a
3196            .user_store
3197            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3198            .await;
3199        client_b
3200            .user_store
3201            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3202            .await;
3203        client_c
3204            .user_store
3205            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3206            .await;
3207
3208        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3209            user_store
3210                .contacts()
3211                .iter()
3212                .map(|contact| {
3213                    let worktrees = contact
3214                        .projects
3215                        .iter()
3216                        .map(|p| {
3217                            (
3218                                p.worktree_root_names[0].as_str(),
3219                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3220                            )
3221                        })
3222                        .collect();
3223                    (contact.user.github_login.as_str(), worktrees)
3224                })
3225                .collect()
3226        }
3227    }
3228
3229    struct TestServer {
3230        peer: Arc<Peer>,
3231        app_state: Arc<AppState>,
3232        server: Arc<Server>,
3233        foreground: Rc<executor::Foreground>,
3234        notifications: mpsc::Receiver<()>,
3235        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3236        forbid_connections: Arc<AtomicBool>,
3237        _test_db: TestDb,
3238    }
3239
3240    impl TestServer {
3241        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3242            let test_db = TestDb::new();
3243            let app_state = Self::build_app_state(&test_db).await;
3244            let peer = Peer::new();
3245            let notifications = mpsc::channel(128);
3246            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3247            Self {
3248                peer,
3249                app_state,
3250                server,
3251                foreground,
3252                notifications: notifications.1,
3253                connection_killers: Default::default(),
3254                forbid_connections: Default::default(),
3255                _test_db: test_db,
3256            }
3257        }
3258
3259        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3260            let http = FakeHttpClient::with_404_response();
3261            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3262            let client_name = name.to_string();
3263            let mut client = Client::new(http.clone());
3264            let server = self.server.clone();
3265            let connection_killers = self.connection_killers.clone();
3266            let forbid_connections = self.forbid_connections.clone();
3267            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3268
3269            Arc::get_mut(&mut client)
3270                .unwrap()
3271                .override_authenticate(move |cx| {
3272                    cx.spawn(|_| async move {
3273                        let access_token = "the-token".to_string();
3274                        Ok(Credentials {
3275                            user_id: user_id.0 as u64,
3276                            access_token,
3277                        })
3278                    })
3279                })
3280                .override_establish_connection(move |credentials, cx| {
3281                    assert_eq!(credentials.user_id, user_id.0 as u64);
3282                    assert_eq!(credentials.access_token, "the-token");
3283
3284                    let server = server.clone();
3285                    let connection_killers = connection_killers.clone();
3286                    let forbid_connections = forbid_connections.clone();
3287                    let client_name = client_name.clone();
3288                    let connection_id_tx = connection_id_tx.clone();
3289                    cx.spawn(move |cx| async move {
3290                        if forbid_connections.load(SeqCst) {
3291                            Err(EstablishConnectionError::other(anyhow!(
3292                                "server is forbidding connections"
3293                            )))
3294                        } else {
3295                            let (client_conn, server_conn, kill_conn) =
3296                                Connection::in_memory(cx.background());
3297                            connection_killers.lock().insert(user_id, kill_conn);
3298                            cx.background()
3299                                .spawn(server.handle_connection(
3300                                    server_conn,
3301                                    client_name,
3302                                    user_id,
3303                                    Some(connection_id_tx),
3304                                ))
3305                                .detach();
3306                            Ok(client_conn)
3307                        }
3308                    })
3309                });
3310
3311            client
3312                .authenticate_and_connect(&cx.to_async())
3313                .await
3314                .unwrap();
3315
3316            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3317            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3318            let mut authed_user =
3319                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3320            while authed_user.next().await.unwrap().is_none() {}
3321
3322            TestClient {
3323                client,
3324                peer_id,
3325                user_store,
3326            }
3327        }
3328
3329        fn disconnect_client(&self, user_id: UserId) {
3330            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3331                let _ = kill_conn.try_send(Some(()));
3332            }
3333        }
3334
3335        fn forbid_connections(&self) {
3336            self.forbid_connections.store(true, SeqCst);
3337        }
3338
3339        fn allow_connections(&self) {
3340            self.forbid_connections.store(false, SeqCst);
3341        }
3342
3343        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3344            let mut config = Config::default();
3345            config.session_secret = "a".repeat(32);
3346            config.database_url = test_db.url.clone();
3347            let github_client = github::AppClient::test();
3348            Arc::new(AppState {
3349                db: test_db.db().clone(),
3350                handlebars: Default::default(),
3351                auth_client: auth::build_client("", ""),
3352                repo_client: github::RepoClient::test(&github_client),
3353                github_client,
3354                config,
3355            })
3356        }
3357
3358        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3359            self.server.store.read()
3360        }
3361
3362        async fn condition<F>(&mut self, mut predicate: F)
3363        where
3364            F: FnMut(&Store) -> bool,
3365        {
3366            async_std::future::timeout(Duration::from_millis(500), async {
3367                while !(predicate)(&*self.server.store.read()) {
3368                    self.foreground.start_waiting();
3369                    self.notifications.next().await;
3370                    self.foreground.finish_waiting();
3371                }
3372            })
3373            .await
3374            .expect("condition timed out");
3375        }
3376    }
3377
3378    impl Drop for TestServer {
3379        fn drop(&mut self) {
3380            self.peer.reset();
3381        }
3382    }
3383
3384    struct TestClient {
3385        client: Arc<Client>,
3386        pub peer_id: PeerId,
3387        pub user_store: ModelHandle<UserStore>,
3388    }
3389
3390    impl Deref for TestClient {
3391        type Target = Arc<Client>;
3392
3393        fn deref(&self) -> &Self::Target {
3394            &self.client
3395        }
3396    }
3397
3398    impl TestClient {
3399        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3400            UserId::from_proto(
3401                self.user_store
3402                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3403            )
3404        }
3405    }
3406
3407    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3408        channel
3409            .messages()
3410            .cursor::<()>()
3411            .map(|m| {
3412                (
3413                    m.sender.github_login.clone(),
3414                    m.body.clone(),
3415                    m.is_pending(),
3416                )
3417            })
3418            .collect()
3419    }
3420
3421    struct EmptyView;
3422
3423    impl gpui::Entity for EmptyView {
3424        type Event = ();
3425    }
3426
3427    impl gpui::View for EmptyView {
3428        fn ui_name() -> &'static str {
3429            "empty view"
3430        }
3431
3432        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3433            gpui::Element::boxed(gpui::elements::Empty)
3434        }
3435    }
3436}