rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
/// RPC server state; `Server::new` hands it out inside an `Arc` so connection
/// tasks and message handlers can share it.
pub struct Server {
    /// RPC peer used to send, forward, and respond to messages.
    peer: Arc<Peer>,
    /// Connection and project bookkeeping, guarded by a read-write lock.
    store: RwLock<Store>,
    /// Shared application state; handlers reach the database through it.
    app_state: Arc<AppState>,
    /// Message handlers keyed by the `TypeId` of the payload type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent `()` after each handled message.
    notifications: Option<mpsc::Sender<()>>,
}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updating)
  76            .add_handler(Server::disk_based_diagnostics_updated)
  77            .add_handler(Server::get_definition)
  78            .add_handler(Server::open_buffer)
  79            .add_handler(Server::close_buffer)
  80            .add_handler(Server::update_buffer)
  81            .add_handler(Server::update_buffer_file)
  82            .add_handler(Server::buffer_reloaded)
  83            .add_handler(Server::buffer_saved)
  84            .add_handler(Server::save_buffer)
  85            .add_handler(Server::format_buffer)
  86            .add_handler(Server::get_completions)
  87            .add_handler(Server::apply_additional_edits_for_completion)
  88            .add_handler(Server::get_channels)
  89            .add_handler(Server::get_users)
  90            .add_handler(Server::join_channel)
  91            .add_handler(Server::leave_channel)
  92            .add_handler(Server::send_channel_message)
  93            .add_handler(Server::get_channel_messages);
  94
  95        Arc::new(server)
  96    }
  97
  98    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  99    where
 100        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 101        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 102        M: EnvelopedMessage,
 103    {
 104        let prev_handler = self.handlers.insert(
 105            TypeId::of::<M>(),
 106            Box::new(move |server, envelope| {
 107                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 108                (handler)(server, *envelope).boxed()
 109            }),
 110        );
 111        if prev_handler.is_some() {
 112            panic!("registered a handler for the same message twice");
 113        }
 114        self
 115    }
 116
 117    pub fn handle_connection(
 118        self: &Arc<Self>,
 119        connection: Connection,
 120        addr: String,
 121        user_id: UserId,
 122        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 123    ) -> impl Future<Output = ()> {
 124        let mut this = self.clone();
 125        async move {
 126            let (connection_id, handle_io, mut incoming_rx) =
 127                this.peer.add_connection(connection).await;
 128
 129            if let Some(send_connection_id) = send_connection_id.as_mut() {
 130                let _ = send_connection_id.send(connection_id).await;
 131            }
 132
 133            this.state_mut().add_connection(connection_id, user_id);
 134            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 135                log::error!("error updating contacts for {:?}: {}", user_id, err);
 136            }
 137
 138            let handle_io = handle_io.fuse();
 139            futures::pin_mut!(handle_io);
 140            loop {
 141                let next_message = incoming_rx.next().fuse();
 142                futures::pin_mut!(next_message);
 143                futures::select_biased! {
 144                    result = handle_io => {
 145                        if let Err(err) = result {
 146                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 147                        }
 148                        break;
 149                    }
 150                    message = next_message => {
 151                        if let Some(message) = message {
 152                            let start_time = Instant::now();
 153                            let type_name = message.payload_type_name();
 154                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 155                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 156                                if let Err(err) = (handler)(this.clone(), message).await {
 157                                    log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 158                                } else {
 159                                    log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 160                                }
 161
 162                                if let Some(mut notifications) = this.notifications.clone() {
 163                                    let _ = notifications.send(()).await;
 164                                }
 165                            } else {
 166                                log::warn!("unhandled message: {}", type_name);
 167                            }
 168                        } else {
 169                            log::info!("rpc connection closed {:?}", addr);
 170                            break;
 171                        }
 172                    }
 173                }
 174            }
 175
 176            if let Err(err) = this.sign_out(connection_id).await {
 177                log::error!("error signing out connection {:?} - {:?}", addr, err);
 178            }
 179        }
 180    }
 181
 182    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 183        self.peer.disconnect(connection_id);
 184        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 185
 186        for (project_id, project) in removed_connection.hosted_projects {
 187            if let Some(share) = project.share {
 188                broadcast(
 189                    connection_id,
 190                    share.guests.keys().copied().collect(),
 191                    |conn_id| {
 192                        self.peer
 193                            .send(conn_id, proto::UnshareProject { project_id })
 194                    },
 195                )?;
 196            }
 197        }
 198
 199        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 200            broadcast(connection_id, peer_ids, |conn_id| {
 201                self.peer.send(
 202                    conn_id,
 203                    proto::RemoveProjectCollaborator {
 204                        project_id,
 205                        peer_id: connection_id.0,
 206                    },
 207                )
 208            })?;
 209        }
 210
 211        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 212        Ok(())
 213    }
 214
 215    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 216        self.peer.respond(request.receipt(), proto::Ack {})?;
 217        Ok(())
 218    }
 219
 220    async fn register_project(
 221        mut self: Arc<Server>,
 222        request: TypedEnvelope<proto::RegisterProject>,
 223    ) -> tide::Result<()> {
 224        let project_id = {
 225            let mut state = self.state_mut();
 226            let user_id = state.user_id_for_connection(request.sender_id)?;
 227            state.register_project(request.sender_id, user_id)
 228        };
 229        self.peer.respond(
 230            request.receipt(),
 231            proto::RegisterProjectResponse { project_id },
 232        )?;
 233        Ok(())
 234    }
 235
 236    async fn unregister_project(
 237        mut self: Arc<Server>,
 238        request: TypedEnvelope<proto::UnregisterProject>,
 239    ) -> tide::Result<()> {
 240        let project = self
 241            .state_mut()
 242            .unregister_project(request.payload.project_id, request.sender_id)
 243            .ok_or_else(|| anyhow!("no such project"))?;
 244        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 245        Ok(())
 246    }
 247
 248    async fn share_project(
 249        mut self: Arc<Server>,
 250        request: TypedEnvelope<proto::ShareProject>,
 251    ) -> tide::Result<()> {
 252        self.state_mut()
 253            .share_project(request.payload.project_id, request.sender_id);
 254        self.peer.respond(request.receipt(), proto::Ack {})?;
 255        Ok(())
 256    }
 257
 258    async fn unshare_project(
 259        mut self: Arc<Server>,
 260        request: TypedEnvelope<proto::UnshareProject>,
 261    ) -> tide::Result<()> {
 262        let project_id = request.payload.project_id;
 263        let project = self
 264            .state_mut()
 265            .unshare_project(project_id, request.sender_id)?;
 266
 267        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 268            self.peer
 269                .send(conn_id, proto::UnshareProject { project_id })
 270        })?;
 271        self.update_contacts_for_users(&project.authorized_user_ids)?;
 272        Ok(())
 273    }
 274
 275    async fn join_project(
 276        mut self: Arc<Server>,
 277        request: TypedEnvelope<proto::JoinProject>,
 278    ) -> tide::Result<()> {
 279        let project_id = request.payload.project_id;
 280
 281        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 282        let response_data = self
 283            .state_mut()
 284            .join_project(request.sender_id, user_id, project_id)
 285            .and_then(|joined| {
 286                let share = joined.project.share()?;
 287                let peer_count = share.guests.len();
 288                let mut collaborators = Vec::with_capacity(peer_count);
 289                collaborators.push(proto::Collaborator {
 290                    peer_id: joined.project.host_connection_id.0,
 291                    replica_id: 0,
 292                    user_id: joined.project.host_user_id.to_proto(),
 293                });
 294                let worktrees = joined
 295                    .project
 296                    .worktrees
 297                    .iter()
 298                    .filter_map(|(id, worktree)| {
 299                        worktree.share.as_ref().map(|share| proto::Worktree {
 300                            id: *id,
 301                            root_name: worktree.root_name.clone(),
 302                            entries: share.entries.values().cloned().collect(),
 303                            diagnostic_summaries: share
 304                                .diagnostic_summaries
 305                                .values()
 306                                .cloned()
 307                                .collect(),
 308                            weak: worktree.weak,
 309                        })
 310                    })
 311                    .collect();
 312                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 313                    if *peer_conn_id != request.sender_id {
 314                        collaborators.push(proto::Collaborator {
 315                            peer_id: peer_conn_id.0,
 316                            replica_id: *peer_replica_id as u32,
 317                            user_id: peer_user_id.to_proto(),
 318                        });
 319                    }
 320                }
 321                let response = proto::JoinProjectResponse {
 322                    worktrees,
 323                    replica_id: joined.replica_id as u32,
 324                    collaborators,
 325                };
 326                let connection_ids = joined.project.connection_ids();
 327                let contact_user_ids = joined.project.authorized_user_ids();
 328                Ok((response, connection_ids, contact_user_ids))
 329            });
 330
 331        match response_data {
 332            Ok((response, connection_ids, contact_user_ids)) => {
 333                broadcast(request.sender_id, connection_ids, |conn_id| {
 334                    self.peer.send(
 335                        conn_id,
 336                        proto::AddProjectCollaborator {
 337                            project_id,
 338                            collaborator: Some(proto::Collaborator {
 339                                peer_id: request.sender_id.0,
 340                                replica_id: response.replica_id,
 341                                user_id: user_id.to_proto(),
 342                            }),
 343                        },
 344                    )
 345                })?;
 346                self.peer.respond(request.receipt(), response)?;
 347                self.update_contacts_for_users(&contact_user_ids)?;
 348            }
 349            Err(error) => {
 350                self.peer.respond_with_error(
 351                    request.receipt(),
 352                    proto::Error {
 353                        message: error.to_string(),
 354                    },
 355                )?;
 356            }
 357        }
 358
 359        Ok(())
 360    }
 361
 362    async fn leave_project(
 363        mut self: Arc<Server>,
 364        request: TypedEnvelope<proto::LeaveProject>,
 365    ) -> tide::Result<()> {
 366        let sender_id = request.sender_id;
 367        let project_id = request.payload.project_id;
 368        let worktree = self.state_mut().leave_project(sender_id, project_id);
 369        if let Some(worktree) = worktree {
 370            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 371                self.peer.send(
 372                    conn_id,
 373                    proto::RemoveProjectCollaborator {
 374                        project_id,
 375                        peer_id: sender_id.0,
 376                    },
 377                )
 378            })?;
 379            self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 380        }
 381        Ok(())
 382    }
 383
 384    async fn register_worktree(
 385        mut self: Arc<Server>,
 386        request: TypedEnvelope<proto::RegisterWorktree>,
 387    ) -> tide::Result<()> {
 388        let receipt = request.receipt();
 389        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 390
 391        let mut contact_user_ids = HashSet::default();
 392        contact_user_ids.insert(host_user_id);
 393        for github_login in request.payload.authorized_logins {
 394            match self.app_state.db.create_user(&github_login, false).await {
 395                Ok(contact_user_id) => {
 396                    contact_user_ids.insert(contact_user_id);
 397                }
 398                Err(err) => {
 399                    let message = err.to_string();
 400                    self.peer
 401                        .respond_with_error(receipt, proto::Error { message })?;
 402                    return Ok(());
 403                }
 404            }
 405        }
 406
 407        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 408        let ok = self.state_mut().register_worktree(
 409            request.payload.project_id,
 410            request.payload.worktree_id,
 411            Worktree {
 412                authorized_user_ids: contact_user_ids.clone(),
 413                root_name: request.payload.root_name,
 414                share: None,
 415                weak: false,
 416            },
 417        );
 418
 419        if ok {
 420            self.peer.respond(receipt, proto::Ack {})?;
 421            self.update_contacts_for_users(&contact_user_ids)?;
 422        } else {
 423            self.peer.respond_with_error(
 424                receipt,
 425                proto::Error {
 426                    message: NO_SUCH_PROJECT.to_string(),
 427                },
 428            )?;
 429        }
 430
 431        Ok(())
 432    }
 433
 434    async fn unregister_worktree(
 435        mut self: Arc<Server>,
 436        request: TypedEnvelope<proto::UnregisterWorktree>,
 437    ) -> tide::Result<()> {
 438        let project_id = request.payload.project_id;
 439        let worktree_id = request.payload.worktree_id;
 440        let (worktree, guest_connection_ids) =
 441            self.state_mut()
 442                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 443        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 444            self.peer.send(
 445                conn_id,
 446                proto::UnregisterWorktree {
 447                    project_id,
 448                    worktree_id,
 449                },
 450            )
 451        })?;
 452        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 453        Ok(())
 454    }
 455
 456    async fn share_worktree(
 457        mut self: Arc<Server>,
 458        mut request: TypedEnvelope<proto::ShareWorktree>,
 459    ) -> tide::Result<()> {
 460        let worktree = request
 461            .payload
 462            .worktree
 463            .as_mut()
 464            .ok_or_else(|| anyhow!("missing worktree"))?;
 465        let entries = worktree
 466            .entries
 467            .iter()
 468            .map(|entry| (entry.id, entry.clone()))
 469            .collect();
 470        let diagnostic_summaries = worktree
 471            .diagnostic_summaries
 472            .iter()
 473            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 474            .collect();
 475
 476        let shared_worktree = self.state_mut().share_worktree(
 477            request.payload.project_id,
 478            worktree.id,
 479            request.sender_id,
 480            entries,
 481            diagnostic_summaries,
 482        );
 483        if let Some(shared_worktree) = shared_worktree {
 484            broadcast(
 485                request.sender_id,
 486                shared_worktree.connection_ids,
 487                |connection_id| {
 488                    self.peer.forward_send(
 489                        request.sender_id,
 490                        connection_id,
 491                        request.payload.clone(),
 492                    )
 493                },
 494            )?;
 495            self.peer.respond(request.receipt(), proto::Ack {})?;
 496            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 497        } else {
 498            self.peer.respond_with_error(
 499                request.receipt(),
 500                proto::Error {
 501                    message: "no such worktree".to_string(),
 502                },
 503            )?;
 504        }
 505        Ok(())
 506    }
 507
 508    async fn update_worktree(
 509        mut self: Arc<Server>,
 510        request: TypedEnvelope<proto::UpdateWorktree>,
 511    ) -> tide::Result<()> {
 512        let connection_ids = self
 513            .state_mut()
 514            .update_worktree(
 515                request.sender_id,
 516                request.payload.project_id,
 517                request.payload.worktree_id,
 518                &request.payload.removed_entries,
 519                &request.payload.updated_entries,
 520            )
 521            .ok_or_else(|| anyhow!("no such worktree"))?;
 522
 523        broadcast(request.sender_id, connection_ids, |connection_id| {
 524            self.peer
 525                .forward_send(request.sender_id, connection_id, request.payload.clone())
 526        })?;
 527
 528        Ok(())
 529    }
 530
 531    async fn update_diagnostic_summary(
 532        mut self: Arc<Server>,
 533        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 534    ) -> tide::Result<()> {
 535        let receiver_ids = request
 536            .payload
 537            .summary
 538            .clone()
 539            .and_then(|summary| {
 540                self.state_mut().update_diagnostic_summary(
 541                    request.payload.project_id,
 542                    request.payload.worktree_id,
 543                    request.sender_id,
 544                    summary,
 545                )
 546            })
 547            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 548
 549        broadcast(request.sender_id, receiver_ids, |connection_id| {
 550            self.peer
 551                .forward_send(request.sender_id, connection_id, request.payload.clone())
 552        })?;
 553        Ok(())
 554    }
 555
 556    async fn disk_based_diagnostics_updating(
 557        self: Arc<Server>,
 558        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 559    ) -> tide::Result<()> {
 560        let receiver_ids = self
 561            .state()
 562            .project_connection_ids(request.payload.project_id, request.sender_id)
 563            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 564        broadcast(request.sender_id, receiver_ids, |connection_id| {
 565            self.peer
 566                .forward_send(request.sender_id, connection_id, request.payload.clone())
 567        })?;
 568        Ok(())
 569    }
 570
 571    async fn disk_based_diagnostics_updated(
 572        self: Arc<Server>,
 573        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 574    ) -> tide::Result<()> {
 575        let receiver_ids = self
 576            .state()
 577            .project_connection_ids(request.payload.project_id, request.sender_id)
 578            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 579        broadcast(request.sender_id, receiver_ids, |connection_id| {
 580            self.peer
 581                .forward_send(request.sender_id, connection_id, request.payload.clone())
 582        })?;
 583        Ok(())
 584    }
 585
 586    async fn get_definition(
 587        self: Arc<Server>,
 588        request: TypedEnvelope<proto::GetDefinition>,
 589    ) -> tide::Result<()> {
 590        let receipt = request.receipt();
 591        let host_connection_id = self
 592            .state()
 593            .read_project(request.payload.project_id, request.sender_id)
 594            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 595            .host_connection_id;
 596        let response = self
 597            .peer
 598            .forward_request(request.sender_id, host_connection_id, request.payload)
 599            .await?;
 600        self.peer.respond(receipt, response)?;
 601        Ok(())
 602    }
 603
 604    async fn open_buffer(
 605        self: Arc<Server>,
 606        request: TypedEnvelope<proto::OpenBuffer>,
 607    ) -> tide::Result<()> {
 608        let receipt = request.receipt();
 609        let host_connection_id = self
 610            .state()
 611            .read_project(request.payload.project_id, request.sender_id)
 612            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 613            .host_connection_id;
 614        let response = self
 615            .peer
 616            .forward_request(request.sender_id, host_connection_id, request.payload)
 617            .await?;
 618        self.peer.respond(receipt, response)?;
 619        Ok(())
 620    }
 621
 622    async fn close_buffer(
 623        self: Arc<Server>,
 624        request: TypedEnvelope<proto::CloseBuffer>,
 625    ) -> tide::Result<()> {
 626        let host_connection_id = self
 627            .state()
 628            .read_project(request.payload.project_id, request.sender_id)
 629            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 630            .host_connection_id;
 631        self.peer
 632            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 633        Ok(())
 634    }
 635
 636    async fn save_buffer(
 637        self: Arc<Server>,
 638        request: TypedEnvelope<proto::SaveBuffer>,
 639    ) -> tide::Result<()> {
 640        let host;
 641        let guests;
 642        {
 643            let state = self.state();
 644            let project = state
 645                .read_project(request.payload.project_id, request.sender_id)
 646                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 647            host = project.host_connection_id;
 648            guests = project.guest_connection_ids()
 649        }
 650
 651        let sender = request.sender_id;
 652        let receipt = request.receipt();
 653        let response = self
 654            .peer
 655            .forward_request(sender, host, request.payload.clone())
 656            .await?;
 657
 658        broadcast(host, guests, |conn_id| {
 659            let response = response.clone();
 660            if conn_id == sender {
 661                self.peer.respond(receipt, response)
 662            } else {
 663                self.peer.forward_send(host, conn_id, response)
 664            }
 665        })?;
 666
 667        Ok(())
 668    }
 669
 670    async fn format_buffer(
 671        self: Arc<Server>,
 672        request: TypedEnvelope<proto::FormatBuffer>,
 673    ) -> tide::Result<()> {
 674        let host;
 675        {
 676            let state = self.state();
 677            let project = state
 678                .read_project(request.payload.project_id, request.sender_id)
 679                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 680            host = project.host_connection_id;
 681        }
 682
 683        let sender = request.sender_id;
 684        let receipt = request.receipt();
 685        let response = self
 686            .peer
 687            .forward_request(sender, host, request.payload.clone())
 688            .await?;
 689        self.peer.respond(receipt, response)?;
 690
 691        Ok(())
 692    }
 693
 694    async fn get_completions(
 695        self: Arc<Server>,
 696        request: TypedEnvelope<proto::GetCompletions>,
 697    ) -> tide::Result<()> {
 698        let host;
 699        {
 700            let state = self.state();
 701            let project = state
 702                .read_project(request.payload.project_id, request.sender_id)
 703                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 704            host = project.host_connection_id;
 705        }
 706
 707        let sender = request.sender_id;
 708        let receipt = request.receipt();
 709        let response = self
 710            .peer
 711            .forward_request(sender, host, request.payload.clone())
 712            .await?;
 713        self.peer.respond(receipt, response)?;
 714        Ok(())
 715    }
 716
 717    async fn apply_additional_edits_for_completion(
 718        self: Arc<Server>,
 719        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 720    ) -> tide::Result<()> {
 721        let host;
 722        {
 723            let state = self.state();
 724            let project = state
 725                .read_project(request.payload.project_id, request.sender_id)
 726                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 727            host = project.host_connection_id;
 728        }
 729
 730        let sender = request.sender_id;
 731        let receipt = request.receipt();
 732        let response = self
 733            .peer
 734            .forward_request(sender, host, request.payload.clone())
 735            .await?;
 736        self.peer.respond(receipt, response)?;
 737        Ok(())
 738    }
 739
 740    async fn update_buffer(
 741        self: Arc<Server>,
 742        request: TypedEnvelope<proto::UpdateBuffer>,
 743    ) -> tide::Result<()> {
 744        let receiver_ids = self
 745            .state()
 746            .project_connection_ids(request.payload.project_id, request.sender_id)
 747            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 748        broadcast(request.sender_id, receiver_ids, |connection_id| {
 749            self.peer
 750                .forward_send(request.sender_id, connection_id, request.payload.clone())
 751        })?;
 752        self.peer.respond(request.receipt(), proto::Ack {})?;
 753        Ok(())
 754    }
 755
 756    async fn update_buffer_file(
 757        self: Arc<Server>,
 758        request: TypedEnvelope<proto::UpdateBufferFile>,
 759    ) -> tide::Result<()> {
 760        let receiver_ids = self
 761            .state()
 762            .project_connection_ids(request.payload.project_id, request.sender_id)
 763            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 764        broadcast(request.sender_id, receiver_ids, |connection_id| {
 765            self.peer
 766                .forward_send(request.sender_id, connection_id, request.payload.clone())
 767        })?;
 768        Ok(())
 769    }
 770
 771    async fn buffer_reloaded(
 772        self: Arc<Server>,
 773        request: TypedEnvelope<proto::BufferReloaded>,
 774    ) -> tide::Result<()> {
 775        let receiver_ids = self
 776            .state()
 777            .project_connection_ids(request.payload.project_id, request.sender_id)
 778            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 779        broadcast(request.sender_id, receiver_ids, |connection_id| {
 780            self.peer
 781                .forward_send(request.sender_id, connection_id, request.payload.clone())
 782        })?;
 783        Ok(())
 784    }
 785
 786    async fn buffer_saved(
 787        self: Arc<Server>,
 788        request: TypedEnvelope<proto::BufferSaved>,
 789    ) -> tide::Result<()> {
 790        let receiver_ids = self
 791            .state()
 792            .project_connection_ids(request.payload.project_id, request.sender_id)
 793            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 794        broadcast(request.sender_id, receiver_ids, |connection_id| {
 795            self.peer
 796                .forward_send(request.sender_id, connection_id, request.payload.clone())
 797        })?;
 798        Ok(())
 799    }
 800
 801    async fn get_channels(
 802        self: Arc<Server>,
 803        request: TypedEnvelope<proto::GetChannels>,
 804    ) -> tide::Result<()> {
 805        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 806        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 807        self.peer.respond(
 808            request.receipt(),
 809            proto::GetChannelsResponse {
 810                channels: channels
 811                    .into_iter()
 812                    .map(|chan| proto::Channel {
 813                        id: chan.id.to_proto(),
 814                        name: chan.name,
 815                    })
 816                    .collect(),
 817            },
 818        )?;
 819        Ok(())
 820    }
 821
 822    async fn get_users(
 823        self: Arc<Server>,
 824        request: TypedEnvelope<proto::GetUsers>,
 825    ) -> tide::Result<()> {
 826        let receipt = request.receipt();
 827        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 828        let users = self
 829            .app_state
 830            .db
 831            .get_users_by_ids(user_ids)
 832            .await?
 833            .into_iter()
 834            .map(|user| proto::User {
 835                id: user.id.to_proto(),
 836                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 837                github_login: user.github_login,
 838            })
 839            .collect();
 840        self.peer
 841            .respond(receipt, proto::GetUsersResponse { users })?;
 842        Ok(())
 843    }
 844
 845    fn update_contacts_for_users<'a>(
 846        self: &Arc<Server>,
 847        user_ids: impl IntoIterator<Item = &'a UserId>,
 848    ) -> anyhow::Result<()> {
 849        let mut result = Ok(());
 850        let state = self.state();
 851        for user_id in user_ids {
 852            let contacts = state.contacts_for_user(*user_id);
 853            for connection_id in state.connection_ids_for_user(*user_id) {
 854                if let Err(error) = self.peer.send(
 855                    connection_id,
 856                    proto::UpdateContacts {
 857                        contacts: contacts.clone(),
 858                    },
 859                ) {
 860                    result = Err(error);
 861                }
 862            }
 863        }
 864        result
 865    }
 866
 867    async fn join_channel(
 868        mut self: Arc<Self>,
 869        request: TypedEnvelope<proto::JoinChannel>,
 870    ) -> tide::Result<()> {
 871        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 872        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 873        if !self
 874            .app_state
 875            .db
 876            .can_user_access_channel(user_id, channel_id)
 877            .await?
 878        {
 879            Err(anyhow!("access denied"))?;
 880        }
 881
 882        self.state_mut().join_channel(request.sender_id, channel_id);
 883        let messages = self
 884            .app_state
 885            .db
 886            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 887            .await?
 888            .into_iter()
 889            .map(|msg| proto::ChannelMessage {
 890                id: msg.id.to_proto(),
 891                body: msg.body,
 892                timestamp: msg.sent_at.unix_timestamp() as u64,
 893                sender_id: msg.sender_id.to_proto(),
 894                nonce: Some(msg.nonce.as_u128().into()),
 895            })
 896            .collect::<Vec<_>>();
 897        self.peer.respond(
 898            request.receipt(),
 899            proto::JoinChannelResponse {
 900                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 901                messages,
 902            },
 903        )?;
 904        Ok(())
 905    }
 906
 907    async fn leave_channel(
 908        mut self: Arc<Self>,
 909        request: TypedEnvelope<proto::LeaveChannel>,
 910    ) -> tide::Result<()> {
 911        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 912        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 913        if !self
 914            .app_state
 915            .db
 916            .can_user_access_channel(user_id, channel_id)
 917            .await?
 918        {
 919            Err(anyhow!("access denied"))?;
 920        }
 921
 922        self.state_mut()
 923            .leave_channel(request.sender_id, channel_id);
 924
 925        Ok(())
 926    }
 927
 928    async fn send_channel_message(
 929        self: Arc<Self>,
 930        request: TypedEnvelope<proto::SendChannelMessage>,
 931    ) -> tide::Result<()> {
 932        let receipt = request.receipt();
 933        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 934        let user_id;
 935        let connection_ids;
 936        {
 937            let state = self.state();
 938            user_id = state.user_id_for_connection(request.sender_id)?;
 939            if let Some(ids) = state.channel_connection_ids(channel_id) {
 940                connection_ids = ids;
 941            } else {
 942                return Ok(());
 943            }
 944        }
 945
 946        // Validate the message body.
 947        let body = request.payload.body.trim().to_string();
 948        if body.len() > MAX_MESSAGE_LEN {
 949            self.peer.respond_with_error(
 950                receipt,
 951                proto::Error {
 952                    message: "message is too long".to_string(),
 953                },
 954            )?;
 955            return Ok(());
 956        }
 957        if body.is_empty() {
 958            self.peer.respond_with_error(
 959                receipt,
 960                proto::Error {
 961                    message: "message can't be blank".to_string(),
 962                },
 963            )?;
 964            return Ok(());
 965        }
 966
 967        let timestamp = OffsetDateTime::now_utc();
 968        let nonce = if let Some(nonce) = request.payload.nonce {
 969            nonce
 970        } else {
 971            self.peer.respond_with_error(
 972                receipt,
 973                proto::Error {
 974                    message: "nonce can't be blank".to_string(),
 975                },
 976            )?;
 977            return Ok(());
 978        };
 979
 980        let message_id = self
 981            .app_state
 982            .db
 983            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 984            .await?
 985            .to_proto();
 986        let message = proto::ChannelMessage {
 987            sender_id: user_id.to_proto(),
 988            id: message_id,
 989            body,
 990            timestamp: timestamp.unix_timestamp() as u64,
 991            nonce: Some(nonce),
 992        };
 993        broadcast(request.sender_id, connection_ids, |conn_id| {
 994            self.peer.send(
 995                conn_id,
 996                proto::ChannelMessageSent {
 997                    channel_id: channel_id.to_proto(),
 998                    message: Some(message.clone()),
 999                },
1000            )
1001        })?;
1002        self.peer.respond(
1003            receipt,
1004            proto::SendChannelMessageResponse {
1005                message: Some(message),
1006            },
1007        )?;
1008        Ok(())
1009    }
1010
1011    async fn get_channel_messages(
1012        self: Arc<Self>,
1013        request: TypedEnvelope<proto::GetChannelMessages>,
1014    ) -> tide::Result<()> {
1015        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1016        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1017        if !self
1018            .app_state
1019            .db
1020            .can_user_access_channel(user_id, channel_id)
1021            .await?
1022        {
1023            Err(anyhow!("access denied"))?;
1024        }
1025
1026        let messages = self
1027            .app_state
1028            .db
1029            .get_channel_messages(
1030                channel_id,
1031                MESSAGE_COUNT_PER_PAGE,
1032                Some(MessageId::from_proto(request.payload.before_message_id)),
1033            )
1034            .await?
1035            .into_iter()
1036            .map(|msg| proto::ChannelMessage {
1037                id: msg.id.to_proto(),
1038                body: msg.body,
1039                timestamp: msg.sent_at.unix_timestamp() as u64,
1040                sender_id: msg.sender_id.to_proto(),
1041                nonce: Some(msg.nonce.as_u128().into()),
1042            })
1043            .collect::<Vec<_>>();
1044        self.peer.respond(
1045            request.receipt(),
1046            proto::GetChannelMessagesResponse {
1047                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1048                messages,
1049            },
1050        )?;
1051        Ok(())
1052    }
1053
1054    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1055        self.store.read()
1056    }
1057
1058    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1059        self.store.write()
1060    }
1061}
1062
1063fn broadcast<F>(
1064    sender_id: ConnectionId,
1065    receiver_ids: Vec<ConnectionId>,
1066    mut f: F,
1067) -> anyhow::Result<()>
1068where
1069    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1070{
1071    let mut result = Ok(());
1072    for receiver_id in receiver_ids {
1073        if receiver_id != sender_id {
1074            if let Err(error) = f(receiver_id) {
1075                if result.is_ok() {
1076                    result = Err(error);
1077                }
1078            }
1079        }
1080    }
1081    result
1082}
1083
1084pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1085    let server = Server::new(app.state().clone(), rpc.clone(), None);
1086    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1087        let server = server.clone();
1088        async move {
1089            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1090
1091            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1092            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1093            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1094            let client_protocol_version: Option<u32> = request
1095                .header("X-Zed-Protocol-Version")
1096                .and_then(|v| v.as_str().parse().ok());
1097
1098            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1099                return Ok(Response::new(StatusCode::UpgradeRequired));
1100            }
1101
1102            let header = match request.header("Sec-Websocket-Key") {
1103                Some(h) => h.as_str(),
1104                None => return Err(anyhow!("expected sec-websocket-key"))?,
1105            };
1106
1107            let user_id = process_auth_header(&request).await?;
1108
1109            let mut response = Response::new(StatusCode::SwitchingProtocols);
1110            response.insert_header(UPGRADE, "websocket");
1111            response.insert_header(CONNECTION, "Upgrade");
1112            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1113            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1114            response.insert_header("Sec-Websocket-Version", "13");
1115
1116            let http_res: &mut tide::http::Response = response.as_mut();
1117            let upgrade_receiver = http_res.recv_upgrade().await;
1118            let addr = request.remote().unwrap_or("unknown").to_string();
1119            task::spawn(async move {
1120                if let Some(stream) = upgrade_receiver.await {
1121                    server
1122                        .handle_connection(
1123                            Connection::new(
1124                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1125                            ),
1126                            addr,
1127                            user_id,
1128                            None,
1129                        )
1130                        .await;
1131                }
1132            });
1133
1134            Ok(response)
1135        }
1136    });
1137}
1138
1139fn header_contains_ignore_case<T>(
1140    request: &tide::Request<T>,
1141    header_name: HeaderName,
1142    value: &str,
1143) -> bool {
1144    request
1145        .header(header_name)
1146        .map(|h| {
1147            h.as_str()
1148                .split(',')
1149                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1150        })
1151        .unwrap_or(false)
1152}
1153
1154#[cfg(test)]
1155mod tests {
1156    use super::*;
1157    use crate::{
1158        auth,
1159        db::{tests::TestDb, UserId},
1160        github, AppState, Config,
1161    };
1162    use ::rpc::Peer;
1163    use async_std::task;
1164    use gpui::{executor, ModelHandle, TestAppContext};
1165    use parking_lot::Mutex;
1166    use postage::{mpsc, watch};
1167    use rand::prelude::*;
1168    use rpc::PeerId;
1169    use serde_json::json;
1170    use sqlx::types::time::OffsetDateTime;
1171    use std::{
1172        ops::Deref,
1173        path::Path,
1174        rc::Rc,
1175        sync::{
1176            atomic::{AtomicBool, Ordering::SeqCst},
1177            Arc,
1178        },
1179        time::Duration,
1180    };
1181    use zed::{
1182        client::{
1183            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1184            EstablishConnectionError, UserStore,
1185        },
1186        editor::{Editor, EditorSettings, Input, MultiBuffer},
1187        fs::{FakeFs, Fs as _},
1188        language::{
1189            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1190            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1191        },
1192        lsp,
1193        project::{DiagnosticSummary, Project, ProjectPath},
1194    };
1195
1196    #[cfg(test)]
1197    #[ctor::ctor]
1198    fn init_logger() {
1199        // std::env::set_var("RUST_LOG", "info");
1200        env_logger::init();
1201    }
1202
1203    #[gpui::test]
1204    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1205        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1206        let lang_registry = Arc::new(LanguageRegistry::new());
1207        let fs = Arc::new(FakeFs::new(cx_a.background()));
1208        cx_a.foreground().forbid_parking();
1209
1210        // Connect to a server as 2 clients.
1211        let mut server = TestServer::start(cx_a.foreground()).await;
1212        let client_a = server.create_client(&mut cx_a, "user_a").await;
1213        let client_b = server.create_client(&mut cx_b, "user_b").await;
1214
1215        // Share a project as client A
1216        fs.insert_tree(
1217            "/a",
1218            json!({
1219                ".zed.toml": r#"collaborators = ["user_b"]"#,
1220                "a.txt": "a-contents",
1221                "b.txt": "b-contents",
1222            }),
1223        )
1224        .await;
1225        let project_a = cx_a.update(|cx| {
1226            Project::local(
1227                client_a.clone(),
1228                client_a.user_store.clone(),
1229                lang_registry.clone(),
1230                fs.clone(),
1231                cx,
1232            )
1233        });
1234        let (worktree_a, _) = project_a
1235            .update(&mut cx_a, |p, cx| {
1236                p.find_or_create_local_worktree("/a", false, cx)
1237            })
1238            .await
1239            .unwrap();
1240        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1241        worktree_a
1242            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1243            .await;
1244        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1245        project_a
1246            .update(&mut cx_a, |p, cx| p.share(cx))
1247            .await
1248            .unwrap();
1249
1250        // Join that project as client B
1251        let project_b = Project::remote(
1252            project_id,
1253            client_b.clone(),
1254            client_b.user_store.clone(),
1255            lang_registry.clone(),
1256            fs.clone(),
1257            &mut cx_b.to_async(),
1258        )
1259        .await
1260        .unwrap();
1261
1262        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1263            assert_eq!(
1264                project
1265                    .collaborators()
1266                    .get(&client_a.peer_id)
1267                    .unwrap()
1268                    .user
1269                    .github_login,
1270                "user_a"
1271            );
1272            project.replica_id()
1273        });
1274        project_a
1275            .condition(&cx_a, |tree, _| {
1276                tree.collaborators()
1277                    .get(&client_b.peer_id)
1278                    .map_or(false, |collaborator| {
1279                        collaborator.replica_id == replica_id_b
1280                            && collaborator.user.github_login == "user_b"
1281                    })
1282            })
1283            .await;
1284
1285        // Open the same file as client B and client A.
1286        let buffer_b = project_b
1287            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1288            .await
1289            .unwrap();
1290        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1291        buffer_b.read_with(&cx_b, |buf, cx| {
1292            assert_eq!(buf.read(cx).text(), "b-contents")
1293        });
1294        project_a.read_with(&cx_a, |project, cx| {
1295            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1296        });
1297        let buffer_a = project_a
1298            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1299            .await
1300            .unwrap();
1301
1302        let editor_b = cx_b.add_view(window_b, |cx| {
1303            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1304        });
1305
1306        // TODO
1307        // // Create a selection set as client B and see that selection set as client A.
1308        // buffer_a
1309        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1310        //     .await;
1311
1312        // Edit the buffer as client B and see that edit as client A.
1313        editor_b.update(&mut cx_b, |editor, cx| {
1314            editor.handle_input(&Input("ok, ".into()), cx)
1315        });
1316        buffer_a
1317            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1318            .await;
1319
1320        // TODO
1321        // // Remove the selection set as client B, see those selections disappear as client A.
1322        cx_b.update(move |_| drop(editor_b));
1323        // buffer_a
1324        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1325        //     .await;
1326
1327        // Close the buffer as client A, see that the buffer is closed.
1328        cx_a.update(move |_| drop(buffer_a));
1329        project_a
1330            .condition(&cx_a, |project, cx| {
1331                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1332            })
1333            .await;
1334
1335        // Dropping the client B's project removes client B from client A's collaborators.
1336        cx_b.update(move |_| drop(project_b));
1337        project_a
1338            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1339            .await;
1340    }
1341
1342    #[gpui::test]
1343    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1344        let lang_registry = Arc::new(LanguageRegistry::new());
1345        let fs = Arc::new(FakeFs::new(cx_a.background()));
1346        cx_a.foreground().forbid_parking();
1347
1348        // Connect to a server as 2 clients.
1349        let mut server = TestServer::start(cx_a.foreground()).await;
1350        let client_a = server.create_client(&mut cx_a, "user_a").await;
1351        let client_b = server.create_client(&mut cx_b, "user_b").await;
1352
1353        // Share a project as client A
1354        fs.insert_tree(
1355            "/a",
1356            json!({
1357                ".zed.toml": r#"collaborators = ["user_b"]"#,
1358                "a.txt": "a-contents",
1359                "b.txt": "b-contents",
1360            }),
1361        )
1362        .await;
1363        let project_a = cx_a.update(|cx| {
1364            Project::local(
1365                client_a.clone(),
1366                client_a.user_store.clone(),
1367                lang_registry.clone(),
1368                fs.clone(),
1369                cx,
1370            )
1371        });
1372        let (worktree_a, _) = project_a
1373            .update(&mut cx_a, |p, cx| {
1374                p.find_or_create_local_worktree("/a", false, cx)
1375            })
1376            .await
1377            .unwrap();
1378        worktree_a
1379            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1380            .await;
1381        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1382        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1383        project_a
1384            .update(&mut cx_a, |p, cx| p.share(cx))
1385            .await
1386            .unwrap();
1387        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1388
1389        // Join that project as client B
1390        let project_b = Project::remote(
1391            project_id,
1392            client_b.clone(),
1393            client_b.user_store.clone(),
1394            lang_registry.clone(),
1395            fs.clone(),
1396            &mut cx_b.to_async(),
1397        )
1398        .await
1399        .unwrap();
1400        project_b
1401            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1402            .await
1403            .unwrap();
1404
1405        // Unshare the project as client A
1406        project_a
1407            .update(&mut cx_a, |project, cx| project.unshare(cx))
1408            .await
1409            .unwrap();
1410        project_b
1411            .condition(&mut cx_b, |project, _| project.is_read_only())
1412            .await;
1413        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1414        drop(project_b);
1415
1416        // Share the project again and ensure guests can still join.
1417        project_a
1418            .update(&mut cx_a, |project, cx| project.share(cx))
1419            .await
1420            .unwrap();
1421        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1422
1423        let project_c = Project::remote(
1424            project_id,
1425            client_b.clone(),
1426            client_b.user_store.clone(),
1427            lang_registry.clone(),
1428            fs.clone(),
1429            &mut cx_b.to_async(),
1430        )
1431        .await
1432        .unwrap();
1433        project_c
1434            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1435            .await
1436            .unwrap();
1437    }
1438
1439    #[gpui::test]
1440    async fn test_propagate_saves_and_fs_changes(
1441        mut cx_a: TestAppContext,
1442        mut cx_b: TestAppContext,
1443        mut cx_c: TestAppContext,
1444    ) {
1445        let lang_registry = Arc::new(LanguageRegistry::new());
1446        let fs = Arc::new(FakeFs::new(cx_a.background()));
1447        cx_a.foreground().forbid_parking();
1448
1449        // Connect to a server as 3 clients.
1450        let mut server = TestServer::start(cx_a.foreground()).await;
1451        let client_a = server.create_client(&mut cx_a, "user_a").await;
1452        let client_b = server.create_client(&mut cx_b, "user_b").await;
1453        let client_c = server.create_client(&mut cx_c, "user_c").await;
1454
1455        // Share a worktree as client A.
1456        fs.insert_tree(
1457            "/a",
1458            json!({
1459                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1460                "file1": "",
1461                "file2": ""
1462            }),
1463        )
1464        .await;
1465        let project_a = cx_a.update(|cx| {
1466            Project::local(
1467                client_a.clone(),
1468                client_a.user_store.clone(),
1469                lang_registry.clone(),
1470                fs.clone(),
1471                cx,
1472            )
1473        });
1474        let (worktree_a, _) = project_a
1475            .update(&mut cx_a, |p, cx| {
1476                p.find_or_create_local_worktree("/a", false, cx)
1477            })
1478            .await
1479            .unwrap();
1480        worktree_a
1481            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1482            .await;
1483        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1484        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1485        project_a
1486            .update(&mut cx_a, |p, cx| p.share(cx))
1487            .await
1488            .unwrap();
1489
1490        // Join that worktree as clients B and C.
1491        let project_b = Project::remote(
1492            project_id,
1493            client_b.clone(),
1494            client_b.user_store.clone(),
1495            lang_registry.clone(),
1496            fs.clone(),
1497            &mut cx_b.to_async(),
1498        )
1499        .await
1500        .unwrap();
1501        let project_c = Project::remote(
1502            project_id,
1503            client_c.clone(),
1504            client_c.user_store.clone(),
1505            lang_registry.clone(),
1506            fs.clone(),
1507            &mut cx_c.to_async(),
1508        )
1509        .await
1510        .unwrap();
1511        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1512        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1513
1514        // Open and edit a buffer as both guests B and C.
1515        let buffer_b = project_b
1516            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1517            .await
1518            .unwrap();
1519        let buffer_c = project_c
1520            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1521            .await
1522            .unwrap();
1523        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1524        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1525
1526        // Open and edit that buffer as the host.
1527        let buffer_a = project_a
1528            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1529            .await
1530            .unwrap();
1531
1532        buffer_a
1533            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1534            .await;
1535        buffer_a.update(&mut cx_a, |buf, cx| {
1536            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1537        });
1538
1539        // Wait for edits to propagate
1540        buffer_a
1541            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1542            .await;
1543        buffer_b
1544            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1545            .await;
1546        buffer_c
1547            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1548            .await;
1549
1550        // Edit the buffer as the host and concurrently save as guest B.
1551        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1552        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1553        save_b.await.unwrap();
1554        assert_eq!(
1555            fs.load("/a/file1".as_ref()).await.unwrap(),
1556            "hi-a, i-am-c, i-am-b, i-am-a"
1557        );
1558        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1559        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1560        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1561
1562        // Make changes on host's file system, see those changes on guest worktrees.
1563        fs.rename("/a/file1".as_ref(), "/a/file1-renamed".as_ref())
1564            .await
1565            .unwrap();
1566        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1567            .await
1568            .unwrap();
1569        fs.insert_file(Path::new("/a/file4"), "4".into())
1570            .await
1571            .unwrap();
1572
1573        worktree_a
1574            .condition(&cx_a, |tree, _| tree.file_count() == 4)
1575            .await;
1576        worktree_b
1577            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1578            .await;
1579        worktree_c
1580            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1581            .await;
1582        worktree_a.read_with(&cx_a, |tree, _| {
1583            assert_eq!(
1584                tree.paths()
1585                    .map(|p| p.to_string_lossy())
1586                    .collect::<Vec<_>>(),
1587                &[".zed.toml", "file1-renamed", "file3", "file4"]
1588            )
1589        });
1590        worktree_b.read_with(&cx_b, |tree, _| {
1591            assert_eq!(
1592                tree.paths()
1593                    .map(|p| p.to_string_lossy())
1594                    .collect::<Vec<_>>(),
1595                &[".zed.toml", "file1-renamed", "file3", "file4"]
1596            )
1597        });
1598        worktree_c.read_with(&cx_c, |tree, _| {
1599            assert_eq!(
1600                tree.paths()
1601                    .map(|p| p.to_string_lossy())
1602                    .collect::<Vec<_>>(),
1603                &[".zed.toml", "file1-renamed", "file3", "file4"]
1604            )
1605        });
1606
1607        // Ensure buffer files are updated as well.
1608        buffer_a
1609            .condition(&cx_a, |buf, _| {
1610                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1611            })
1612            .await;
1613        buffer_b
1614            .condition(&cx_b, |buf, _| {
1615                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1616            })
1617            .await;
1618        buffer_c
1619            .condition(&cx_c, |buf, _| {
1620                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1621            })
1622            .await;
1623    }
1624
1625    #[gpui::test]
1626    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1627        cx_a.foreground().forbid_parking();
1628        let lang_registry = Arc::new(LanguageRegistry::new());
1629        let fs = Arc::new(FakeFs::new(cx_a.background()));
1630
1631        // Connect to a server as 2 clients.
1632        let mut server = TestServer::start(cx_a.foreground()).await;
1633        let client_a = server.create_client(&mut cx_a, "user_a").await;
1634        let client_b = server.create_client(&mut cx_b, "user_b").await;
1635
1636        // Share a project as client A
1637        fs.insert_tree(
1638            "/dir",
1639            json!({
1640                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1641                "a.txt": "a-contents",
1642            }),
1643        )
1644        .await;
1645
1646        let project_a = cx_a.update(|cx| {
1647            Project::local(
1648                client_a.clone(),
1649                client_a.user_store.clone(),
1650                lang_registry.clone(),
1651                fs.clone(),
1652                cx,
1653            )
1654        });
1655        let (worktree_a, _) = project_a
1656            .update(&mut cx_a, |p, cx| {
1657                p.find_or_create_local_worktree("/dir", false, cx)
1658            })
1659            .await
1660            .unwrap();
1661        worktree_a
1662            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1663            .await;
1664        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1665        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1666        project_a
1667            .update(&mut cx_a, |p, cx| p.share(cx))
1668            .await
1669            .unwrap();
1670
1671        // Join that project as client B
1672        let project_b = Project::remote(
1673            project_id,
1674            client_b.clone(),
1675            client_b.user_store.clone(),
1676            lang_registry.clone(),
1677            fs.clone(),
1678            &mut cx_b.to_async(),
1679        )
1680        .await
1681        .unwrap();
1682        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1683
1684        // Open a buffer as client B
1685        let buffer_b = project_b
1686            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1687            .await
1688            .unwrap();
1689        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1690
1691        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1692        buffer_b.read_with(&cx_b, |buf, _| {
1693            assert!(buf.is_dirty());
1694            assert!(!buf.has_conflict());
1695        });
1696
1697        buffer_b
1698            .update(&mut cx_b, |buf, cx| buf.save(cx))
1699            .await
1700            .unwrap();
1701        worktree_b
1702            .condition(&cx_b, |_, cx| {
1703                buffer_b.read(cx).file().unwrap().mtime() != mtime
1704            })
1705            .await;
1706        buffer_b.read_with(&cx_b, |buf, _| {
1707            assert!(!buf.is_dirty());
1708            assert!(!buf.has_conflict());
1709        });
1710
1711        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1712        buffer_b.read_with(&cx_b, |buf, _| {
1713            assert!(buf.is_dirty());
1714            assert!(!buf.has_conflict());
1715        });
1716    }
1717
1718    #[gpui::test]
1719    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1720        cx_a.foreground().forbid_parking();
1721        let lang_registry = Arc::new(LanguageRegistry::new());
1722        let fs = Arc::new(FakeFs::new(cx_a.background()));
1723
1724        // Connect to a server as 2 clients.
1725        let mut server = TestServer::start(cx_a.foreground()).await;
1726        let client_a = server.create_client(&mut cx_a, "user_a").await;
1727        let client_b = server.create_client(&mut cx_b, "user_b").await;
1728
1729        // Share a project as client A
1730        fs.insert_tree(
1731            "/dir",
1732            json!({
1733                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1734                "a.txt": "a-contents",
1735            }),
1736        )
1737        .await;
1738
1739        let project_a = cx_a.update(|cx| {
1740            Project::local(
1741                client_a.clone(),
1742                client_a.user_store.clone(),
1743                lang_registry.clone(),
1744                fs.clone(),
1745                cx,
1746            )
1747        });
1748        let (worktree_a, _) = project_a
1749            .update(&mut cx_a, |p, cx| {
1750                p.find_or_create_local_worktree("/dir", false, cx)
1751            })
1752            .await
1753            .unwrap();
1754        worktree_a
1755            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1756            .await;
1757        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1758        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1759        project_a
1760            .update(&mut cx_a, |p, cx| p.share(cx))
1761            .await
1762            .unwrap();
1763
1764        // Join that project as client B
1765        let project_b = Project::remote(
1766            project_id,
1767            client_b.clone(),
1768            client_b.user_store.clone(),
1769            lang_registry.clone(),
1770            fs.clone(),
1771            &mut cx_b.to_async(),
1772        )
1773        .await
1774        .unwrap();
1775        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1776
1777        // Open a buffer as client B
1778        let buffer_b = project_b
1779            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1780            .await
1781            .unwrap();
1782        buffer_b.read_with(&cx_b, |buf, _| {
1783            assert!(!buf.is_dirty());
1784            assert!(!buf.has_conflict());
1785        });
1786
1787        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1788            .await
1789            .unwrap();
1790        buffer_b
1791            .condition(&cx_b, |buf, _| {
1792                buf.text() == "new contents" && !buf.is_dirty()
1793            })
1794            .await;
1795        buffer_b.read_with(&cx_b, |buf, _| {
1796            assert!(!buf.has_conflict());
1797        });
1798    }
1799
1800    #[gpui::test(iterations = 100)]
1801    async fn test_editing_while_guest_opens_buffer(
1802        mut cx_a: TestAppContext,
1803        mut cx_b: TestAppContext,
1804    ) {
1805        cx_a.foreground().forbid_parking();
1806        let lang_registry = Arc::new(LanguageRegistry::new());
1807        let fs = Arc::new(FakeFs::new(cx_a.background()));
1808
1809        // Connect to a server as 2 clients.
1810        let mut server = TestServer::start(cx_a.foreground()).await;
1811        let client_a = server.create_client(&mut cx_a, "user_a").await;
1812        let client_b = server.create_client(&mut cx_b, "user_b").await;
1813
1814        // Share a project as client A
1815        fs.insert_tree(
1816            "/dir",
1817            json!({
1818                ".zed.toml": r#"collaborators = ["user_b"]"#,
1819                "a.txt": "a-contents",
1820            }),
1821        )
1822        .await;
1823        let project_a = cx_a.update(|cx| {
1824            Project::local(
1825                client_a.clone(),
1826                client_a.user_store.clone(),
1827                lang_registry.clone(),
1828                fs.clone(),
1829                cx,
1830            )
1831        });
1832        let (worktree_a, _) = project_a
1833            .update(&mut cx_a, |p, cx| {
1834                p.find_or_create_local_worktree("/dir", false, cx)
1835            })
1836            .await
1837            .unwrap();
1838        worktree_a
1839            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1840            .await;
1841        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1842        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1843        project_a
1844            .update(&mut cx_a, |p, cx| p.share(cx))
1845            .await
1846            .unwrap();
1847
1848        // Join that project as client B
1849        let project_b = Project::remote(
1850            project_id,
1851            client_b.clone(),
1852            client_b.user_store.clone(),
1853            lang_registry.clone(),
1854            fs.clone(),
1855            &mut cx_b.to_async(),
1856        )
1857        .await
1858        .unwrap();
1859
1860        // Open a buffer as client A
1861        let buffer_a = project_a
1862            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1863            .await
1864            .unwrap();
1865
1866        // Start opening the same buffer as client B
1867        let buffer_b = cx_b
1868            .background()
1869            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1870        task::yield_now().await;
1871
1872        // Edit the buffer as client A while client B is still opening it.
1873        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1874
1875        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1876        let buffer_b = buffer_b.await.unwrap();
1877        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1878    }
1879
1880    #[gpui::test]
1881    async fn test_leaving_worktree_while_opening_buffer(
1882        mut cx_a: TestAppContext,
1883        mut cx_b: TestAppContext,
1884    ) {
1885        cx_a.foreground().forbid_parking();
1886        let lang_registry = Arc::new(LanguageRegistry::new());
1887        let fs = Arc::new(FakeFs::new(cx_a.background()));
1888
1889        // Connect to a server as 2 clients.
1890        let mut server = TestServer::start(cx_a.foreground()).await;
1891        let client_a = server.create_client(&mut cx_a, "user_a").await;
1892        let client_b = server.create_client(&mut cx_b, "user_b").await;
1893
1894        // Share a project as client A
1895        fs.insert_tree(
1896            "/dir",
1897            json!({
1898                ".zed.toml": r#"collaborators = ["user_b"]"#,
1899                "a.txt": "a-contents",
1900            }),
1901        )
1902        .await;
1903        let project_a = cx_a.update(|cx| {
1904            Project::local(
1905                client_a.clone(),
1906                client_a.user_store.clone(),
1907                lang_registry.clone(),
1908                fs.clone(),
1909                cx,
1910            )
1911        });
1912        let (worktree_a, _) = project_a
1913            .update(&mut cx_a, |p, cx| {
1914                p.find_or_create_local_worktree("/dir", false, cx)
1915            })
1916            .await
1917            .unwrap();
1918        worktree_a
1919            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1920            .await;
1921        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1922        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1923        project_a
1924            .update(&mut cx_a, |p, cx| p.share(cx))
1925            .await
1926            .unwrap();
1927
1928        // Join that project as client B
1929        let project_b = Project::remote(
1930            project_id,
1931            client_b.clone(),
1932            client_b.user_store.clone(),
1933            lang_registry.clone(),
1934            fs.clone(),
1935            &mut cx_b.to_async(),
1936        )
1937        .await
1938        .unwrap();
1939
1940        // See that a guest has joined as client A.
1941        project_a
1942            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1943            .await;
1944
1945        // Begin opening a buffer as client B, but leave the project before the open completes.
1946        let buffer_b = cx_b
1947            .background()
1948            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1949        cx_b.update(|_| drop(project_b));
1950        drop(buffer_b);
1951
1952        // See that the guest has left.
1953        project_a
1954            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1955            .await;
1956    }
1957
1958    #[gpui::test]
1959    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1960        cx_a.foreground().forbid_parking();
1961        let lang_registry = Arc::new(LanguageRegistry::new());
1962        let fs = Arc::new(FakeFs::new(cx_a.background()));
1963
1964        // Connect to a server as 2 clients.
1965        let mut server = TestServer::start(cx_a.foreground()).await;
1966        let client_a = server.create_client(&mut cx_a, "user_a").await;
1967        let client_b = server.create_client(&mut cx_b, "user_b").await;
1968
1969        // Share a project as client A
1970        fs.insert_tree(
1971            "/a",
1972            json!({
1973                ".zed.toml": r#"collaborators = ["user_b"]"#,
1974                "a.txt": "a-contents",
1975                "b.txt": "b-contents",
1976            }),
1977        )
1978        .await;
1979        let project_a = cx_a.update(|cx| {
1980            Project::local(
1981                client_a.clone(),
1982                client_a.user_store.clone(),
1983                lang_registry.clone(),
1984                fs.clone(),
1985                cx,
1986            )
1987        });
1988        let (worktree_a, _) = project_a
1989            .update(&mut cx_a, |p, cx| {
1990                p.find_or_create_local_worktree("/a", false, cx)
1991            })
1992            .await
1993            .unwrap();
1994        worktree_a
1995            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1996            .await;
1997        let project_id = project_a
1998            .update(&mut cx_a, |project, _| project.next_remote_id())
1999            .await;
2000        project_a
2001            .update(&mut cx_a, |project, cx| project.share(cx))
2002            .await
2003            .unwrap();
2004
2005        // Join that project as client B
2006        let _project_b = Project::remote(
2007            project_id,
2008            client_b.clone(),
2009            client_b.user_store.clone(),
2010            lang_registry.clone(),
2011            fs.clone(),
2012            &mut cx_b.to_async(),
2013        )
2014        .await
2015        .unwrap();
2016
2017        // See that a guest has joined as client A.
2018        project_a
2019            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2020            .await;
2021
2022        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2023        client_b.disconnect(&cx_b.to_async()).unwrap();
2024        project_a
2025            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2026            .await;
2027    }
2028
2029    #[gpui::test]
2030    async fn test_collaborating_with_diagnostics(
2031        mut cx_a: TestAppContext,
2032        mut cx_b: TestAppContext,
2033    ) {
2034        cx_a.foreground().forbid_parking();
2035        let mut lang_registry = Arc::new(LanguageRegistry::new());
2036        let fs = Arc::new(FakeFs::new(cx_a.background()));
2037
2038        // Set up a fake language server.
2039        let (language_server_config, mut fake_language_server) =
2040            LanguageServerConfig::fake(cx_a.background()).await;
2041        Arc::get_mut(&mut lang_registry)
2042            .unwrap()
2043            .add(Arc::new(Language::new(
2044                LanguageConfig {
2045                    name: "Rust".to_string(),
2046                    path_suffixes: vec!["rs".to_string()],
2047                    language_server: Some(language_server_config),
2048                    ..Default::default()
2049                },
2050                Some(tree_sitter_rust::language()),
2051            )));
2052
2053        // Connect to a server as 2 clients.
2054        let mut server = TestServer::start(cx_a.foreground()).await;
2055        let client_a = server.create_client(&mut cx_a, "user_a").await;
2056        let client_b = server.create_client(&mut cx_b, "user_b").await;
2057
2058        // Share a project as client A
2059        fs.insert_tree(
2060            "/a",
2061            json!({
2062                ".zed.toml": r#"collaborators = ["user_b"]"#,
2063                "a.rs": "let one = two",
2064                "other.rs": "",
2065            }),
2066        )
2067        .await;
2068        let project_a = cx_a.update(|cx| {
2069            Project::local(
2070                client_a.clone(),
2071                client_a.user_store.clone(),
2072                lang_registry.clone(),
2073                fs.clone(),
2074                cx,
2075            )
2076        });
2077        let (worktree_a, _) = project_a
2078            .update(&mut cx_a, |p, cx| {
2079                p.find_or_create_local_worktree("/a", false, cx)
2080            })
2081            .await
2082            .unwrap();
2083        worktree_a
2084            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2085            .await;
2086        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2087        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2088        project_a
2089            .update(&mut cx_a, |p, cx| p.share(cx))
2090            .await
2091            .unwrap();
2092
2093        // Cause the language server to start.
2094        let _ = cx_a
2095            .background()
2096            .spawn(project_a.update(&mut cx_a, |project, cx| {
2097                project.open_buffer(
2098                    ProjectPath {
2099                        worktree_id,
2100                        path: Path::new("other.rs").into(),
2101                    },
2102                    cx,
2103                )
2104            }))
2105            .await
2106            .unwrap();
2107
2108        // Simulate a language server reporting errors for a file.
2109        fake_language_server
2110            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2111                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2112                version: None,
2113                diagnostics: vec![lsp::Diagnostic {
2114                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2115                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2116                    message: "message 1".to_string(),
2117                    ..Default::default()
2118                }],
2119            })
2120            .await;
2121
2122        // Wait for server to see the diagnostics update.
2123        server
2124            .condition(|store| {
2125                let worktree = store
2126                    .project(project_id)
2127                    .unwrap()
2128                    .worktrees
2129                    .get(&worktree_id.to_proto())
2130                    .unwrap();
2131
2132                !worktree
2133                    .share
2134                    .as_ref()
2135                    .unwrap()
2136                    .diagnostic_summaries
2137                    .is_empty()
2138            })
2139            .await;
2140
2141        // Join the worktree as client B.
2142        let project_b = Project::remote(
2143            project_id,
2144            client_b.clone(),
2145            client_b.user_store.clone(),
2146            lang_registry.clone(),
2147            fs.clone(),
2148            &mut cx_b.to_async(),
2149        )
2150        .await
2151        .unwrap();
2152
2153        project_b.read_with(&cx_b, |project, cx| {
2154            assert_eq!(
2155                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2156                &[(
2157                    ProjectPath {
2158                        worktree_id,
2159                        path: Arc::from(Path::new("a.rs")),
2160                    },
2161                    DiagnosticSummary {
2162                        error_count: 1,
2163                        warning_count: 0,
2164                        ..Default::default()
2165                    },
2166                )]
2167            )
2168        });
2169
2170        // Simulate a language server reporting more errors for a file.
2171        fake_language_server
2172            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2173                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2174                version: None,
2175                diagnostics: vec![
2176                    lsp::Diagnostic {
2177                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2178                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2179                        message: "message 1".to_string(),
2180                        ..Default::default()
2181                    },
2182                    lsp::Diagnostic {
2183                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2184                        range: lsp::Range::new(
2185                            lsp::Position::new(0, 10),
2186                            lsp::Position::new(0, 13),
2187                        ),
2188                        message: "message 2".to_string(),
2189                        ..Default::default()
2190                    },
2191                ],
2192            })
2193            .await;
2194
2195        // Client b gets the updated summaries
2196        project_b
2197            .condition(&cx_b, |project, cx| {
2198                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2199                    == &[(
2200                        ProjectPath {
2201                            worktree_id,
2202                            path: Arc::from(Path::new("a.rs")),
2203                        },
2204                        DiagnosticSummary {
2205                            error_count: 1,
2206                            warning_count: 1,
2207                            ..Default::default()
2208                        },
2209                    )]
2210            })
2211            .await;
2212
2213        // Open the file with the errors on client B. They should be present.
2214        let buffer_b = cx_b
2215            .background()
2216            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2217            .await
2218            .unwrap();
2219
2220        buffer_b.read_with(&cx_b, |buffer, _| {
2221            assert_eq!(
2222                buffer
2223                    .snapshot()
2224                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2225                    .map(|entry| entry)
2226                    .collect::<Vec<_>>(),
2227                &[
2228                    DiagnosticEntry {
2229                        range: Point::new(0, 4)..Point::new(0, 7),
2230                        diagnostic: Diagnostic {
2231                            group_id: 0,
2232                            message: "message 1".to_string(),
2233                            severity: lsp::DiagnosticSeverity::ERROR,
2234                            is_primary: true,
2235                            ..Default::default()
2236                        }
2237                    },
2238                    DiagnosticEntry {
2239                        range: Point::new(0, 10)..Point::new(0, 13),
2240                        diagnostic: Diagnostic {
2241                            group_id: 1,
2242                            severity: lsp::DiagnosticSeverity::WARNING,
2243                            message: "message 2".to_string(),
2244                            is_primary: true,
2245                            ..Default::default()
2246                        }
2247                    }
2248                ]
2249            );
2250        });
2251    }
2252
2253    #[gpui::test]
2254    async fn test_collaborating_with_completion(
2255        mut cx_a: TestAppContext,
2256        mut cx_b: TestAppContext,
2257    ) {
2258        cx_a.foreground().forbid_parking();
2259        let mut lang_registry = Arc::new(LanguageRegistry::new());
2260        let fs = Arc::new(FakeFs::new(cx_a.background()));
2261
2262        // Set up a fake language server.
2263        let (language_server_config, mut fake_language_server) =
2264            LanguageServerConfig::fake_with_capabilities(
2265                lsp::ServerCapabilities {
2266                    completion_provider: Some(lsp::CompletionOptions {
2267                        trigger_characters: Some(vec![".".to_string()]),
2268                        ..Default::default()
2269                    }),
2270                    ..Default::default()
2271                },
2272                cx_a.background(),
2273            )
2274            .await;
2275        Arc::get_mut(&mut lang_registry)
2276            .unwrap()
2277            .add(Arc::new(Language::new(
2278                LanguageConfig {
2279                    name: "Rust".to_string(),
2280                    path_suffixes: vec!["rs".to_string()],
2281                    language_server: Some(language_server_config),
2282                    ..Default::default()
2283                },
2284                Some(tree_sitter_rust::language()),
2285            )));
2286
2287        // Connect to a server as 2 clients.
2288        let mut server = TestServer::start(cx_a.foreground()).await;
2289        let client_a = server.create_client(&mut cx_a, "user_a").await;
2290        let client_b = server.create_client(&mut cx_b, "user_b").await;
2291
2292        // Share a project as client A
2293        fs.insert_tree(
2294            "/a",
2295            json!({
2296                ".zed.toml": r#"collaborators = ["user_b"]"#,
2297                "main.rs": "fn main() { a }",
2298                "other.rs": "",
2299            }),
2300        )
2301        .await;
2302        let project_a = cx_a.update(|cx| {
2303            Project::local(
2304                client_a.clone(),
2305                client_a.user_store.clone(),
2306                lang_registry.clone(),
2307                fs.clone(),
2308                cx,
2309            )
2310        });
2311        let (worktree_a, _) = project_a
2312            .update(&mut cx_a, |p, cx| {
2313                p.find_or_create_local_worktree("/a", false, cx)
2314            })
2315            .await
2316            .unwrap();
2317        worktree_a
2318            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2319            .await;
2320        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2321        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2322        project_a
2323            .update(&mut cx_a, |p, cx| p.share(cx))
2324            .await
2325            .unwrap();
2326
2327        // Join the worktree as client B.
2328        let project_b = Project::remote(
2329            project_id,
2330            client_b.clone(),
2331            client_b.user_store.clone(),
2332            lang_registry.clone(),
2333            fs.clone(),
2334            &mut cx_b.to_async(),
2335        )
2336        .await
2337        .unwrap();
2338
2339        // Open a file in an editor as the guest.
2340        let buffer_b = project_b
2341            .update(&mut cx_b, |p, cx| {
2342                p.open_buffer((worktree_id, "main.rs"), cx)
2343            })
2344            .await
2345            .unwrap();
2346        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2347        let editor_b = cx_b.add_view(window_b, |cx| {
2348            Editor::for_buffer(
2349                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2350                Arc::new(|cx| EditorSettings::test(cx)),
2351                cx,
2352            )
2353        });
2354
2355        // Type a completion trigger character as the guest.
2356        editor_b.update(&mut cx_b, |editor, cx| {
2357            editor.select_ranges([13..13], None, cx);
2358            editor.handle_input(&Input(".".into()), cx);
2359            cx.focus(&editor_b);
2360        });
2361
2362        // Receive a completion request as the host's language server.
2363        let (request_id, params) = fake_language_server
2364            .receive_request::<lsp::request::Completion>()
2365            .await;
2366        assert_eq!(
2367            params.text_document_position.text_document.uri,
2368            lsp::Url::from_file_path("/a/main.rs").unwrap(),
2369        );
2370        assert_eq!(
2371            params.text_document_position.position,
2372            lsp::Position::new(0, 14),
2373        );
2374
2375        // Return some completions from the host's language server.
2376        fake_language_server
2377            .respond(
2378                request_id,
2379                Some(lsp::CompletionResponse::Array(vec![
2380                    lsp::CompletionItem {
2381                        label: "first_method(…)".into(),
2382                        detail: Some("fn(&mut self, B) -> C".into()),
2383                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2384                            new_text: "first_method($1)".to_string(),
2385                            range: lsp::Range::new(
2386                                lsp::Position::new(0, 14),
2387                                lsp::Position::new(0, 14),
2388                            ),
2389                        })),
2390                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2391                        ..Default::default()
2392                    },
2393                    lsp::CompletionItem {
2394                        label: "second_method(…)".into(),
2395                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2396                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2397                            new_text: "second_method()".to_string(),
2398                            range: lsp::Range::new(
2399                                lsp::Position::new(0, 14),
2400                                lsp::Position::new(0, 14),
2401                            ),
2402                        })),
2403                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2404                        ..Default::default()
2405                    },
2406                ])),
2407            )
2408            .await;
2409
2410        // Open the buffer on the host.
2411        let buffer_a = project_a
2412            .update(&mut cx_a, |p, cx| {
2413                p.open_buffer((worktree_id, "main.rs"), cx)
2414            })
2415            .await
2416            .unwrap();
2417        buffer_a
2418            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2419            .await;
2420
2421        // Confirm a completion on the guest.
2422        editor_b.next_notification(&cx_b).await;
2423        editor_b.update(&mut cx_b, |editor, cx| {
2424            assert!(editor.has_completions());
2425            editor.confirm_completion(Some(0), cx);
2426            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2427        });
2428
2429        buffer_a
2430            .condition(&cx_a, |buffer, _| {
2431                buffer.text() == "fn main() { a.first_method() }"
2432            })
2433            .await;
2434
2435        // Receive a request resolve the selected completion on the host's language server.
2436        let (request_id, params) = fake_language_server
2437            .receive_request::<lsp::request::ResolveCompletionItem>()
2438            .await;
2439        assert_eq!(params.label, "first_method(…)");
2440
2441        // Return a resolved completion from the host's language server.
2442        // The resolved completion has an additional text edit.
2443        fake_language_server
2444            .respond(
2445                request_id,
2446                lsp::CompletionItem {
2447                    label: "first_method(…)".into(),
2448                    detail: Some("fn(&mut self, B) -> C".into()),
2449                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2450                        new_text: "first_method($1)".to_string(),
2451                        range: lsp::Range::new(
2452                            lsp::Position::new(0, 14),
2453                            lsp::Position::new(0, 14),
2454                        ),
2455                    })),
2456                    additional_text_edits: Some(vec![lsp::TextEdit {
2457                        new_text: "use d::SomeTrait;\n".to_string(),
2458                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2459                    }]),
2460                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2461                    ..Default::default()
2462                },
2463            )
2464            .await;
2465
2466        // The additional edit is applied.
2467        buffer_b
2468            .condition(&cx_b, |buffer, _| {
2469                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2470            })
2471            .await;
2472        assert_eq!(
2473            buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2474            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2475        );
2476    }
2477
2478    #[gpui::test]
2479    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2480        cx_a.foreground().forbid_parking();
2481        let mut lang_registry = Arc::new(LanguageRegistry::new());
2482        let fs = Arc::new(FakeFs::new(cx_a.background()));
2483
2484        // Set up a fake language server.
2485        let (language_server_config, mut fake_language_server) =
2486            LanguageServerConfig::fake(cx_a.background()).await;
2487        Arc::get_mut(&mut lang_registry)
2488            .unwrap()
2489            .add(Arc::new(Language::new(
2490                LanguageConfig {
2491                    name: "Rust".to_string(),
2492                    path_suffixes: vec!["rs".to_string()],
2493                    language_server: Some(language_server_config),
2494                    ..Default::default()
2495                },
2496                Some(tree_sitter_rust::language()),
2497            )));
2498
2499        // Connect to a server as 2 clients.
2500        let mut server = TestServer::start(cx_a.foreground()).await;
2501        let client_a = server.create_client(&mut cx_a, "user_a").await;
2502        let client_b = server.create_client(&mut cx_b, "user_b").await;
2503
2504        // Share a project as client A
2505        fs.insert_tree(
2506            "/a",
2507            json!({
2508                ".zed.toml": r#"collaborators = ["user_b"]"#,
2509                "a.rs": "let one = two",
2510            }),
2511        )
2512        .await;
2513        let project_a = cx_a.update(|cx| {
2514            Project::local(
2515                client_a.clone(),
2516                client_a.user_store.clone(),
2517                lang_registry.clone(),
2518                fs.clone(),
2519                cx,
2520            )
2521        });
2522        let (worktree_a, _) = project_a
2523            .update(&mut cx_a, |p, cx| {
2524                p.find_or_create_local_worktree("/a", false, cx)
2525            })
2526            .await
2527            .unwrap();
2528        worktree_a
2529            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2530            .await;
2531        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2532        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2533        project_a
2534            .update(&mut cx_a, |p, cx| p.share(cx))
2535            .await
2536            .unwrap();
2537
2538        // Join the worktree as client B.
2539        let project_b = Project::remote(
2540            project_id,
2541            client_b.clone(),
2542            client_b.user_store.clone(),
2543            lang_registry.clone(),
2544            fs.clone(),
2545            &mut cx_b.to_async(),
2546        )
2547        .await
2548        .unwrap();
2549
2550        let buffer_b = cx_b
2551            .background()
2552            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2553            .await
2554            .unwrap();
2555
2556        let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2557        let (request_id, _) = fake_language_server
2558            .receive_request::<lsp::request::Formatting>()
2559            .await;
2560        fake_language_server
2561            .respond(
2562                request_id,
2563                Some(vec![
2564                    lsp::TextEdit {
2565                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2566                        new_text: "h".to_string(),
2567                    },
2568                    lsp::TextEdit {
2569                        range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2570                        new_text: "y".to_string(),
2571                    },
2572                ]),
2573            )
2574            .await;
2575        format.await.unwrap();
2576        assert_eq!(
2577            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2578            "let honey = two"
2579        );
2580    }
2581
2582    #[gpui::test]
2583    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2584        cx_a.foreground().forbid_parking();
2585        let mut lang_registry = Arc::new(LanguageRegistry::new());
2586        let fs = Arc::new(FakeFs::new(cx_a.background()));
2587        fs.insert_tree(
2588            "/root-1",
2589            json!({
2590                ".zed.toml": r#"collaborators = ["user_b"]"#,
2591                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2592            }),
2593        )
2594        .await;
2595        fs.insert_tree(
2596            "/root-2",
2597            json!({
2598                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2599            }),
2600        )
2601        .await;
2602
2603        // Set up a fake language server.
2604        let (language_server_config, mut fake_language_server) =
2605            LanguageServerConfig::fake(cx_a.background()).await;
2606        Arc::get_mut(&mut lang_registry)
2607            .unwrap()
2608            .add(Arc::new(Language::new(
2609                LanguageConfig {
2610                    name: "Rust".to_string(),
2611                    path_suffixes: vec!["rs".to_string()],
2612                    language_server: Some(language_server_config),
2613                    ..Default::default()
2614                },
2615                Some(tree_sitter_rust::language()),
2616            )));
2617
2618        // Connect to a server as 2 clients.
2619        let mut server = TestServer::start(cx_a.foreground()).await;
2620        let client_a = server.create_client(&mut cx_a, "user_a").await;
2621        let client_b = server.create_client(&mut cx_b, "user_b").await;
2622
2623        // Share a project as client A
2624        let project_a = cx_a.update(|cx| {
2625            Project::local(
2626                client_a.clone(),
2627                client_a.user_store.clone(),
2628                lang_registry.clone(),
2629                fs.clone(),
2630                cx,
2631            )
2632        });
2633        let (worktree_a, _) = project_a
2634            .update(&mut cx_a, |p, cx| {
2635                p.find_or_create_local_worktree("/root-1", false, cx)
2636            })
2637            .await
2638            .unwrap();
2639        worktree_a
2640            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2641            .await;
2642        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2643        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2644        project_a
2645            .update(&mut cx_a, |p, cx| p.share(cx))
2646            .await
2647            .unwrap();
2648
2649        // Join the worktree as client B.
2650        let project_b = Project::remote(
2651            project_id,
2652            client_b.clone(),
2653            client_b.user_store.clone(),
2654            lang_registry.clone(),
2655            fs.clone(),
2656            &mut cx_b.to_async(),
2657        )
2658        .await
2659        .unwrap();
2660
2661        // Open the file to be formatted on client B.
2662        let buffer_b = cx_b
2663            .background()
2664            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2665            .await
2666            .unwrap();
2667
2668        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2669        let (request_id, _) = fake_language_server
2670            .receive_request::<lsp::request::GotoDefinition>()
2671            .await;
2672        fake_language_server
2673            .respond(
2674                request_id,
2675                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2676                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2677                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2678                ))),
2679            )
2680            .await;
2681        let definitions_1 = definitions_1.await.unwrap();
2682        cx_b.read(|cx| {
2683            assert_eq!(definitions_1.len(), 1);
2684            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2685            let target_buffer = definitions_1[0].target_buffer.read(cx);
2686            assert_eq!(
2687                target_buffer.text(),
2688                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2689            );
2690            assert_eq!(
2691                definitions_1[0].target_range.to_point(target_buffer),
2692                Point::new(0, 6)..Point::new(0, 9)
2693            );
2694        });
2695
2696        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2697        // the previous call to `definition`.
2698        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2699        let (request_id, _) = fake_language_server
2700            .receive_request::<lsp::request::GotoDefinition>()
2701            .await;
2702        fake_language_server
2703            .respond(
2704                request_id,
2705                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2706                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2707                    lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2708                ))),
2709            )
2710            .await;
2711        let definitions_2 = definitions_2.await.unwrap();
2712        cx_b.read(|cx| {
2713            assert_eq!(definitions_2.len(), 1);
2714            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2715            let target_buffer = definitions_2[0].target_buffer.read(cx);
2716            assert_eq!(
2717                target_buffer.text(),
2718                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2719            );
2720            assert_eq!(
2721                definitions_2[0].target_range.to_point(target_buffer),
2722                Point::new(1, 6)..Point::new(1, 11)
2723            );
2724        });
2725        assert_eq!(
2726            definitions_1[0].target_buffer,
2727            definitions_2[0].target_buffer
2728        );
2729
2730        cx_b.update(|_| {
2731            drop(definitions_1);
2732            drop(definitions_2);
2733        });
2734        project_b
2735            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2736            .await;
2737    }
2738
2739    #[gpui::test]
2740    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2741        mut cx_a: TestAppContext,
2742        mut cx_b: TestAppContext,
2743        mut rng: StdRng,
2744    ) {
2745        cx_a.foreground().forbid_parking();
2746        let mut lang_registry = Arc::new(LanguageRegistry::new());
2747        let fs = Arc::new(FakeFs::new(cx_a.background()));
2748        fs.insert_tree(
2749            "/root",
2750            json!({
2751                ".zed.toml": r#"collaborators = ["user_b"]"#,
2752                "a.rs": "const ONE: usize = b::TWO;",
2753                "b.rs": "const TWO: usize = 2",
2754            }),
2755        )
2756        .await;
2757
2758        // Set up a fake language server.
2759        let (language_server_config, mut fake_language_server) =
2760            LanguageServerConfig::fake(cx_a.background()).await;
2761        Arc::get_mut(&mut lang_registry)
2762            .unwrap()
2763            .add(Arc::new(Language::new(
2764                LanguageConfig {
2765                    name: "Rust".to_string(),
2766                    path_suffixes: vec!["rs".to_string()],
2767                    language_server: Some(language_server_config),
2768                    ..Default::default()
2769                },
2770                Some(tree_sitter_rust::language()),
2771            )));
2772
2773        // Connect to a server as 2 clients.
2774        let mut server = TestServer::start(cx_a.foreground()).await;
2775        let client_a = server.create_client(&mut cx_a, "user_a").await;
2776        let client_b = server.create_client(&mut cx_b, "user_b").await;
2777
2778        // Share a project as client A
2779        let project_a = cx_a.update(|cx| {
2780            Project::local(
2781                client_a.clone(),
2782                client_a.user_store.clone(),
2783                lang_registry.clone(),
2784                fs.clone(),
2785                cx,
2786            )
2787        });
2788        let (worktree_a, _) = project_a
2789            .update(&mut cx_a, |p, cx| {
2790                p.find_or_create_local_worktree("/root", false, cx)
2791            })
2792            .await
2793            .unwrap();
2794        worktree_a
2795            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2796            .await;
2797        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2798        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2799        project_a
2800            .update(&mut cx_a, |p, cx| p.share(cx))
2801            .await
2802            .unwrap();
2803
2804        // Join the worktree as client B.
2805        let project_b = Project::remote(
2806            project_id,
2807            client_b.clone(),
2808            client_b.user_store.clone(),
2809            lang_registry.clone(),
2810            fs.clone(),
2811            &mut cx_b.to_async(),
2812        )
2813        .await
2814        .unwrap();
2815
2816        let buffer_b1 = cx_b
2817            .background()
2818            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2819            .await
2820            .unwrap();
2821
2822        let definitions;
2823        let buffer_b2;
2824        if rng.gen() {
2825            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2826            buffer_b2 =
2827                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2828        } else {
2829            buffer_b2 =
2830                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2831            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2832        }
2833
2834        let (request_id, _) = fake_language_server
2835            .receive_request::<lsp::request::GotoDefinition>()
2836            .await;
2837        fake_language_server
2838            .respond(
2839                request_id,
2840                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2841                    lsp::Url::from_file_path("/root/b.rs").unwrap(),
2842                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2843                ))),
2844            )
2845            .await;
2846
2847        let buffer_b2 = buffer_b2.await.unwrap();
2848        let definitions = definitions.await.unwrap();
2849        assert_eq!(definitions.len(), 1);
2850        assert_eq!(definitions[0].target_buffer, buffer_b2);
2851    }
2852
2853    #[gpui::test]
2854    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2855        cx_a.foreground().forbid_parking();
2856
2857        // Connect to a server as 2 clients.
2858        let mut server = TestServer::start(cx_a.foreground()).await;
2859        let client_a = server.create_client(&mut cx_a, "user_a").await;
2860        let client_b = server.create_client(&mut cx_b, "user_b").await;
2861
2862        // Create an org that includes these 2 users.
2863        let db = &server.app_state.db;
2864        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2865        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2866            .await
2867            .unwrap();
2868        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2869            .await
2870            .unwrap();
2871
2872        // Create a channel that includes all the users.
2873        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2874        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2875            .await
2876            .unwrap();
2877        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2878            .await
2879            .unwrap();
2880        db.create_channel_message(
2881            channel_id,
2882            client_b.current_user_id(&cx_b),
2883            "hello A, it's B.",
2884            OffsetDateTime::now_utc(),
2885            1,
2886        )
2887        .await
2888        .unwrap();
2889
2890        let channels_a = cx_a
2891            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2892        channels_a
2893            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2894            .await;
2895        channels_a.read_with(&cx_a, |list, _| {
2896            assert_eq!(
2897                list.available_channels().unwrap(),
2898                &[ChannelDetails {
2899                    id: channel_id.to_proto(),
2900                    name: "test-channel".to_string()
2901                }]
2902            )
2903        });
2904        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2905            this.get_channel(channel_id.to_proto(), cx).unwrap()
2906        });
2907        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2908        channel_a
2909            .condition(&cx_a, |channel, _| {
2910                channel_messages(channel)
2911                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2912            })
2913            .await;
2914
2915        let channels_b = cx_b
2916            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2917        channels_b
2918            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2919            .await;
2920        channels_b.read_with(&cx_b, |list, _| {
2921            assert_eq!(
2922                list.available_channels().unwrap(),
2923                &[ChannelDetails {
2924                    id: channel_id.to_proto(),
2925                    name: "test-channel".to_string()
2926                }]
2927            )
2928        });
2929
2930        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2931            this.get_channel(channel_id.to_proto(), cx).unwrap()
2932        });
2933        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2934        channel_b
2935            .condition(&cx_b, |channel, _| {
2936                channel_messages(channel)
2937                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2938            })
2939            .await;
2940
2941        channel_a
2942            .update(&mut cx_a, |channel, cx| {
2943                channel
2944                    .send_message("oh, hi B.".to_string(), cx)
2945                    .unwrap()
2946                    .detach();
2947                let task = channel.send_message("sup".to_string(), cx).unwrap();
2948                assert_eq!(
2949                    channel_messages(channel),
2950                    &[
2951                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2952                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2953                        ("user_a".to_string(), "sup".to_string(), true)
2954                    ]
2955                );
2956                task
2957            })
2958            .await
2959            .unwrap();
2960
2961        channel_b
2962            .condition(&cx_b, |channel, _| {
2963                channel_messages(channel)
2964                    == [
2965                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2966                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2967                        ("user_a".to_string(), "sup".to_string(), false),
2968                    ]
2969            })
2970            .await;
2971
2972        assert_eq!(
2973            server
2974                .state()
2975                .await
2976                .channel(channel_id)
2977                .unwrap()
2978                .connection_ids
2979                .len(),
2980            2
2981        );
2982        cx_b.update(|_| drop(channel_b));
2983        server
2984            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2985            .await;
2986
2987        cx_a.update(|_| drop(channel_a));
2988        server
2989            .condition(|state| state.channel(channel_id).is_none())
2990            .await;
2991    }
2992
2993    #[gpui::test]
2994    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2995        cx_a.foreground().forbid_parking();
2996
2997        let mut server = TestServer::start(cx_a.foreground()).await;
2998        let client_a = server.create_client(&mut cx_a, "user_a").await;
2999
3000        let db = &server.app_state.db;
3001        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3002        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3003        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3004            .await
3005            .unwrap();
3006        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3007            .await
3008            .unwrap();
3009
3010        let channels_a = cx_a
3011            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3012        channels_a
3013            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3014            .await;
3015        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3016            this.get_channel(channel_id.to_proto(), cx).unwrap()
3017        });
3018
3019        // Messages aren't allowed to be too long.
3020        channel_a
3021            .update(&mut cx_a, |channel, cx| {
3022                let long_body = "this is long.\n".repeat(1024);
3023                channel.send_message(long_body, cx).unwrap()
3024            })
3025            .await
3026            .unwrap_err();
3027
3028        // Messages aren't allowed to be blank.
3029        channel_a.update(&mut cx_a, |channel, cx| {
3030            channel.send_message(String::new(), cx).unwrap_err()
3031        });
3032
3033        // Leading and trailing whitespace are trimmed.
3034        channel_a
3035            .update(&mut cx_a, |channel, cx| {
3036                channel
3037                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3038                    .unwrap()
3039            })
3040            .await
3041            .unwrap();
3042        assert_eq!(
3043            db.get_channel_messages(channel_id, 10, None)
3044                .await
3045                .unwrap()
3046                .iter()
3047                .map(|m| &m.body)
3048                .collect::<Vec<_>>(),
3049            &["surrounded by whitespace"]
3050        );
3051    }
3052
3053    #[gpui::test]
3054    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3055        cx_a.foreground().forbid_parking();
3056
3057        // Connect to a server as 2 clients.
3058        let mut server = TestServer::start(cx_a.foreground()).await;
3059        let client_a = server.create_client(&mut cx_a, "user_a").await;
3060        let client_b = server.create_client(&mut cx_b, "user_b").await;
3061        let mut status_b = client_b.status();
3062
3063        // Create an org that includes these 2 users.
3064        let db = &server.app_state.db;
3065        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3066        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3067            .await
3068            .unwrap();
3069        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3070            .await
3071            .unwrap();
3072
3073        // Create a channel that includes all the users.
3074        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3075        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3076            .await
3077            .unwrap();
3078        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3079            .await
3080            .unwrap();
3081        db.create_channel_message(
3082            channel_id,
3083            client_b.current_user_id(&cx_b),
3084            "hello A, it's B.",
3085            OffsetDateTime::now_utc(),
3086            2,
3087        )
3088        .await
3089        .unwrap();
3090
3091        let channels_a = cx_a
3092            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3093        channels_a
3094            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3095            .await;
3096
3097        channels_a.read_with(&cx_a, |list, _| {
3098            assert_eq!(
3099                list.available_channels().unwrap(),
3100                &[ChannelDetails {
3101                    id: channel_id.to_proto(),
3102                    name: "test-channel".to_string()
3103                }]
3104            )
3105        });
3106        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3107            this.get_channel(channel_id.to_proto(), cx).unwrap()
3108        });
3109        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3110        channel_a
3111            .condition(&cx_a, |channel, _| {
3112                channel_messages(channel)
3113                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3114            })
3115            .await;
3116
3117        let channels_b = cx_b
3118            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3119        channels_b
3120            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3121            .await;
3122        channels_b.read_with(&cx_b, |list, _| {
3123            assert_eq!(
3124                list.available_channels().unwrap(),
3125                &[ChannelDetails {
3126                    id: channel_id.to_proto(),
3127                    name: "test-channel".to_string()
3128                }]
3129            )
3130        });
3131
3132        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3133            this.get_channel(channel_id.to_proto(), cx).unwrap()
3134        });
3135        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3136        channel_b
3137            .condition(&cx_b, |channel, _| {
3138                channel_messages(channel)
3139                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3140            })
3141            .await;
3142
3143        // Disconnect client B, ensuring we can still access its cached channel data.
3144        server.forbid_connections();
3145        server.disconnect_client(client_b.current_user_id(&cx_b));
3146        while !matches!(
3147            status_b.next().await,
3148            Some(client::Status::ReconnectionError { .. })
3149        ) {}
3150
3151        channels_b.read_with(&cx_b, |channels, _| {
3152            assert_eq!(
3153                channels.available_channels().unwrap(),
3154                [ChannelDetails {
3155                    id: channel_id.to_proto(),
3156                    name: "test-channel".to_string()
3157                }]
3158            )
3159        });
3160        channel_b.read_with(&cx_b, |channel, _| {
3161            assert_eq!(
3162                channel_messages(channel),
3163                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3164            )
3165        });
3166
3167        // Send a message from client B while it is disconnected.
3168        channel_b
3169            .update(&mut cx_b, |channel, cx| {
3170                let task = channel
3171                    .send_message("can you see this?".to_string(), cx)
3172                    .unwrap();
3173                assert_eq!(
3174                    channel_messages(channel),
3175                    &[
3176                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3177                        ("user_b".to_string(), "can you see this?".to_string(), true)
3178                    ]
3179                );
3180                task
3181            })
3182            .await
3183            .unwrap_err();
3184
3185        // Send a message from client A while B is disconnected.
3186        channel_a
3187            .update(&mut cx_a, |channel, cx| {
3188                channel
3189                    .send_message("oh, hi B.".to_string(), cx)
3190                    .unwrap()
3191                    .detach();
3192                let task = channel.send_message("sup".to_string(), cx).unwrap();
3193                assert_eq!(
3194                    channel_messages(channel),
3195                    &[
3196                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3197                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3198                        ("user_a".to_string(), "sup".to_string(), true)
3199                    ]
3200                );
3201                task
3202            })
3203            .await
3204            .unwrap();
3205
3206        // Give client B a chance to reconnect.
3207        server.allow_connections();
3208        cx_b.foreground().advance_clock(Duration::from_secs(10));
3209
3210        // Verify that B sees the new messages upon reconnection, as well as the message client B
3211        // sent while offline.
3212        channel_b
3213            .condition(&cx_b, |channel, _| {
3214                channel_messages(channel)
3215                    == [
3216                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3217                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3218                        ("user_a".to_string(), "sup".to_string(), false),
3219                        ("user_b".to_string(), "can you see this?".to_string(), false),
3220                    ]
3221            })
3222            .await;
3223
3224        // Ensure client A and B can communicate normally after reconnection.
3225        channel_a
3226            .update(&mut cx_a, |channel, cx| {
3227                channel.send_message("you online?".to_string(), cx).unwrap()
3228            })
3229            .await
3230            .unwrap();
3231        channel_b
3232            .condition(&cx_b, |channel, _| {
3233                channel_messages(channel)
3234                    == [
3235                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3236                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3237                        ("user_a".to_string(), "sup".to_string(), false),
3238                        ("user_b".to_string(), "can you see this?".to_string(), false),
3239                        ("user_a".to_string(), "you online?".to_string(), false),
3240                    ]
3241            })
3242            .await;
3243
3244        channel_b
3245            .update(&mut cx_b, |channel, cx| {
3246                channel.send_message("yep".to_string(), cx).unwrap()
3247            })
3248            .await
3249            .unwrap();
3250        channel_a
3251            .condition(&cx_a, |channel, _| {
3252                channel_messages(channel)
3253                    == [
3254                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3255                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3256                        ("user_a".to_string(), "sup".to_string(), false),
3257                        ("user_b".to_string(), "can you see this?".to_string(), false),
3258                        ("user_a".to_string(), "you online?".to_string(), false),
3259                        ("user_b".to_string(), "yep".to_string(), false),
3260                    ]
3261            })
3262            .await;
3263    }
3264
3265    #[gpui::test]
3266    async fn test_contacts(
3267        mut cx_a: TestAppContext,
3268        mut cx_b: TestAppContext,
3269        mut cx_c: TestAppContext,
3270    ) {
3271        cx_a.foreground().forbid_parking();
3272        let lang_registry = Arc::new(LanguageRegistry::new());
3273        let fs = Arc::new(FakeFs::new(cx_a.background()));
3274
3275        // Connect to a server as 3 clients.
3276        let mut server = TestServer::start(cx_a.foreground()).await;
3277        let client_a = server.create_client(&mut cx_a, "user_a").await;
3278        let client_b = server.create_client(&mut cx_b, "user_b").await;
3279        let client_c = server.create_client(&mut cx_c, "user_c").await;
3280
3281        // Share a worktree as client A.
3282        fs.insert_tree(
3283            "/a",
3284            json!({
3285                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3286            }),
3287        )
3288        .await;
3289
3290        let project_a = cx_a.update(|cx| {
3291            Project::local(
3292                client_a.clone(),
3293                client_a.user_store.clone(),
3294                lang_registry.clone(),
3295                fs.clone(),
3296                cx,
3297            )
3298        });
3299        let (worktree_a, _) = project_a
3300            .update(&mut cx_a, |p, cx| {
3301                p.find_or_create_local_worktree("/a", false, cx)
3302            })
3303            .await
3304            .unwrap();
3305        worktree_a
3306            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3307            .await;
3308
3309        client_a
3310            .user_store
3311            .condition(&cx_a, |user_store, _| {
3312                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3313            })
3314            .await;
3315        client_b
3316            .user_store
3317            .condition(&cx_b, |user_store, _| {
3318                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3319            })
3320            .await;
3321        client_c
3322            .user_store
3323            .condition(&cx_c, |user_store, _| {
3324                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3325            })
3326            .await;
3327
3328        let project_id = project_a
3329            .update(&mut cx_a, |project, _| project.next_remote_id())
3330            .await;
3331        project_a
3332            .update(&mut cx_a, |project, cx| project.share(cx))
3333            .await
3334            .unwrap();
3335
3336        let _project_b = Project::remote(
3337            project_id,
3338            client_b.clone(),
3339            client_b.user_store.clone(),
3340            lang_registry.clone(),
3341            fs.clone(),
3342            &mut cx_b.to_async(),
3343        )
3344        .await
3345        .unwrap();
3346
3347        client_a
3348            .user_store
3349            .condition(&cx_a, |user_store, _| {
3350                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3351            })
3352            .await;
3353        client_b
3354            .user_store
3355            .condition(&cx_b, |user_store, _| {
3356                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3357            })
3358            .await;
3359        client_c
3360            .user_store
3361            .condition(&cx_c, |user_store, _| {
3362                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3363            })
3364            .await;
3365
3366        project_a
3367            .condition(&cx_a, |project, _| {
3368                project.collaborators().contains_key(&client_b.peer_id)
3369            })
3370            .await;
3371
3372        cx_a.update(move |_| drop(project_a));
3373        client_a
3374            .user_store
3375            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3376            .await;
3377        client_b
3378            .user_store
3379            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3380            .await;
3381        client_c
3382            .user_store
3383            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3384            .await;
3385
3386        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3387            user_store
3388                .contacts()
3389                .iter()
3390                .map(|contact| {
3391                    let worktrees = contact
3392                        .projects
3393                        .iter()
3394                        .map(|p| {
3395                            (
3396                                p.worktree_root_names[0].as_str(),
3397                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3398                            )
3399                        })
3400                        .collect();
3401                    (contact.user.github_login.as_str(), worktrees)
3402                })
3403                .collect()
3404        }
3405    }
3406
3407    struct TestServer {
3408        peer: Arc<Peer>,
3409        app_state: Arc<AppState>,
3410        server: Arc<Server>,
3411        foreground: Rc<executor::Foreground>,
3412        notifications: mpsc::Receiver<()>,
3413        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3414        forbid_connections: Arc<AtomicBool>,
3415        _test_db: TestDb,
3416    }
3417
3418    impl TestServer {
3419        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3420            let test_db = TestDb::new();
3421            let app_state = Self::build_app_state(&test_db).await;
3422            let peer = Peer::new();
3423            let notifications = mpsc::channel(128);
3424            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3425            Self {
3426                peer,
3427                app_state,
3428                server,
3429                foreground,
3430                notifications: notifications.1,
3431                connection_killers: Default::default(),
3432                forbid_connections: Default::default(),
3433                _test_db: test_db,
3434            }
3435        }
3436
3437        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3438            let http = FakeHttpClient::with_404_response();
3439            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3440            let client_name = name.to_string();
3441            let mut client = Client::new(http.clone());
3442            let server = self.server.clone();
3443            let connection_killers = self.connection_killers.clone();
3444            let forbid_connections = self.forbid_connections.clone();
3445            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3446
3447            Arc::get_mut(&mut client)
3448                .unwrap()
3449                .override_authenticate(move |cx| {
3450                    cx.spawn(|_| async move {
3451                        let access_token = "the-token".to_string();
3452                        Ok(Credentials {
3453                            user_id: user_id.0 as u64,
3454                            access_token,
3455                        })
3456                    })
3457                })
3458                .override_establish_connection(move |credentials, cx| {
3459                    assert_eq!(credentials.user_id, user_id.0 as u64);
3460                    assert_eq!(credentials.access_token, "the-token");
3461
3462                    let server = server.clone();
3463                    let connection_killers = connection_killers.clone();
3464                    let forbid_connections = forbid_connections.clone();
3465                    let client_name = client_name.clone();
3466                    let connection_id_tx = connection_id_tx.clone();
3467                    cx.spawn(move |cx| async move {
3468                        if forbid_connections.load(SeqCst) {
3469                            Err(EstablishConnectionError::other(anyhow!(
3470                                "server is forbidding connections"
3471                            )))
3472                        } else {
3473                            let (client_conn, server_conn, kill_conn) =
3474                                Connection::in_memory(cx.background());
3475                            connection_killers.lock().insert(user_id, kill_conn);
3476                            cx.background()
3477                                .spawn(server.handle_connection(
3478                                    server_conn,
3479                                    client_name,
3480                                    user_id,
3481                                    Some(connection_id_tx),
3482                                ))
3483                                .detach();
3484                            Ok(client_conn)
3485                        }
3486                    })
3487                });
3488
3489            client
3490                .authenticate_and_connect(&cx.to_async())
3491                .await
3492                .unwrap();
3493
3494            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3495            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3496            let mut authed_user =
3497                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3498            while authed_user.next().await.unwrap().is_none() {}
3499
3500            TestClient {
3501                client,
3502                peer_id,
3503                user_store,
3504            }
3505        }
3506
3507        fn disconnect_client(&self, user_id: UserId) {
3508            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3509                let _ = kill_conn.try_send(Some(()));
3510            }
3511        }
3512
3513        fn forbid_connections(&self) {
3514            self.forbid_connections.store(true, SeqCst);
3515        }
3516
3517        fn allow_connections(&self) {
3518            self.forbid_connections.store(false, SeqCst);
3519        }
3520
3521        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3522            let mut config = Config::default();
3523            config.session_secret = "a".repeat(32);
3524            config.database_url = test_db.url.clone();
3525            let github_client = github::AppClient::test();
3526            Arc::new(AppState {
3527                db: test_db.db().clone(),
3528                handlebars: Default::default(),
3529                auth_client: auth::build_client("", ""),
3530                repo_client: github::RepoClient::test(&github_client),
3531                github_client,
3532                config,
3533            })
3534        }
3535
3536        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3537            self.server.store.read()
3538        }
3539
3540        async fn condition<F>(&mut self, mut predicate: F)
3541        where
3542            F: FnMut(&Store) -> bool,
3543        {
3544            async_std::future::timeout(Duration::from_millis(500), async {
3545                while !(predicate)(&*self.server.store.read()) {
3546                    self.foreground.start_waiting();
3547                    self.notifications.next().await;
3548                    self.foreground.finish_waiting();
3549                }
3550            })
3551            .await
3552            .expect("condition timed out");
3553        }
3554    }
3555
3556    impl Drop for TestServer {
3557        fn drop(&mut self) {
3558            self.peer.reset();
3559        }
3560    }
3561
3562    struct TestClient {
3563        client: Arc<Client>,
3564        pub peer_id: PeerId,
3565        pub user_store: ModelHandle<UserStore>,
3566    }
3567
3568    impl Deref for TestClient {
3569        type Target = Arc<Client>;
3570
3571        fn deref(&self) -> &Self::Target {
3572            &self.client
3573        }
3574    }
3575
3576    impl TestClient {
3577        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3578            UserId::from_proto(
3579                self.user_store
3580                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3581            )
3582        }
3583    }
3584
3585    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3586        channel
3587            .messages()
3588            .cursor::<()>()
3589            .map(|m| {
3590                (
3591                    m.sender.github_login.clone(),
3592                    m.body.clone(),
3593                    m.is_pending(),
3594                )
3595            })
3596            .collect()
3597    }
3598
3599    struct EmptyView;
3600
3601    impl gpui::Entity for EmptyView {
3602        type Event = ();
3603    }
3604
3605    impl gpui::View for EmptyView {
3606        fn ui_name() -> &'static str {
3607            "empty view"
3608        }
3609
3610        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3611            gpui::Element::boxed(gpui::elements::Empty)
3612        }
3613    }
3614}