rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
/// RPC server state shared by every connection handled in this module.
pub struct Server {
    /// Peer used to send, respond to and forward messages between connections.
    peer: Arc<Peer>,
    /// Lock-protected store; the handlers reach it through `state()` /
    /// `state_mut()` (presumably thin wrappers around this lock — those
    /// accessors are defined outside this view, TODO confirm).
    store: RwLock<Store>,
    /// Application state; handlers use it to reach the database (`app_state.db`).
    app_state: Arc<AppState>,
    /// Message handlers keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives a `()` after each message that had a
    /// registered handler has been processed.
    notifications: Option<mpsc::Sender<()>>,
}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updating)
  76            .add_handler(Server::disk_based_diagnostics_updated)
  77            .add_handler(Server::get_definition)
  78            .add_handler(Server::open_buffer)
  79            .add_handler(Server::close_buffer)
  80            .add_handler(Server::update_buffer)
  81            .add_handler(Server::update_buffer_file)
  82            .add_handler(Server::buffer_reloaded)
  83            .add_handler(Server::buffer_saved)
  84            .add_handler(Server::save_buffer)
  85            .add_handler(Server::format_buffer)
  86            .add_handler(Server::get_completions)
  87            .add_handler(Server::apply_additional_edits_for_completion)
  88            .add_handler(Server::get_channels)
  89            .add_handler(Server::get_users)
  90            .add_handler(Server::join_channel)
  91            .add_handler(Server::leave_channel)
  92            .add_handler(Server::send_channel_message)
  93            .add_handler(Server::get_channel_messages);
  94
  95        Arc::new(server)
  96    }
  97
  98    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  99    where
 100        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 101        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 102        M: EnvelopedMessage,
 103    {
 104        let prev_handler = self.handlers.insert(
 105            TypeId::of::<M>(),
 106            Box::new(move |server, envelope| {
 107                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 108                (handler)(server, *envelope).boxed()
 109            }),
 110        );
 111        if prev_handler.is_some() {
 112            panic!("registered a handler for the same message twice");
 113        }
 114        self
 115    }
 116
 117    pub fn handle_connection(
 118        self: &Arc<Self>,
 119        connection: Connection,
 120        addr: String,
 121        user_id: UserId,
 122        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 123    ) -> impl Future<Output = ()> {
 124        let mut this = self.clone();
 125        async move {
 126            let (connection_id, handle_io, mut incoming_rx) =
 127                this.peer.add_connection(connection).await;
 128
 129            if let Some(send_connection_id) = send_connection_id.as_mut() {
 130                let _ = send_connection_id.send(connection_id).await;
 131            }
 132
 133            this.state_mut().add_connection(connection_id, user_id);
 134            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 135                log::error!("error updating contacts for {:?}: {}", user_id, err);
 136            }
 137
 138            let handle_io = handle_io.fuse();
 139            futures::pin_mut!(handle_io);
 140            loop {
 141                let next_message = incoming_rx.next().fuse();
 142                futures::pin_mut!(next_message);
 143                futures::select_biased! {
 144                    result = handle_io => {
 145                        if let Err(err) = result {
 146                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 147                        }
 148                        break;
 149                    }
 150                    message = next_message => {
 151                        if let Some(message) = message {
 152                            let start_time = Instant::now();
 153                            let type_name = message.payload_type_name();
 154                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 155                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 156                                if let Err(err) = (handler)(this.clone(), message).await {
 157                                    log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 158                                } else {
 159                                    log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 160                                }
 161
 162                                if let Some(mut notifications) = this.notifications.clone() {
 163                                    let _ = notifications.send(()).await;
 164                                }
 165                            } else {
 166                                log::warn!("unhandled message: {}", type_name);
 167                            }
 168                        } else {
 169                            log::info!("rpc connection closed {:?}", addr);
 170                            break;
 171                        }
 172                    }
 173                }
 174            }
 175
 176            if let Err(err) = this.sign_out(connection_id).await {
 177                log::error!("error signing out connection {:?} - {:?}", addr, err);
 178            }
 179        }
 180    }
 181
 182    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 183        self.peer.disconnect(connection_id);
 184        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 185
 186        for (project_id, project) in removed_connection.hosted_projects {
 187            if let Some(share) = project.share {
 188                broadcast(
 189                    connection_id,
 190                    share.guests.keys().copied().collect(),
 191                    |conn_id| {
 192                        self.peer
 193                            .send(conn_id, proto::UnshareProject { project_id })
 194                    },
 195                )?;
 196            }
 197        }
 198
 199        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 200            broadcast(connection_id, peer_ids, |conn_id| {
 201                self.peer.send(
 202                    conn_id,
 203                    proto::RemoveProjectCollaborator {
 204                        project_id,
 205                        peer_id: connection_id.0,
 206                    },
 207                )
 208            })?;
 209        }
 210
 211        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 212        Ok(())
 213    }
 214
 215    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 216        self.peer.respond(request.receipt(), proto::Ack {})?;
 217        Ok(())
 218    }
 219
 220    async fn register_project(
 221        mut self: Arc<Server>,
 222        request: TypedEnvelope<proto::RegisterProject>,
 223    ) -> tide::Result<()> {
 224        let project_id = {
 225            let mut state = self.state_mut();
 226            let user_id = state.user_id_for_connection(request.sender_id)?;
 227            state.register_project(request.sender_id, user_id)
 228        };
 229        self.peer.respond(
 230            request.receipt(),
 231            proto::RegisterProjectResponse { project_id },
 232        )?;
 233        Ok(())
 234    }
 235
 236    async fn unregister_project(
 237        mut self: Arc<Server>,
 238        request: TypedEnvelope<proto::UnregisterProject>,
 239    ) -> tide::Result<()> {
 240        let project = self
 241            .state_mut()
 242            .unregister_project(request.payload.project_id, request.sender_id)
 243            .ok_or_else(|| anyhow!("no such project"))?;
 244        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 245        Ok(())
 246    }
 247
 248    async fn share_project(
 249        mut self: Arc<Server>,
 250        request: TypedEnvelope<proto::ShareProject>,
 251    ) -> tide::Result<()> {
 252        self.state_mut()
 253            .share_project(request.payload.project_id, request.sender_id);
 254        self.peer.respond(request.receipt(), proto::Ack {})?;
 255        Ok(())
 256    }
 257
 258    async fn unshare_project(
 259        mut self: Arc<Server>,
 260        request: TypedEnvelope<proto::UnshareProject>,
 261    ) -> tide::Result<()> {
 262        let project_id = request.payload.project_id;
 263        let project = self
 264            .state_mut()
 265            .unshare_project(project_id, request.sender_id)?;
 266
 267        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 268            self.peer
 269                .send(conn_id, proto::UnshareProject { project_id })
 270        })?;
 271        self.update_contacts_for_users(&project.authorized_user_ids)?;
 272        Ok(())
 273    }
 274
 275    async fn join_project(
 276        mut self: Arc<Server>,
 277        request: TypedEnvelope<proto::JoinProject>,
 278    ) -> tide::Result<()> {
 279        let project_id = request.payload.project_id;
 280
 281        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 282        let response_data = self
 283            .state_mut()
 284            .join_project(request.sender_id, user_id, project_id)
 285            .and_then(|joined| {
 286                let share = joined.project.share()?;
 287                let peer_count = share.guests.len();
 288                let mut collaborators = Vec::with_capacity(peer_count);
 289                collaborators.push(proto::Collaborator {
 290                    peer_id: joined.project.host_connection_id.0,
 291                    replica_id: 0,
 292                    user_id: joined.project.host_user_id.to_proto(),
 293                });
 294                let worktrees = joined
 295                    .project
 296                    .worktrees
 297                    .iter()
 298                    .filter_map(|(id, worktree)| {
 299                        worktree.share.as_ref().map(|share| proto::Worktree {
 300                            id: *id,
 301                            root_name: worktree.root_name.clone(),
 302                            entries: share.entries.values().cloned().collect(),
 303                            diagnostic_summaries: share
 304                                .diagnostic_summaries
 305                                .values()
 306                                .cloned()
 307                                .collect(),
 308                            weak: worktree.weak,
 309                        })
 310                    })
 311                    .collect();
 312                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 313                    if *peer_conn_id != request.sender_id {
 314                        collaborators.push(proto::Collaborator {
 315                            peer_id: peer_conn_id.0,
 316                            replica_id: *peer_replica_id as u32,
 317                            user_id: peer_user_id.to_proto(),
 318                        });
 319                    }
 320                }
 321                let response = proto::JoinProjectResponse {
 322                    worktrees,
 323                    replica_id: joined.replica_id as u32,
 324                    collaborators,
 325                };
 326                let connection_ids = joined.project.connection_ids();
 327                let contact_user_ids = joined.project.authorized_user_ids();
 328                Ok((response, connection_ids, contact_user_ids))
 329            });
 330
 331        match response_data {
 332            Ok((response, connection_ids, contact_user_ids)) => {
 333                broadcast(request.sender_id, connection_ids, |conn_id| {
 334                    self.peer.send(
 335                        conn_id,
 336                        proto::AddProjectCollaborator {
 337                            project_id,
 338                            collaborator: Some(proto::Collaborator {
 339                                peer_id: request.sender_id.0,
 340                                replica_id: response.replica_id,
 341                                user_id: user_id.to_proto(),
 342                            }),
 343                        },
 344                    )
 345                })?;
 346                self.peer.respond(request.receipt(), response)?;
 347                self.update_contacts_for_users(&contact_user_ids)?;
 348            }
 349            Err(error) => {
 350                self.peer.respond_with_error(
 351                    request.receipt(),
 352                    proto::Error {
 353                        message: error.to_string(),
 354                    },
 355                )?;
 356            }
 357        }
 358
 359        Ok(())
 360    }
 361
 362    async fn leave_project(
 363        mut self: Arc<Server>,
 364        request: TypedEnvelope<proto::LeaveProject>,
 365    ) -> tide::Result<()> {
 366        let sender_id = request.sender_id;
 367        let project_id = request.payload.project_id;
 368        let worktree = self.state_mut().leave_project(sender_id, project_id);
 369        if let Some(worktree) = worktree {
 370            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 371                self.peer.send(
 372                    conn_id,
 373                    proto::RemoveProjectCollaborator {
 374                        project_id,
 375                        peer_id: sender_id.0,
 376                    },
 377                )
 378            })?;
 379            self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 380        }
 381        Ok(())
 382    }
 383
 384    async fn register_worktree(
 385        mut self: Arc<Server>,
 386        request: TypedEnvelope<proto::RegisterWorktree>,
 387    ) -> tide::Result<()> {
 388        let receipt = request.receipt();
 389        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 390
 391        let mut contact_user_ids = HashSet::default();
 392        contact_user_ids.insert(host_user_id);
 393        for github_login in request.payload.authorized_logins {
 394            match self.app_state.db.create_user(&github_login, false).await {
 395                Ok(contact_user_id) => {
 396                    contact_user_ids.insert(contact_user_id);
 397                }
 398                Err(err) => {
 399                    let message = err.to_string();
 400                    self.peer
 401                        .respond_with_error(receipt, proto::Error { message })?;
 402                    return Ok(());
 403                }
 404            }
 405        }
 406
 407        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 408        let ok = self.state_mut().register_worktree(
 409            request.payload.project_id,
 410            request.payload.worktree_id,
 411            Worktree {
 412                authorized_user_ids: contact_user_ids.clone(),
 413                root_name: request.payload.root_name,
 414                share: None,
 415                weak: false,
 416            },
 417        );
 418
 419        if ok {
 420            self.peer.respond(receipt, proto::Ack {})?;
 421            self.update_contacts_for_users(&contact_user_ids)?;
 422        } else {
 423            self.peer.respond_with_error(
 424                receipt,
 425                proto::Error {
 426                    message: NO_SUCH_PROJECT.to_string(),
 427                },
 428            )?;
 429        }
 430
 431        Ok(())
 432    }
 433
 434    async fn unregister_worktree(
 435        mut self: Arc<Server>,
 436        request: TypedEnvelope<proto::UnregisterWorktree>,
 437    ) -> tide::Result<()> {
 438        let project_id = request.payload.project_id;
 439        let worktree_id = request.payload.worktree_id;
 440        let (worktree, guest_connection_ids) =
 441            self.state_mut()
 442                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 443        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 444            self.peer.send(
 445                conn_id,
 446                proto::UnregisterWorktree {
 447                    project_id,
 448                    worktree_id,
 449                },
 450            )
 451        })?;
 452        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 453        Ok(())
 454    }
 455
 456    async fn share_worktree(
 457        mut self: Arc<Server>,
 458        mut request: TypedEnvelope<proto::ShareWorktree>,
 459    ) -> tide::Result<()> {
 460        let worktree = request
 461            .payload
 462            .worktree
 463            .as_mut()
 464            .ok_or_else(|| anyhow!("missing worktree"))?;
 465        let entries = worktree
 466            .entries
 467            .iter()
 468            .map(|entry| (entry.id, entry.clone()))
 469            .collect();
 470        let diagnostic_summaries = worktree
 471            .diagnostic_summaries
 472            .iter()
 473            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 474            .collect();
 475
 476        let shared_worktree = self.state_mut().share_worktree(
 477            request.payload.project_id,
 478            worktree.id,
 479            request.sender_id,
 480            entries,
 481            diagnostic_summaries,
 482        );
 483        if let Some(shared_worktree) = shared_worktree {
 484            broadcast(
 485                request.sender_id,
 486                shared_worktree.connection_ids,
 487                |connection_id| {
 488                    self.peer.forward_send(
 489                        request.sender_id,
 490                        connection_id,
 491                        request.payload.clone(),
 492                    )
 493                },
 494            )?;
 495            self.peer.respond(request.receipt(), proto::Ack {})?;
 496            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 497        } else {
 498            self.peer.respond_with_error(
 499                request.receipt(),
 500                proto::Error {
 501                    message: "no such worktree".to_string(),
 502                },
 503            )?;
 504        }
 505        Ok(())
 506    }
 507
 508    async fn update_worktree(
 509        mut self: Arc<Server>,
 510        request: TypedEnvelope<proto::UpdateWorktree>,
 511    ) -> tide::Result<()> {
 512        let connection_ids = self
 513            .state_mut()
 514            .update_worktree(
 515                request.sender_id,
 516                request.payload.project_id,
 517                request.payload.worktree_id,
 518                &request.payload.removed_entries,
 519                &request.payload.updated_entries,
 520            )
 521            .ok_or_else(|| anyhow!("no such worktree"))?;
 522
 523        broadcast(request.sender_id, connection_ids, |connection_id| {
 524            self.peer
 525                .forward_send(request.sender_id, connection_id, request.payload.clone())
 526        })?;
 527
 528        Ok(())
 529    }
 530
 531    async fn update_diagnostic_summary(
 532        mut self: Arc<Server>,
 533        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 534    ) -> tide::Result<()> {
 535        let receiver_ids = request
 536            .payload
 537            .summary
 538            .clone()
 539            .and_then(|summary| {
 540                self.state_mut().update_diagnostic_summary(
 541                    request.payload.project_id,
 542                    request.payload.worktree_id,
 543                    request.sender_id,
 544                    summary,
 545                )
 546            })
 547            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 548
 549        broadcast(request.sender_id, receiver_ids, |connection_id| {
 550            self.peer
 551                .forward_send(request.sender_id, connection_id, request.payload.clone())
 552        })?;
 553        Ok(())
 554    }
 555
 556    async fn disk_based_diagnostics_updating(
 557        self: Arc<Server>,
 558        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 559    ) -> tide::Result<()> {
 560        let receiver_ids = self
 561            .state()
 562            .project_connection_ids(request.payload.project_id, request.sender_id)
 563            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 564        broadcast(request.sender_id, receiver_ids, |connection_id| {
 565            self.peer
 566                .forward_send(request.sender_id, connection_id, request.payload.clone())
 567        })?;
 568        Ok(())
 569    }
 570
 571    async fn disk_based_diagnostics_updated(
 572        self: Arc<Server>,
 573        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 574    ) -> tide::Result<()> {
 575        let receiver_ids = self
 576            .state()
 577            .project_connection_ids(request.payload.project_id, request.sender_id)
 578            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 579        broadcast(request.sender_id, receiver_ids, |connection_id| {
 580            self.peer
 581                .forward_send(request.sender_id, connection_id, request.payload.clone())
 582        })?;
 583        Ok(())
 584    }
 585
 586    async fn get_definition(
 587        self: Arc<Server>,
 588        request: TypedEnvelope<proto::GetDefinition>,
 589    ) -> tide::Result<()> {
 590        let receipt = request.receipt();
 591        let host_connection_id = self
 592            .state()
 593            .read_project(request.payload.project_id, request.sender_id)
 594            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 595            .host_connection_id;
 596        let response = self
 597            .peer
 598            .forward_request(request.sender_id, host_connection_id, request.payload)
 599            .await?;
 600        self.peer.respond(receipt, response)?;
 601        Ok(())
 602    }
 603
 604    async fn open_buffer(
 605        self: Arc<Server>,
 606        request: TypedEnvelope<proto::OpenBuffer>,
 607    ) -> tide::Result<()> {
 608        let receipt = request.receipt();
 609        let host_connection_id = self
 610            .state()
 611            .read_project(request.payload.project_id, request.sender_id)
 612            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 613            .host_connection_id;
 614        let response = self
 615            .peer
 616            .forward_request(request.sender_id, host_connection_id, request.payload)
 617            .await?;
 618        self.peer.respond(receipt, response)?;
 619        Ok(())
 620    }
 621
 622    async fn close_buffer(
 623        self: Arc<Server>,
 624        request: TypedEnvelope<proto::CloseBuffer>,
 625    ) -> tide::Result<()> {
 626        let host_connection_id = self
 627            .state()
 628            .read_project(request.payload.project_id, request.sender_id)
 629            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 630            .host_connection_id;
 631        self.peer
 632            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 633        Ok(())
 634    }
 635
 636    async fn save_buffer(
 637        self: Arc<Server>,
 638        request: TypedEnvelope<proto::SaveBuffer>,
 639    ) -> tide::Result<()> {
 640        let host;
 641        let guests;
 642        {
 643            let state = self.state();
 644            let project = state
 645                .read_project(request.payload.project_id, request.sender_id)
 646                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 647            host = project.host_connection_id;
 648            guests = project.guest_connection_ids()
 649        }
 650
 651        let sender = request.sender_id;
 652        let receipt = request.receipt();
 653        let response = self
 654            .peer
 655            .forward_request(sender, host, request.payload.clone())
 656            .await?;
 657
 658        broadcast(host, guests, |conn_id| {
 659            let response = response.clone();
 660            if conn_id == sender {
 661                self.peer.respond(receipt, response)
 662            } else {
 663                self.peer.forward_send(host, conn_id, response)
 664            }
 665        })?;
 666
 667        Ok(())
 668    }
 669
 670    async fn format_buffer(
 671        self: Arc<Server>,
 672        request: TypedEnvelope<proto::FormatBuffer>,
 673    ) -> tide::Result<()> {
 674        let host;
 675        {
 676            let state = self.state();
 677            let project = state
 678                .read_project(request.payload.project_id, request.sender_id)
 679                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 680            host = project.host_connection_id;
 681        }
 682
 683        let sender = request.sender_id;
 684        let receipt = request.receipt();
 685        let response = self
 686            .peer
 687            .forward_request(sender, host, request.payload.clone())
 688            .await?;
 689        self.peer.respond(receipt, response)?;
 690
 691        Ok(())
 692    }
 693
 694    async fn get_completions(
 695        self: Arc<Server>,
 696        request: TypedEnvelope<proto::GetCompletions>,
 697    ) -> tide::Result<()> {
 698        let host;
 699        {
 700            let state = self.state();
 701            let project = state
 702                .read_project(request.payload.project_id, request.sender_id)
 703                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 704            host = project.host_connection_id;
 705        }
 706
 707        let sender = request.sender_id;
 708        let receipt = request.receipt();
 709        let response = self
 710            .peer
 711            .forward_request(sender, host, request.payload.clone())
 712            .await?;
 713        self.peer.respond(receipt, response)?;
 714        Ok(())
 715    }
 716
 717    async fn apply_additional_edits_for_completion(
 718        self: Arc<Server>,
 719        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 720    ) -> tide::Result<()> {
 721        let host;
 722        {
 723            let state = self.state();
 724            let project = state
 725                .read_project(request.payload.project_id, request.sender_id)
 726                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 727            host = project.host_connection_id;
 728        }
 729
 730        let sender = request.sender_id;
 731        let receipt = request.receipt();
 732        let response = self
 733            .peer
 734            .forward_request(sender, host, request.payload.clone())
 735            .await?;
 736        self.peer.respond(receipt, response)?;
 737        Ok(())
 738    }
 739
 740    async fn update_buffer(
 741        self: Arc<Server>,
 742        request: TypedEnvelope<proto::UpdateBuffer>,
 743    ) -> tide::Result<()> {
 744        let receiver_ids = self
 745            .state()
 746            .project_connection_ids(request.payload.project_id, request.sender_id)
 747            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 748        broadcast(request.sender_id, receiver_ids, |connection_id| {
 749            self.peer
 750                .forward_send(request.sender_id, connection_id, request.payload.clone())
 751        })?;
 752        self.peer.respond(request.receipt(), proto::Ack {})?;
 753        Ok(())
 754    }
 755
 756    async fn update_buffer_file(
 757        self: Arc<Server>,
 758        request: TypedEnvelope<proto::UpdateBufferFile>,
 759    ) -> tide::Result<()> {
 760        let receiver_ids = self
 761            .state()
 762            .project_connection_ids(request.payload.project_id, request.sender_id)
 763            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 764        broadcast(request.sender_id, receiver_ids, |connection_id| {
 765            self.peer
 766                .forward_send(request.sender_id, connection_id, request.payload.clone())
 767        })?;
 768        Ok(())
 769    }
 770
 771    async fn buffer_reloaded(
 772        self: Arc<Server>,
 773        request: TypedEnvelope<proto::BufferReloaded>,
 774    ) -> tide::Result<()> {
 775        let receiver_ids = self
 776            .state()
 777            .project_connection_ids(request.payload.project_id, request.sender_id)
 778            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 779        broadcast(request.sender_id, receiver_ids, |connection_id| {
 780            self.peer
 781                .forward_send(request.sender_id, connection_id, request.payload.clone())
 782        })?;
 783        Ok(())
 784    }
 785
 786    async fn buffer_saved(
 787        self: Arc<Server>,
 788        request: TypedEnvelope<proto::BufferSaved>,
 789    ) -> tide::Result<()> {
 790        let receiver_ids = self
 791            .state()
 792            .project_connection_ids(request.payload.project_id, request.sender_id)
 793            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 794        broadcast(request.sender_id, receiver_ids, |connection_id| {
 795            self.peer
 796                .forward_send(request.sender_id, connection_id, request.payload.clone())
 797        })?;
 798        Ok(())
 799    }
 800
 801    async fn get_channels(
 802        self: Arc<Server>,
 803        request: TypedEnvelope<proto::GetChannels>,
 804    ) -> tide::Result<()> {
 805        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 806        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 807        self.peer.respond(
 808            request.receipt(),
 809            proto::GetChannelsResponse {
 810                channels: channels
 811                    .into_iter()
 812                    .map(|chan| proto::Channel {
 813                        id: chan.id.to_proto(),
 814                        name: chan.name,
 815                    })
 816                    .collect(),
 817            },
 818        )?;
 819        Ok(())
 820    }
 821
 822    async fn get_users(
 823        self: Arc<Server>,
 824        request: TypedEnvelope<proto::GetUsers>,
 825    ) -> tide::Result<()> {
 826        let receipt = request.receipt();
 827        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 828        let users = self
 829            .app_state
 830            .db
 831            .get_users_by_ids(user_ids)
 832            .await?
 833            .into_iter()
 834            .map(|user| proto::User {
 835                id: user.id.to_proto(),
 836                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 837                github_login: user.github_login,
 838            })
 839            .collect();
 840        self.peer
 841            .respond(receipt, proto::GetUsersResponse { users })?;
 842        Ok(())
 843    }
 844
 845    fn update_contacts_for_users<'a>(
 846        self: &Arc<Server>,
 847        user_ids: impl IntoIterator<Item = &'a UserId>,
 848    ) -> anyhow::Result<()> {
 849        let mut result = Ok(());
 850        let state = self.state();
 851        for user_id in user_ids {
 852            let contacts = state.contacts_for_user(*user_id);
 853            for connection_id in state.connection_ids_for_user(*user_id) {
 854                if let Err(error) = self.peer.send(
 855                    connection_id,
 856                    proto::UpdateContacts {
 857                        contacts: contacts.clone(),
 858                    },
 859                ) {
 860                    result = Err(error);
 861                }
 862            }
 863        }
 864        result
 865    }
 866
 867    async fn join_channel(
 868        mut self: Arc<Self>,
 869        request: TypedEnvelope<proto::JoinChannel>,
 870    ) -> tide::Result<()> {
 871        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 872        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 873        if !self
 874            .app_state
 875            .db
 876            .can_user_access_channel(user_id, channel_id)
 877            .await?
 878        {
 879            Err(anyhow!("access denied"))?;
 880        }
 881
 882        self.state_mut().join_channel(request.sender_id, channel_id);
 883        let messages = self
 884            .app_state
 885            .db
 886            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 887            .await?
 888            .into_iter()
 889            .map(|msg| proto::ChannelMessage {
 890                id: msg.id.to_proto(),
 891                body: msg.body,
 892                timestamp: msg.sent_at.unix_timestamp() as u64,
 893                sender_id: msg.sender_id.to_proto(),
 894                nonce: Some(msg.nonce.as_u128().into()),
 895            })
 896            .collect::<Vec<_>>();
 897        self.peer.respond(
 898            request.receipt(),
 899            proto::JoinChannelResponse {
 900                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 901                messages,
 902            },
 903        )?;
 904        Ok(())
 905    }
 906
 907    async fn leave_channel(
 908        mut self: Arc<Self>,
 909        request: TypedEnvelope<proto::LeaveChannel>,
 910    ) -> tide::Result<()> {
 911        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 912        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 913        if !self
 914            .app_state
 915            .db
 916            .can_user_access_channel(user_id, channel_id)
 917            .await?
 918        {
 919            Err(anyhow!("access denied"))?;
 920        }
 921
 922        self.state_mut()
 923            .leave_channel(request.sender_id, channel_id);
 924
 925        Ok(())
 926    }
 927
 928    async fn send_channel_message(
 929        self: Arc<Self>,
 930        request: TypedEnvelope<proto::SendChannelMessage>,
 931    ) -> tide::Result<()> {
 932        let receipt = request.receipt();
 933        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 934        let user_id;
 935        let connection_ids;
 936        {
 937            let state = self.state();
 938            user_id = state.user_id_for_connection(request.sender_id)?;
 939            if let Some(ids) = state.channel_connection_ids(channel_id) {
 940                connection_ids = ids;
 941            } else {
 942                return Ok(());
 943            }
 944        }
 945
 946        // Validate the message body.
 947        let body = request.payload.body.trim().to_string();
 948        if body.len() > MAX_MESSAGE_LEN {
 949            self.peer.respond_with_error(
 950                receipt,
 951                proto::Error {
 952                    message: "message is too long".to_string(),
 953                },
 954            )?;
 955            return Ok(());
 956        }
 957        if body.is_empty() {
 958            self.peer.respond_with_error(
 959                receipt,
 960                proto::Error {
 961                    message: "message can't be blank".to_string(),
 962                },
 963            )?;
 964            return Ok(());
 965        }
 966
 967        let timestamp = OffsetDateTime::now_utc();
 968        let nonce = if let Some(nonce) = request.payload.nonce {
 969            nonce
 970        } else {
 971            self.peer.respond_with_error(
 972                receipt,
 973                proto::Error {
 974                    message: "nonce can't be blank".to_string(),
 975                },
 976            )?;
 977            return Ok(());
 978        };
 979
 980        let message_id = self
 981            .app_state
 982            .db
 983            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 984            .await?
 985            .to_proto();
 986        let message = proto::ChannelMessage {
 987            sender_id: user_id.to_proto(),
 988            id: message_id,
 989            body,
 990            timestamp: timestamp.unix_timestamp() as u64,
 991            nonce: Some(nonce),
 992        };
 993        broadcast(request.sender_id, connection_ids, |conn_id| {
 994            self.peer.send(
 995                conn_id,
 996                proto::ChannelMessageSent {
 997                    channel_id: channel_id.to_proto(),
 998                    message: Some(message.clone()),
 999                },
1000            )
1001        })?;
1002        self.peer.respond(
1003            receipt,
1004            proto::SendChannelMessageResponse {
1005                message: Some(message),
1006            },
1007        )?;
1008        Ok(())
1009    }
1010
1011    async fn get_channel_messages(
1012        self: Arc<Self>,
1013        request: TypedEnvelope<proto::GetChannelMessages>,
1014    ) -> tide::Result<()> {
1015        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1016        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1017        if !self
1018            .app_state
1019            .db
1020            .can_user_access_channel(user_id, channel_id)
1021            .await?
1022        {
1023            Err(anyhow!("access denied"))?;
1024        }
1025
1026        let messages = self
1027            .app_state
1028            .db
1029            .get_channel_messages(
1030                channel_id,
1031                MESSAGE_COUNT_PER_PAGE,
1032                Some(MessageId::from_proto(request.payload.before_message_id)),
1033            )
1034            .await?
1035            .into_iter()
1036            .map(|msg| proto::ChannelMessage {
1037                id: msg.id.to_proto(),
1038                body: msg.body,
1039                timestamp: msg.sent_at.unix_timestamp() as u64,
1040                sender_id: msg.sender_id.to_proto(),
1041                nonce: Some(msg.nonce.as_u128().into()),
1042            })
1043            .collect::<Vec<_>>();
1044        self.peer.respond(
1045            request.receipt(),
1046            proto::GetChannelMessagesResponse {
1047                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1048                messages,
1049            },
1050        )?;
1051        Ok(())
1052    }
1053
1054    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1055        self.store.read()
1056    }
1057
1058    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1059        self.store.write()
1060    }
1061}
1062
1063fn broadcast<F>(
1064    sender_id: ConnectionId,
1065    receiver_ids: Vec<ConnectionId>,
1066    mut f: F,
1067) -> anyhow::Result<()>
1068where
1069    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1070{
1071    let mut result = Ok(());
1072    for receiver_id in receiver_ids {
1073        if receiver_id != sender_id {
1074            if let Err(error) = f(receiver_id) {
1075                if result.is_ok() {
1076                    result = Err(error);
1077                }
1078            }
1079        }
1080    }
1081    result
1082}
1083
1084pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1085    let server = Server::new(app.state().clone(), rpc.clone(), None);
1086    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1087        let server = server.clone();
1088        async move {
1089            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1090
1091            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1092            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1093            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1094            let client_protocol_version: Option<u32> = request
1095                .header("X-Zed-Protocol-Version")
1096                .and_then(|v| v.as_str().parse().ok());
1097
1098            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1099                return Ok(Response::new(StatusCode::UpgradeRequired));
1100            }
1101
1102            let header = match request.header("Sec-Websocket-Key") {
1103                Some(h) => h.as_str(),
1104                None => return Err(anyhow!("expected sec-websocket-key"))?,
1105            };
1106
1107            let user_id = process_auth_header(&request).await?;
1108
1109            let mut response = Response::new(StatusCode::SwitchingProtocols);
1110            response.insert_header(UPGRADE, "websocket");
1111            response.insert_header(CONNECTION, "Upgrade");
1112            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1113            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1114            response.insert_header("Sec-Websocket-Version", "13");
1115
1116            let http_res: &mut tide::http::Response = response.as_mut();
1117            let upgrade_receiver = http_res.recv_upgrade().await;
1118            let addr = request.remote().unwrap_or("unknown").to_string();
1119            task::spawn(async move {
1120                if let Some(stream) = upgrade_receiver.await {
1121                    server
1122                        .handle_connection(
1123                            Connection::new(
1124                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1125                            ),
1126                            addr,
1127                            user_id,
1128                            None,
1129                        )
1130                        .await;
1131                }
1132            });
1133
1134            Ok(response)
1135        }
1136    });
1137}
1138
1139fn header_contains_ignore_case<T>(
1140    request: &tide::Request<T>,
1141    header_name: HeaderName,
1142    value: &str,
1143) -> bool {
1144    request
1145        .header(header_name)
1146        .map(|h| {
1147            h.as_str()
1148                .split(',')
1149                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1150        })
1151        .unwrap_or(false)
1152}
1153
1154#[cfg(test)]
1155mod tests {
1156    use super::*;
1157    use crate::{
1158        auth,
1159        db::{tests::TestDb, UserId},
1160        github, AppState, Config,
1161    };
1162    use ::rpc::Peer;
1163    use async_std::task;
1164    use gpui::{executor, ModelHandle, TestAppContext};
1165    use parking_lot::Mutex;
1166    use postage::{mpsc, watch};
1167    use rand::prelude::*;
1168    use rpc::PeerId;
1169    use serde_json::json;
1170    use sqlx::types::time::OffsetDateTime;
1171    use std::{
1172        ops::Deref,
1173        path::Path,
1174        rc::Rc,
1175        sync::{
1176            atomic::{AtomicBool, Ordering::SeqCst},
1177            Arc,
1178        },
1179        time::Duration,
1180    };
1181    use zed::{
1182        client::{
1183            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1184            EstablishConnectionError, UserStore,
1185        },
1186        editor::{ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer},
1187        fs::{FakeFs, Fs as _},
1188        language::{
1189            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1190            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1191        },
1192        lsp,
1193        project::{DiagnosticSummary, Project, ProjectPath},
1194    };
1195
1196    #[cfg(test)]
1197    #[ctor::ctor]
1198    fn init_logger() {
1199        // std::env::set_var("RUST_LOG", "info");
1200        env_logger::init();
1201    }
1202
1203    #[gpui::test]
1204    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1205        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1206        let lang_registry = Arc::new(LanguageRegistry::new());
1207        let fs = Arc::new(FakeFs::new(cx_a.background()));
1208        cx_a.foreground().forbid_parking();
1209
1210        // Connect to a server as 2 clients.
1211        let mut server = TestServer::start(cx_a.foreground()).await;
1212        let client_a = server.create_client(&mut cx_a, "user_a").await;
1213        let client_b = server.create_client(&mut cx_b, "user_b").await;
1214
1215        // Share a project as client A
1216        fs.insert_tree(
1217            "/a",
1218            json!({
1219                ".zed.toml": r#"collaborators = ["user_b"]"#,
1220                "a.txt": "a-contents",
1221                "b.txt": "b-contents",
1222            }),
1223        )
1224        .await;
1225        let project_a = cx_a.update(|cx| {
1226            Project::local(
1227                client_a.clone(),
1228                client_a.user_store.clone(),
1229                lang_registry.clone(),
1230                fs.clone(),
1231                cx,
1232            )
1233        });
1234        let (worktree_a, _) = project_a
1235            .update(&mut cx_a, |p, cx| {
1236                p.find_or_create_local_worktree("/a", false, cx)
1237            })
1238            .await
1239            .unwrap();
1240        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1241        worktree_a
1242            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1243            .await;
1244        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1245        project_a
1246            .update(&mut cx_a, |p, cx| p.share(cx))
1247            .await
1248            .unwrap();
1249
1250        // Join that project as client B
1251        let project_b = Project::remote(
1252            project_id,
1253            client_b.clone(),
1254            client_b.user_store.clone(),
1255            lang_registry.clone(),
1256            fs.clone(),
1257            &mut cx_b.to_async(),
1258        )
1259        .await
1260        .unwrap();
1261
1262        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1263            assert_eq!(
1264                project
1265                    .collaborators()
1266                    .get(&client_a.peer_id)
1267                    .unwrap()
1268                    .user
1269                    .github_login,
1270                "user_a"
1271            );
1272            project.replica_id()
1273        });
1274        project_a
1275            .condition(&cx_a, |tree, _| {
1276                tree.collaborators()
1277                    .get(&client_b.peer_id)
1278                    .map_or(false, |collaborator| {
1279                        collaborator.replica_id == replica_id_b
1280                            && collaborator.user.github_login == "user_b"
1281                    })
1282            })
1283            .await;
1284
1285        // Open the same file as client B and client A.
1286        let buffer_b = project_b
1287            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1288            .await
1289            .unwrap();
1290        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1291        buffer_b.read_with(&cx_b, |buf, cx| {
1292            assert_eq!(buf.read(cx).text(), "b-contents")
1293        });
1294        project_a.read_with(&cx_a, |project, cx| {
1295            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1296        });
1297        let buffer_a = project_a
1298            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1299            .await
1300            .unwrap();
1301
1302        let editor_b = cx_b.add_view(window_b, |cx| {
1303            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1304        });
1305
1306        // TODO
1307        // // Create a selection set as client B and see that selection set as client A.
1308        // buffer_a
1309        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1310        //     .await;
1311
1312        // Edit the buffer as client B and see that edit as client A.
1313        editor_b.update(&mut cx_b, |editor, cx| {
1314            editor.handle_input(&Input("ok, ".into()), cx)
1315        });
1316        buffer_a
1317            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1318            .await;
1319
1320        // TODO
1321        // // Remove the selection set as client B, see those selections disappear as client A.
1322        cx_b.update(move |_| drop(editor_b));
1323        // buffer_a
1324        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1325        //     .await;
1326
1327        // Close the buffer as client A, see that the buffer is closed.
1328        cx_a.update(move |_| drop(buffer_a));
1329        project_a
1330            .condition(&cx_a, |project, cx| {
1331                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1332            })
1333            .await;
1334
1335        // Dropping the client B's project removes client B from client A's collaborators.
1336        cx_b.update(move |_| drop(project_b));
1337        project_a
1338            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1339            .await;
1340    }
1341
1342    #[gpui::test]
1343    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1344        let lang_registry = Arc::new(LanguageRegistry::new());
1345        let fs = Arc::new(FakeFs::new(cx_a.background()));
1346        cx_a.foreground().forbid_parking();
1347
1348        // Connect to a server as 2 clients.
1349        let mut server = TestServer::start(cx_a.foreground()).await;
1350        let client_a = server.create_client(&mut cx_a, "user_a").await;
1351        let client_b = server.create_client(&mut cx_b, "user_b").await;
1352
1353        // Share a project as client A
1354        fs.insert_tree(
1355            "/a",
1356            json!({
1357                ".zed.toml": r#"collaborators = ["user_b"]"#,
1358                "a.txt": "a-contents",
1359                "b.txt": "b-contents",
1360            }),
1361        )
1362        .await;
1363        let project_a = cx_a.update(|cx| {
1364            Project::local(
1365                client_a.clone(),
1366                client_a.user_store.clone(),
1367                lang_registry.clone(),
1368                fs.clone(),
1369                cx,
1370            )
1371        });
1372        let (worktree_a, _) = project_a
1373            .update(&mut cx_a, |p, cx| {
1374                p.find_or_create_local_worktree("/a", false, cx)
1375            })
1376            .await
1377            .unwrap();
1378        worktree_a
1379            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1380            .await;
1381        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1382        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1383        project_a
1384            .update(&mut cx_a, |p, cx| p.share(cx))
1385            .await
1386            .unwrap();
1387        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1388
1389        // Join that project as client B
1390        let project_b = Project::remote(
1391            project_id,
1392            client_b.clone(),
1393            client_b.user_store.clone(),
1394            lang_registry.clone(),
1395            fs.clone(),
1396            &mut cx_b.to_async(),
1397        )
1398        .await
1399        .unwrap();
1400        project_b
1401            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1402            .await
1403            .unwrap();
1404
1405        // Unshare the project as client A
1406        project_a
1407            .update(&mut cx_a, |project, cx| project.unshare(cx))
1408            .await
1409            .unwrap();
1410        project_b
1411            .condition(&mut cx_b, |project, _| project.is_read_only())
1412            .await;
1413        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1414        drop(project_b);
1415
1416        // Share the project again and ensure guests can still join.
1417        project_a
1418            .update(&mut cx_a, |project, cx| project.share(cx))
1419            .await
1420            .unwrap();
1421        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1422
1423        let project_c = Project::remote(
1424            project_id,
1425            client_b.clone(),
1426            client_b.user_store.clone(),
1427            lang_registry.clone(),
1428            fs.clone(),
1429            &mut cx_b.to_async(),
1430        )
1431        .await
1432        .unwrap();
1433        project_c
1434            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1435            .await
1436            .unwrap();
1437    }
1438
1439    #[gpui::test]
1440    async fn test_propagate_saves_and_fs_changes(
1441        mut cx_a: TestAppContext,
1442        mut cx_b: TestAppContext,
1443        mut cx_c: TestAppContext,
1444    ) {
1445        let lang_registry = Arc::new(LanguageRegistry::new());
1446        let fs = Arc::new(FakeFs::new(cx_a.background()));
1447        cx_a.foreground().forbid_parking();
1448
1449        // Connect to a server as 3 clients.
1450        let mut server = TestServer::start(cx_a.foreground()).await;
1451        let client_a = server.create_client(&mut cx_a, "user_a").await;
1452        let client_b = server.create_client(&mut cx_b, "user_b").await;
1453        let client_c = server.create_client(&mut cx_c, "user_c").await;
1454
1455        // Share a worktree as client A.
1456        fs.insert_tree(
1457            "/a",
1458            json!({
1459                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1460                "file1": "",
1461                "file2": ""
1462            }),
1463        )
1464        .await;
1465        let project_a = cx_a.update(|cx| {
1466            Project::local(
1467                client_a.clone(),
1468                client_a.user_store.clone(),
1469                lang_registry.clone(),
1470                fs.clone(),
1471                cx,
1472            )
1473        });
1474        let (worktree_a, _) = project_a
1475            .update(&mut cx_a, |p, cx| {
1476                p.find_or_create_local_worktree("/a", false, cx)
1477            })
1478            .await
1479            .unwrap();
1480        worktree_a
1481            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1482            .await;
1483        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1484        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1485        project_a
1486            .update(&mut cx_a, |p, cx| p.share(cx))
1487            .await
1488            .unwrap();
1489
1490        // Join that worktree as clients B and C.
1491        let project_b = Project::remote(
1492            project_id,
1493            client_b.clone(),
1494            client_b.user_store.clone(),
1495            lang_registry.clone(),
1496            fs.clone(),
1497            &mut cx_b.to_async(),
1498        )
1499        .await
1500        .unwrap();
1501        let project_c = Project::remote(
1502            project_id,
1503            client_c.clone(),
1504            client_c.user_store.clone(),
1505            lang_registry.clone(),
1506            fs.clone(),
1507            &mut cx_c.to_async(),
1508        )
1509        .await
1510        .unwrap();
1511        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1512        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1513
1514        // Open and edit a buffer as both guests B and C.
1515        let buffer_b = project_b
1516            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1517            .await
1518            .unwrap();
1519        let buffer_c = project_c
1520            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1521            .await
1522            .unwrap();
1523        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1524        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1525
1526        // Open and edit that buffer as the host.
1527        let buffer_a = project_a
1528            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1529            .await
1530            .unwrap();
1531
1532        buffer_a
1533            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1534            .await;
1535        buffer_a.update(&mut cx_a, |buf, cx| {
1536            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1537        });
1538
1539        // Wait for edits to propagate
1540        buffer_a
1541            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1542            .await;
1543        buffer_b
1544            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1545            .await;
1546        buffer_c
1547            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1548            .await;
1549
1550        // Edit the buffer as the host and concurrently save as guest B.
1551        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1552        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1553        save_b.await.unwrap();
1554        assert_eq!(
1555            fs.load("/a/file1".as_ref()).await.unwrap(),
1556            "hi-a, i-am-c, i-am-b, i-am-a"
1557        );
1558        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1559        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1560        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1561
1562        // Make changes on host's file system, see those changes on guest worktrees.
1563        fs.rename(
1564            "/a/file1".as_ref(),
1565            "/a/file1-renamed".as_ref(),
1566            Default::default(),
1567        )
1568        .await
1569        .unwrap();
1570        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1571            .await
1572            .unwrap();
1573        fs.insert_file(Path::new("/a/file4"), "4".into())
1574            .await
1575            .unwrap();
1576
1577        worktree_a
1578            .condition(&cx_a, |tree, _| tree.file_count() == 4)
1579            .await;
1580        worktree_b
1581            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1582            .await;
1583        worktree_c
1584            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1585            .await;
1586        worktree_a.read_with(&cx_a, |tree, _| {
1587            assert_eq!(
1588                tree.paths()
1589                    .map(|p| p.to_string_lossy())
1590                    .collect::<Vec<_>>(),
1591                &[".zed.toml", "file1-renamed", "file3", "file4"]
1592            )
1593        });
1594        worktree_b.read_with(&cx_b, |tree, _| {
1595            assert_eq!(
1596                tree.paths()
1597                    .map(|p| p.to_string_lossy())
1598                    .collect::<Vec<_>>(),
1599                &[".zed.toml", "file1-renamed", "file3", "file4"]
1600            )
1601        });
1602        worktree_c.read_with(&cx_c, |tree, _| {
1603            assert_eq!(
1604                tree.paths()
1605                    .map(|p| p.to_string_lossy())
1606                    .collect::<Vec<_>>(),
1607                &[".zed.toml", "file1-renamed", "file3", "file4"]
1608            )
1609        });
1610
1611        // Ensure buffer files are updated as well.
1612        buffer_a
1613            .condition(&cx_a, |buf, _| {
1614                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1615            })
1616            .await;
1617        buffer_b
1618            .condition(&cx_b, |buf, _| {
1619                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1620            })
1621            .await;
1622        buffer_c
1623            .condition(&cx_c, |buf, _| {
1624                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1625            })
1626            .await;
1627    }
1628
1629    #[gpui::test]
        // Checks the dirty/conflict flags of a guest's buffer around a save: the buffer is dirty
        // after an edit, clean once the save shows up as a new file mtime, dirty again after a
        // further edit, and is never reported as conflicting along the way.
1630    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1631        cx_a.foreground().forbid_parking();
1632        let lang_registry = Arc::new(LanguageRegistry::new());
1633        let fs = Arc::new(FakeFs::new(cx_a.background()));
1634
1635        // Connect to a server as 2 clients.
1636        let mut server = TestServer::start(cx_a.foreground()).await;
1637        let client_a = server.create_client(&mut cx_a, "user_a").await;
1638        let client_b = server.create_client(&mut cx_b, "user_b").await;
1639
1640        // Share a project as client A
            // NOTE(review): ".zed.toml" also lists "user_c", but this test only creates clients
            // for user_a and user_b.
1641        fs.insert_tree(
1642            "/dir",
1643            json!({
1644                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1645                "a.txt": "a-contents",
1646            }),
1647        )
1648        .await;
1649
1650        let project_a = cx_a.update(|cx| {
1651            Project::local(
1652                client_a.clone(),
1653                client_a.user_store.clone(),
1654                lang_registry.clone(),
1655                fs.clone(),
1656                cx,
1657            )
1658        });
1659        let (worktree_a, _) = project_a
1660            .update(&mut cx_a, |p, cx| {
1661                p.find_or_create_local_worktree("/dir", false, cx)
1662            })
1663            .await
1664            .unwrap();
            // Wait for the host's initial scan of "/dir" before sharing the project.
1665        worktree_a
1666            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1667            .await;
1668        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1669        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1670        project_a
1671            .update(&mut cx_a, |p, cx| p.share(cx))
1672            .await
1673            .unwrap();
1674
1675        // Join that project as client B
1676        let project_b = Project::remote(
1677            project_id,
1678            client_b.clone(),
1679            client_b.user_store.clone(),
1680            lang_registry.clone(),
1681            fs.clone(),
1682            &mut cx_b.to_async(),
1683        )
1684        .await
1685        .unwrap();
1686        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1687
1688        // Open a buffer as client B
1689        let buffer_b = project_b
1690            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1691            .await
1692            .unwrap();
            // Remember the mtime the guest currently sees so the effect of the save below can be
            // detected.
1693        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1694
            // An unsaved edit makes the buffer dirty, but not conflicting.
1695        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1696        buffer_b.read_with(&cx_b, |buf, _| {
1697            assert!(buf.is_dirty());
1698            assert!(!buf.has_conflict());
1699        });
1700
            // Save from the guest, then wait until the guest's view of the file reports an mtime
            // different from the one recorded before the save.
1701        buffer_b
1702            .update(&mut cx_b, |buf, cx| buf.save(cx))
1703            .await
1704            .unwrap();
1705        worktree_b
1706            .condition(&cx_b, |_, cx| {
1707                buffer_b.read(cx).file().unwrap().mtime() != mtime
1708            })
1709            .await;
1710        buffer_b.read_with(&cx_b, |buf, _| {
1711            assert!(!buf.is_dirty());
1712            assert!(!buf.has_conflict());
1713        });
1714
            // Editing again after the save must make the buffer dirty without flagging a conflict.
1715        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1716        buffer_b.read_with(&cx_b, |buf, _| {
1717            assert!(buf.is_dirty());
1718            assert!(!buf.has_conflict());
1719        });
1720    }
1721
1722    #[gpui::test]
        // Checks that when a file is rewritten through the fake filesystem, a guest's open,
        // unmodified buffer for that file picks up the new text without becoming dirty or
        // conflicted.
1723    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1724        cx_a.foreground().forbid_parking();
1725        let lang_registry = Arc::new(LanguageRegistry::new());
1726        let fs = Arc::new(FakeFs::new(cx_a.background()));
1727
1728        // Connect to a server as 2 clients.
1729        let mut server = TestServer::start(cx_a.foreground()).await;
1730        let client_a = server.create_client(&mut cx_a, "user_a").await;
1731        let client_b = server.create_client(&mut cx_b, "user_b").await;
1732
1733        // Share a project as client A
            // NOTE(review): ".zed.toml" also lists "user_c", but this test only creates clients
            // for user_a and user_b.
1734        fs.insert_tree(
1735            "/dir",
1736            json!({
1737                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1738                "a.txt": "a-contents",
1739            }),
1740        )
1741        .await;
1742
1743        let project_a = cx_a.update(|cx| {
1744            Project::local(
1745                client_a.clone(),
1746                client_a.user_store.clone(),
1747                lang_registry.clone(),
1748                fs.clone(),
1749                cx,
1750            )
1751        });
1752        let (worktree_a, _) = project_a
1753            .update(&mut cx_a, |p, cx| {
1754                p.find_or_create_local_worktree("/dir", false, cx)
1755            })
1756            .await
1757            .unwrap();
            // Wait for the host's initial scan of "/dir" before sharing the project.
1758        worktree_a
1759            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1760            .await;
1761        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1762        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1763        project_a
1764            .update(&mut cx_a, |p, cx| p.share(cx))
1765            .await
1766            .unwrap();
1767
1768        // Join that project as client B
1769        let project_b = Project::remote(
1770            project_id,
1771            client_b.clone(),
1772            client_b.user_store.clone(),
1773            lang_registry.clone(),
1774            fs.clone(),
1775            &mut cx_b.to_async(),
1776        )
1777        .await
1778        .unwrap();
            // The guest's worktree handle is bound but not used again in this test; the named
            // binding keeps it alive until the end of the function.
1779        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1780
1781        // Open a buffer as client B
1782        let buffer_b = project_b
1783            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1784            .await
1785            .unwrap();
1786        buffer_b.read_with(&cx_b, |buf, _| {
1787            assert!(!buf.is_dirty());
1788            assert!(!buf.has_conflict());
1789        });
1790
            // Rewrite the file directly through the fake filesystem rather than through either
            // client's buffer.
1791        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1792            .await
1793            .unwrap();
            // The guest's buffer should end up showing the new contents while staying clean.
1794        buffer_b
1795            .condition(&cx_b, |buf, _| {
1796                buf.text() == "new contents" && !buf.is_dirty()
1797            })
1798            .await;
1799        buffer_b.read_with(&cx_b, |buf, _| {
1800            assert!(!buf.has_conflict());
1801        });
1802    }
1803
1804    #[gpui::test(iterations = 100)]
        // Checks that an edit made by the host while the guest is still in the middle of opening
        // the same buffer ends up in the guest's buffer once the open completes.
        // NOTE(review): `iterations = 100` presumably reruns the test under varied scheduling to
        // exercise different interleavings of the open and the edit — confirm against gpui::test.
1805    async fn test_editing_while_guest_opens_buffer(
1806        mut cx_a: TestAppContext,
1807        mut cx_b: TestAppContext,
1808    ) {
1809        cx_a.foreground().forbid_parking();
1810        let lang_registry = Arc::new(LanguageRegistry::new());
1811        let fs = Arc::new(FakeFs::new(cx_a.background()));
1812
1813        // Connect to a server as 2 clients.
1814        let mut server = TestServer::start(cx_a.foreground()).await;
1815        let client_a = server.create_client(&mut cx_a, "user_a").await;
1816        let client_b = server.create_client(&mut cx_b, "user_b").await;
1817
1818        // Share a project as client A
1819        fs.insert_tree(
1820            "/dir",
1821            json!({
1822                ".zed.toml": r#"collaborators = ["user_b"]"#,
1823                "a.txt": "a-contents",
1824            }),
1825        )
1826        .await;
1827        let project_a = cx_a.update(|cx| {
1828            Project::local(
1829                client_a.clone(),
1830                client_a.user_store.clone(),
1831                lang_registry.clone(),
1832                fs.clone(),
1833                cx,
1834            )
1835        });
1836        let (worktree_a, _) = project_a
1837            .update(&mut cx_a, |p, cx| {
1838                p.find_or_create_local_worktree("/dir", false, cx)
1839            })
1840            .await
1841            .unwrap();
            // Wait for the host's initial scan of "/dir" before sharing the project.
1842        worktree_a
1843            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1844            .await;
1845        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1846        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1847        project_a
1848            .update(&mut cx_a, |p, cx| p.share(cx))
1849            .await
1850            .unwrap();
1851
1852        // Join that project as client B
1853        let project_b = Project::remote(
1854            project_id,
1855            client_b.clone(),
1856            client_b.user_store.clone(),
1857            lang_registry.clone(),
1858            fs.clone(),
1859            &mut cx_b.to_async(),
1860        )
1861        .await
1862        .unwrap();
1863
1864        // Open a buffer as client A
1865        let buffer_a = project_a
1866            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1867            .await
1868            .unwrap();
1869
1870        // Start opening the same buffer as client B
            // The open is spawned on the background executor and deliberately not awaited yet;
            // the single yield gives other tasks a chance to run before the host edits.
1871        let buffer_b = cx_b
1872            .background()
1873            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1874        task::yield_now().await;
1875
1876        // Edit the buffer as client A while client B is still opening it.
1877        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1878
            // Capture the host's text after the edit, finish the guest's open, and wait for the
            // guest's buffer to reach the same text.
1879        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1880        let buffer_b = buffer_b.await.unwrap();
1881        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1882    }
1883
1884    #[gpui::test]
        // Checks that the host sees its collaborator count return to zero when the guest drops
        // its project while an `open_buffer` request it started has not been awaited.
1885    async fn test_leaving_worktree_while_opening_buffer(
1886        mut cx_a: TestAppContext,
1887        mut cx_b: TestAppContext,
1888    ) {
1889        cx_a.foreground().forbid_parking();
1890        let lang_registry = Arc::new(LanguageRegistry::new());
1891        let fs = Arc::new(FakeFs::new(cx_a.background()));
1892
1893        // Connect to a server as 2 clients.
1894        let mut server = TestServer::start(cx_a.foreground()).await;
1895        let client_a = server.create_client(&mut cx_a, "user_a").await;
1896        let client_b = server.create_client(&mut cx_b, "user_b").await;
1897
1898        // Share a project as client A
1899        fs.insert_tree(
1900            "/dir",
1901            json!({
1902                ".zed.toml": r#"collaborators = ["user_b"]"#,
1903                "a.txt": "a-contents",
1904            }),
1905        )
1906        .await;
1907        let project_a = cx_a.update(|cx| {
1908            Project::local(
1909                client_a.clone(),
1910                client_a.user_store.clone(),
1911                lang_registry.clone(),
1912                fs.clone(),
1913                cx,
1914            )
1915        });
1916        let (worktree_a, _) = project_a
1917            .update(&mut cx_a, |p, cx| {
1918                p.find_or_create_local_worktree("/dir", false, cx)
1919            })
1920            .await
1921            .unwrap();
            // Wait for the host's initial scan of "/dir" before sharing the project.
1922        worktree_a
1923            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1924            .await;
1925        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1926        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1927        project_a
1928            .update(&mut cx_a, |p, cx| p.share(cx))
1929            .await
1930            .unwrap();
1931
1932        // Join that project as client B
1933        let project_b = Project::remote(
1934            project_id,
1935            client_b.clone(),
1936            client_b.user_store.clone(),
1937            lang_registry.clone(),
1938            fs.clone(),
1939            &mut cx_b.to_async(),
1940        )
1941        .await
1942        .unwrap();
1943
1944        // See that a guest has joined as client A.
1945        project_a
1946            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1947            .await;
1948
1949        // Begin opening a buffer as client B, but leave the project before the open completes.
            // The project handle is dropped inside an update on the guest's context, and the
            // spawned open task is dropped without ever being awaited.
1950        let buffer_b = cx_b
1951            .background()
1952            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1953        cx_b.update(|_| drop(project_b));
1954        drop(buffer_b);
1955
1956        // See that the guest has left.
1957        project_a
1958            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1959            .await;
1960    }
1961
1962    #[gpui::test]
        // Checks that the host's collaborator list goes from one entry back to empty when the
        // guest's client disconnects from the server.
1963    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1964        cx_a.foreground().forbid_parking();
1965        let lang_registry = Arc::new(LanguageRegistry::new());
1966        let fs = Arc::new(FakeFs::new(cx_a.background()));
1967
1968        // Connect to a server as 2 clients.
1969        let mut server = TestServer::start(cx_a.foreground()).await;
1970        let client_a = server.create_client(&mut cx_a, "user_a").await;
1971        let client_b = server.create_client(&mut cx_b, "user_b").await;
1972
1973        // Share a project as client A
1974        fs.insert_tree(
1975            "/a",
1976            json!({
1977                ".zed.toml": r#"collaborators = ["user_b"]"#,
1978                "a.txt": "a-contents",
1979                "b.txt": "b-contents",
1980            }),
1981        )
1982        .await;
1983        let project_a = cx_a.update(|cx| {
1984            Project::local(
1985                client_a.clone(),
1986                client_a.user_store.clone(),
1987                lang_registry.clone(),
1988                fs.clone(),
1989                cx,
1990            )
1991        });
1992        let (worktree_a, _) = project_a
1993            .update(&mut cx_a, |p, cx| {
1994                p.find_or_create_local_worktree("/a", false, cx)
1995            })
1996            .await
1997            .unwrap();
            // Wait for the host's initial scan of "/a" before sharing the project.
1998        worktree_a
1999            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2000            .await;
2001        let project_id = project_a
2002            .update(&mut cx_a, |project, _| project.next_remote_id())
2003            .await;
2004        project_a
2005            .update(&mut cx_a, |project, cx| project.share(cx))
2006            .await
2007            .unwrap();
2008
2009        // Join that project as client B
            // The named `_project_b` binding keeps the guest's project handle alive until the end
            // of the test; presumably so that the departure observed below is caused by the
            // disconnect rather than by the project being dropped.
2010        let _project_b = Project::remote(
2011            project_id,
2012            client_b.clone(),
2013            client_b.user_store.clone(),
2014            lang_registry.clone(),
2015            fs.clone(),
2016            &mut cx_b.to_async(),
2017        )
2018        .await
2019        .unwrap();
2020
2021        // See that a guest has joined as client A.
2022        project_a
2023            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2024            .await;
2025
2026        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2027        client_b.disconnect(&cx_b.to_async()).unwrap();
2028        project_a
2029            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2030            .await;
2031    }
2032
2033    #[gpui::test]
        // Checks that diagnostics published by the host's (fake) language server reach a guest:
        // first as a summary visible right after joining, then as an updated summary when more
        // diagnostics are published, and finally as diagnostic entries in the buffer the guest
        // opens for the affected file.
2034    async fn test_collaborating_with_diagnostics(
2035        mut cx_a: TestAppContext,
2036        mut cx_b: TestAppContext,
2037    ) {
2038        cx_a.foreground().forbid_parking();
2039        let mut lang_registry = Arc::new(LanguageRegistry::new());
2040        let fs = Arc::new(FakeFs::new(cx_a.background()));
2041
2042        // Set up a fake language server.
            // `Arc::get_mut` only yields a mutable reference while this is the sole handle to the
            // registry, so the language is registered before the registry is cloned below.
2043        let (language_server_config, mut fake_language_server) =
2044            LanguageServerConfig::fake(cx_a.background()).await;
2045        Arc::get_mut(&mut lang_registry)
2046            .unwrap()
2047            .add(Arc::new(Language::new(
2048                LanguageConfig {
2049                    name: "Rust".to_string(),
2050                    path_suffixes: vec!["rs".to_string()],
2051                    language_server: Some(language_server_config),
2052                    ..Default::default()
2053                },
2054                Some(tree_sitter_rust::language()),
2055            )));
2056
2057        // Connect to a server as 2 clients.
2058        let mut server = TestServer::start(cx_a.foreground()).await;
2059        let client_a = server.create_client(&mut cx_a, "user_a").await;
2060        let client_b = server.create_client(&mut cx_b, "user_b").await;
2061
2062        // Share a project as client A
2063        fs.insert_tree(
2064            "/a",
2065            json!({
2066                ".zed.toml": r#"collaborators = ["user_b"]"#,
2067                "a.rs": "let one = two",
2068                "other.rs": "",
2069            }),
2070        )
2071        .await;
2072        let project_a = cx_a.update(|cx| {
2073            Project::local(
2074                client_a.clone(),
2075                client_a.user_store.clone(),
2076                lang_registry.clone(),
2077                fs.clone(),
2078                cx,
2079            )
2080        });
2081        let (worktree_a, _) = project_a
2082            .update(&mut cx_a, |p, cx| {
2083                p.find_or_create_local_worktree("/a", false, cx)
2084            })
2085            .await
2086            .unwrap();
            // Wait for the host's initial scan of "/a" before sharing the project.
2087        worktree_a
2088            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2089            .await;
2090        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2091        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2092        project_a
2093            .update(&mut cx_a, |p, cx| p.share(cx))
2094            .await
2095            .unwrap();
2096
2097        // Cause the language server to start.
            // A different file ("other.rs") than the one receiving diagnostics is opened; the
            // resulting buffer is discarded after unwrapping.
2098        let _ = cx_a
2099            .background()
2100            .spawn(project_a.update(&mut cx_a, |project, cx| {
2101                project.open_buffer(
2102                    ProjectPath {
2103                        worktree_id,
2104                        path: Path::new("other.rs").into(),
2105                    },
2106                    cx,
2107                )
2108            }))
2109            .await
2110            .unwrap();
2111
2112        // Simulate a language server reporting errors for a file.
            // The range (0, 4)..(0, 7) covers `one` in "let one = two".
2113        fake_language_server
2114            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2115                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2116                version: None,
2117                diagnostics: vec![lsp::Diagnostic {
2118                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2119                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2120                    message: "message 1".to_string(),
2121                    ..Default::default()
2122                }],
2123            })
2124            .await;
2125
2126        // Wait for server to see the diagnostics update.
            // This happens before the guest joins, so the guest's first view of the project
            // can already include the summary.
2127        server
2128            .condition(|store| {
2129                let worktree = store
2130                    .project(project_id)
2131                    .unwrap()
2132                    .worktrees
2133                    .get(&worktree_id.to_proto())
2134                    .unwrap();
2135
2136                !worktree
2137                    .share
2138                    .as_ref()
2139                    .unwrap()
2140                    .diagnostic_summaries
2141                    .is_empty()
2142            })
2143            .await;
2144
2145        // Join the worktree as client B.
2146        let project_b = Project::remote(
2147            project_id,
2148            client_b.clone(),
2149            client_b.user_store.clone(),
2150            lang_registry.clone(),
2151            fs.clone(),
2152            &mut cx_b.to_async(),
2153        )
2154        .await
2155        .unwrap();
2156
            // The summary is asserted immediately after joining, without waiting on a condition.
2157        project_b.read_with(&cx_b, |project, cx| {
2158            assert_eq!(
2159                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2160                &[(
2161                    ProjectPath {
2162                        worktree_id,
2163                        path: Arc::from(Path::new("a.rs")),
2164                    },
2165                    DiagnosticSummary {
2166                        error_count: 1,
2167                        warning_count: 0,
2168                        ..Default::default()
2169                    },
2170                )]
2171            )
2172        });
2173
2174        // Simulate a language server reporting more errors for a file.
            // The same error is republished together with a new warning whose range
            // (0, 10)..(0, 13) covers `two` in "let one = two".
2175        fake_language_server
2176            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2177                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2178                version: None,
2179                diagnostics: vec![
2180                    lsp::Diagnostic {
2181                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2182                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2183                        message: "message 1".to_string(),
2184                        ..Default::default()
2185                    },
2186                    lsp::Diagnostic {
2187                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2188                        range: lsp::Range::new(
2189                            lsp::Position::new(0, 10),
2190                            lsp::Position::new(0, 13),
2191                        ),
2192                        message: "message 2".to_string(),
2193                        ..Default::default()
2194                    },
2195                ],
2196            })
2197            .await;
2198
2199        // Client b gets the updated summaries
2200        project_b
2201            .condition(&cx_b, |project, cx| {
2202                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2203                    == &[(
2204                        ProjectPath {
2205                            worktree_id,
2206                            path: Arc::from(Path::new("a.rs")),
2207                        },
2208                        DiagnosticSummary {
2209                            error_count: 1,
2210                            warning_count: 1,
2211                            ..Default::default()
2212                        },
2213                    )]
2214            })
2215            .await;
2216
2217        // Open the file with the errors on client B. They should be present.
2218        let buffer_b = cx_b
2219            .background()
2220            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2221            .await
2222            .unwrap();
2223
            // NOTE(review): `.map(|entry| entry)` below is an identity map and looks like a
            // no-op; it could probably be removed.
2224        buffer_b.read_with(&cx_b, |buffer, _| {
2225            assert_eq!(
2226                buffer
2227                    .snapshot()
2228                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2229                    .map(|entry| entry)
2230                    .collect::<Vec<_>>(),
2231                &[
2232                    DiagnosticEntry {
2233                        range: Point::new(0, 4)..Point::new(0, 7),
2234                        diagnostic: Diagnostic {
2235                            group_id: 0,
2236                            message: "message 1".to_string(),
2237                            severity: lsp::DiagnosticSeverity::ERROR,
2238                            is_primary: true,
2239                            ..Default::default()
2240                        }
2241                    },
2242                    DiagnosticEntry {
2243                        range: Point::new(0, 10)..Point::new(0, 13),
2244                        diagnostic: Diagnostic {
2245                            group_id: 1,
2246                            severity: lsp::DiagnosticSeverity::WARNING,
2247                            message: "message 2".to_string(),
2248                            is_primary: true,
2249                            ..Default::default()
2250                        }
2251                    }
2252                ]
2253            );
2254        });
2255    }
2256
2257    #[gpui::test]
        // Checks the completion flow for a guest: typing a trigger character in the guest's
        // editor results in a completion request on the host's (fake) language server,
        // confirming a completion updates the text on both sides, and the additional text edit
        // returned when the completion is resolved is applied to both buffers as well.
2258    async fn test_collaborating_with_completion(
2259        mut cx_a: TestAppContext,
2260        mut cx_b: TestAppContext,
2261    ) {
2262        cx_a.foreground().forbid_parking();
2263        let mut lang_registry = Arc::new(LanguageRegistry::new());
2264        let fs = Arc::new(FakeFs::new(cx_a.background()));
2265
2266        // Set up a fake language server.
            // The server advertises "." as a completion trigger character. `Arc::get_mut` only
            // yields a mutable reference while this is the sole handle to the registry, so the
            // language is registered before the registry is cloned below.
2267        let (language_server_config, mut fake_language_server) =
2268            LanguageServerConfig::fake_with_capabilities(
2269                lsp::ServerCapabilities {
2270                    completion_provider: Some(lsp::CompletionOptions {
2271                        trigger_characters: Some(vec![".".to_string()]),
2272                        ..Default::default()
2273                    }),
2274                    ..Default::default()
2275                },
2276                cx_a.background(),
2277            )
2278            .await;
2279        Arc::get_mut(&mut lang_registry)
2280            .unwrap()
2281            .add(Arc::new(Language::new(
2282                LanguageConfig {
2283                    name: "Rust".to_string(),
2284                    path_suffixes: vec!["rs".to_string()],
2285                    language_server: Some(language_server_config),
2286                    ..Default::default()
2287                },
2288                Some(tree_sitter_rust::language()),
2289            )));
2290
2291        // Connect to a server as 2 clients.
2292        let mut server = TestServer::start(cx_a.foreground()).await;
2293        let client_a = server.create_client(&mut cx_a, "user_a").await;
2294        let client_b = server.create_client(&mut cx_b, "user_b").await;
2295
2296        // Share a project as client A
2297        fs.insert_tree(
2298            "/a",
2299            json!({
2300                ".zed.toml": r#"collaborators = ["user_b"]"#,
2301                "main.rs": "fn main() { a }",
2302                "other.rs": "",
2303            }),
2304        )
2305        .await;
2306        let project_a = cx_a.update(|cx| {
2307            Project::local(
2308                client_a.clone(),
2309                client_a.user_store.clone(),
2310                lang_registry.clone(),
2311                fs.clone(),
2312                cx,
2313            )
2314        });
2315        let (worktree_a, _) = project_a
2316            .update(&mut cx_a, |p, cx| {
2317                p.find_or_create_local_worktree("/a", false, cx)
2318            })
2319            .await
2320            .unwrap();
            // Wait for the host's initial scan of "/a" before sharing the project.
2321        worktree_a
2322            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2323            .await;
2324        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2325        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2326        project_a
2327            .update(&mut cx_a, |p, cx| p.share(cx))
2328            .await
2329            .unwrap();
2330
2331        // Join the worktree as client B.
2332        let project_b = Project::remote(
2333            project_id,
2334            client_b.clone(),
2335            client_b.user_store.clone(),
2336            lang_registry.clone(),
2337            fs.clone(),
2338            &mut cx_b.to_async(),
2339        )
2340        .await
2341        .unwrap();
2342
2343        // Open a file in an editor as the guest.
2344        let buffer_b = project_b
2345            .update(&mut cx_b, |p, cx| {
2346                p.open_buffer((worktree_id, "main.rs"), cx)
2347            })
2348            .await
2349            .unwrap();
2350        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2351        let editor_b = cx_b.add_view(window_b, |cx| {
2352            Editor::for_buffer(
2353                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2354                Arc::new(|cx| EditorSettings::test(cx)),
2355                cx,
2356            )
2357        });
2358
2359        // Type a completion trigger character as the guest.
            // Offset 13 is just after the `a` in "fn main() { a }", so the "." is inserted there
            // and the text becomes "fn main() { a. }"; this matches the position (0, 14)
            // asserted on the completion request below.
2360        editor_b.update(&mut cx_b, |editor, cx| {
2361            editor.select_ranges([13..13], None, cx);
2362            editor.handle_input(&Input(".".into()), cx);
2363            cx.focus(&editor_b);
2364        });
2365
2366        // Receive a completion request as the host's language server.
2367        let (request_id, params) = fake_language_server
2368            .receive_request::<lsp::request::Completion>()
2369            .await;
2370        assert_eq!(
2371            params.text_document_position.text_document.uri,
2372            lsp::Url::from_file_path("/a/main.rs").unwrap(),
2373        );
2374        assert_eq!(
2375            params.text_document_position.position,
2376            lsp::Position::new(0, 14),
2377        );
2378
2379        // Return some completions from the host's language server.
            // Both items are snippet-formatted edits that insert at (0, 14), right after the ".".
2380        fake_language_server
2381            .respond(
2382                request_id,
2383                Some(lsp::CompletionResponse::Array(vec![
2384                    lsp::CompletionItem {
2385                        label: "first_method(…)".into(),
2386                        detail: Some("fn(&mut self, B) -> C".into()),
2387                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2388                            new_text: "first_method($1)".to_string(),
2389                            range: lsp::Range::new(
2390                                lsp::Position::new(0, 14),
2391                                lsp::Position::new(0, 14),
2392                            ),
2393                        })),
2394                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2395                        ..Default::default()
2396                    },
2397                    lsp::CompletionItem {
2398                        label: "second_method(…)".into(),
2399                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2400                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2401                            new_text: "second_method()".to_string(),
2402                            range: lsp::Range::new(
2403                                lsp::Position::new(0, 14),
2404                                lsp::Position::new(0, 14),
2405                            ),
2406                        })),
2407                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2408                        ..Default::default()
2409                    },
2410                ])),
2411            )
2412            .await;
2413
2414        // Open the buffer on the host.
            // The host's copy must already contain the "." typed by the guest.
2415        let buffer_a = project_a
2416            .update(&mut cx_a, |p, cx| {
2417                p.open_buffer((worktree_id, "main.rs"), cx)
2418            })
2419            .await
2420            .unwrap();
2421        buffer_a
2422            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2423            .await;
2424
2425        // Confirm a completion on the guest.
            // The test waits for one notification from the guest's editor and then expects the
            // context menu to be showing. Confirming item 0 inserts the `first_method($1)`
            // snippet, which shows up in the text as "first_method()".
2426        editor_b.next_notification(&cx_b).await;
2427        editor_b.update(&mut cx_b, |editor, cx| {
2428            assert!(editor.showing_context_menu());
2429            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2430            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2431        });
2432
            // The confirmed completion must reach the host's buffer too.
2433        buffer_a
2434            .condition(&cx_a, |buffer, _| {
2435                buffer.text() == "fn main() { a.first_method() }"
2436            })
2437            .await;
2438
2439        // Receive a request to resolve the selected completion on the host's language server.
2440        let (request_id, params) = fake_language_server
2441            .receive_request::<lsp::request::ResolveCompletionItem>()
2442            .await;
2443        assert_eq!(params.label, "first_method(…)");
2444
2445        // Return a resolved completion from the host's language server.
2446        // The resolved completion has an additional text edit.
2447        fake_language_server
2448            .respond(
2449                request_id,
2450                lsp::CompletionItem {
2451                    label: "first_method(…)".into(),
2452                    detail: Some("fn(&mut self, B) -> C".into()),
2453                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2454                        new_text: "first_method($1)".to_string(),
2455                        range: lsp::Range::new(
2456                            lsp::Position::new(0, 14),
2457                            lsp::Position::new(0, 14),
2458                        ),
2459                    })),
2460                    additional_text_edits: Some(vec![lsp::TextEdit {
2461                        new_text: "use d::SomeTrait;\n".to_string(),
2462                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2463                    }]),
2464                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2465                    ..Default::default()
2466                },
2467            )
2468            .await;
2469
2470        // The additional edit is applied.
            // Once the guest shows the inserted `use` line, host and guest text must be equal.
2471        buffer_b
2472            .condition(&cx_b, |buffer, _| {
2473                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2474            })
2475            .await;
2476        assert_eq!(
2477            buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2478            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2479        );
2480    }
2481
2482    #[gpui::test]
2483    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2484        cx_a.foreground().forbid_parking();
2485        let mut lang_registry = Arc::new(LanguageRegistry::new());
2486        let fs = Arc::new(FakeFs::new(cx_a.background()));
2487
2488        // Set up a fake language server.
2489        let (language_server_config, mut fake_language_server) =
2490            LanguageServerConfig::fake(cx_a.background()).await;
2491        Arc::get_mut(&mut lang_registry)
2492            .unwrap()
2493            .add(Arc::new(Language::new(
2494                LanguageConfig {
2495                    name: "Rust".to_string(),
2496                    path_suffixes: vec!["rs".to_string()],
2497                    language_server: Some(language_server_config),
2498                    ..Default::default()
2499                },
2500                Some(tree_sitter_rust::language()),
2501            )));
2502
2503        // Connect to a server as 2 clients.
2504        let mut server = TestServer::start(cx_a.foreground()).await;
2505        let client_a = server.create_client(&mut cx_a, "user_a").await;
2506        let client_b = server.create_client(&mut cx_b, "user_b").await;
2507
2508        // Share a project as client A
2509        fs.insert_tree(
2510            "/a",
2511            json!({
2512                ".zed.toml": r#"collaborators = ["user_b"]"#,
2513                "a.rs": "let one = two",
2514            }),
2515        )
2516        .await;
2517        let project_a = cx_a.update(|cx| {
2518            Project::local(
2519                client_a.clone(),
2520                client_a.user_store.clone(),
2521                lang_registry.clone(),
2522                fs.clone(),
2523                cx,
2524            )
2525        });
2526        let (worktree_a, _) = project_a
2527            .update(&mut cx_a, |p, cx| {
2528                p.find_or_create_local_worktree("/a", false, cx)
2529            })
2530            .await
2531            .unwrap();
2532        worktree_a
2533            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2534            .await;
2535        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2536        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2537        project_a
2538            .update(&mut cx_a, |p, cx| p.share(cx))
2539            .await
2540            .unwrap();
2541
2542        // Join the worktree as client B.
2543        let project_b = Project::remote(
2544            project_id,
2545            client_b.clone(),
2546            client_b.user_store.clone(),
2547            lang_registry.clone(),
2548            fs.clone(),
2549            &mut cx_b.to_async(),
2550        )
2551        .await
2552        .unwrap();
2553
2554        let buffer_b = cx_b
2555            .background()
2556            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2557            .await
2558            .unwrap();
2559
2560        let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2561        let (request_id, _) = fake_language_server
2562            .receive_request::<lsp::request::Formatting>()
2563            .await;
2564        fake_language_server
2565            .respond(
2566                request_id,
2567                Some(vec![
2568                    lsp::TextEdit {
2569                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2570                        new_text: "h".to_string(),
2571                    },
2572                    lsp::TextEdit {
2573                        range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2574                        new_text: "y".to_string(),
2575                    },
2576                ]),
2577            )
2578            .await;
2579        format.await.unwrap();
2580        assert_eq!(
2581            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2582            "let honey = two"
2583        );
2584    }
2585
2586    #[gpui::test]
2587    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2588        cx_a.foreground().forbid_parking();
2589        let mut lang_registry = Arc::new(LanguageRegistry::new());
2590        let fs = Arc::new(FakeFs::new(cx_a.background()));
2591        fs.insert_tree(
2592            "/root-1",
2593            json!({
2594                ".zed.toml": r#"collaborators = ["user_b"]"#,
2595                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2596            }),
2597        )
2598        .await;
2599        fs.insert_tree(
2600            "/root-2",
2601            json!({
2602                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2603            }),
2604        )
2605        .await;
2606
2607        // Set up a fake language server.
2608        let (language_server_config, mut fake_language_server) =
2609            LanguageServerConfig::fake(cx_a.background()).await;
2610        Arc::get_mut(&mut lang_registry)
2611            .unwrap()
2612            .add(Arc::new(Language::new(
2613                LanguageConfig {
2614                    name: "Rust".to_string(),
2615                    path_suffixes: vec!["rs".to_string()],
2616                    language_server: Some(language_server_config),
2617                    ..Default::default()
2618                },
2619                Some(tree_sitter_rust::language()),
2620            )));
2621
2622        // Connect to a server as 2 clients.
2623        let mut server = TestServer::start(cx_a.foreground()).await;
2624        let client_a = server.create_client(&mut cx_a, "user_a").await;
2625        let client_b = server.create_client(&mut cx_b, "user_b").await;
2626
2627        // Share a project as client A
2628        let project_a = cx_a.update(|cx| {
2629            Project::local(
2630                client_a.clone(),
2631                client_a.user_store.clone(),
2632                lang_registry.clone(),
2633                fs.clone(),
2634                cx,
2635            )
2636        });
2637        let (worktree_a, _) = project_a
2638            .update(&mut cx_a, |p, cx| {
2639                p.find_or_create_local_worktree("/root-1", false, cx)
2640            })
2641            .await
2642            .unwrap();
2643        worktree_a
2644            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2645            .await;
2646        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2647        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2648        project_a
2649            .update(&mut cx_a, |p, cx| p.share(cx))
2650            .await
2651            .unwrap();
2652
2653        // Join the worktree as client B.
2654        let project_b = Project::remote(
2655            project_id,
2656            client_b.clone(),
2657            client_b.user_store.clone(),
2658            lang_registry.clone(),
2659            fs.clone(),
2660            &mut cx_b.to_async(),
2661        )
2662        .await
2663        .unwrap();
2664
2665        // Open the file to be formatted on client B.
2666        let buffer_b = cx_b
2667            .background()
2668            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2669            .await
2670            .unwrap();
2671
2672        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2673        let (request_id, _) = fake_language_server
2674            .receive_request::<lsp::request::GotoDefinition>()
2675            .await;
2676        fake_language_server
2677            .respond(
2678                request_id,
2679                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2680                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2681                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2682                ))),
2683            )
2684            .await;
2685        let definitions_1 = definitions_1.await.unwrap();
2686        cx_b.read(|cx| {
2687            assert_eq!(definitions_1.len(), 1);
2688            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2689            let target_buffer = definitions_1[0].target_buffer.read(cx);
2690            assert_eq!(
2691                target_buffer.text(),
2692                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2693            );
2694            assert_eq!(
2695                definitions_1[0].target_range.to_point(target_buffer),
2696                Point::new(0, 6)..Point::new(0, 9)
2697            );
2698        });
2699
2700        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2701        // the previous call to `definition`.
2702        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2703        let (request_id, _) = fake_language_server
2704            .receive_request::<lsp::request::GotoDefinition>()
2705            .await;
2706        fake_language_server
2707            .respond(
2708                request_id,
2709                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2710                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2711                    lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2712                ))),
2713            )
2714            .await;
2715        let definitions_2 = definitions_2.await.unwrap();
2716        cx_b.read(|cx| {
2717            assert_eq!(definitions_2.len(), 1);
2718            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2719            let target_buffer = definitions_2[0].target_buffer.read(cx);
2720            assert_eq!(
2721                target_buffer.text(),
2722                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2723            );
2724            assert_eq!(
2725                definitions_2[0].target_range.to_point(target_buffer),
2726                Point::new(1, 6)..Point::new(1, 11)
2727            );
2728        });
2729        assert_eq!(
2730            definitions_1[0].target_buffer,
2731            definitions_2[0].target_buffer
2732        );
2733
2734        cx_b.update(|_| {
2735            drop(definitions_1);
2736            drop(definitions_2);
2737        });
2738        project_b
2739            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2740            .await;
2741    }
2742
2743    #[gpui::test]
2744    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2745        mut cx_a: TestAppContext,
2746        mut cx_b: TestAppContext,
2747        mut rng: StdRng,
2748    ) {
2749        cx_a.foreground().forbid_parking();
2750        let mut lang_registry = Arc::new(LanguageRegistry::new());
2751        let fs = Arc::new(FakeFs::new(cx_a.background()));
2752        fs.insert_tree(
2753            "/root",
2754            json!({
2755                ".zed.toml": r#"collaborators = ["user_b"]"#,
2756                "a.rs": "const ONE: usize = b::TWO;",
2757                "b.rs": "const TWO: usize = 2",
2758            }),
2759        )
2760        .await;
2761
2762        // Set up a fake language server.
2763        let (language_server_config, mut fake_language_server) =
2764            LanguageServerConfig::fake(cx_a.background()).await;
2765        Arc::get_mut(&mut lang_registry)
2766            .unwrap()
2767            .add(Arc::new(Language::new(
2768                LanguageConfig {
2769                    name: "Rust".to_string(),
2770                    path_suffixes: vec!["rs".to_string()],
2771                    language_server: Some(language_server_config),
2772                    ..Default::default()
2773                },
2774                Some(tree_sitter_rust::language()),
2775            )));
2776
2777        // Connect to a server as 2 clients.
2778        let mut server = TestServer::start(cx_a.foreground()).await;
2779        let client_a = server.create_client(&mut cx_a, "user_a").await;
2780        let client_b = server.create_client(&mut cx_b, "user_b").await;
2781
2782        // Share a project as client A
2783        let project_a = cx_a.update(|cx| {
2784            Project::local(
2785                client_a.clone(),
2786                client_a.user_store.clone(),
2787                lang_registry.clone(),
2788                fs.clone(),
2789                cx,
2790            )
2791        });
2792        let (worktree_a, _) = project_a
2793            .update(&mut cx_a, |p, cx| {
2794                p.find_or_create_local_worktree("/root", false, cx)
2795            })
2796            .await
2797            .unwrap();
2798        worktree_a
2799            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2800            .await;
2801        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2802        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2803        project_a
2804            .update(&mut cx_a, |p, cx| p.share(cx))
2805            .await
2806            .unwrap();
2807
2808        // Join the worktree as client B.
2809        let project_b = Project::remote(
2810            project_id,
2811            client_b.clone(),
2812            client_b.user_store.clone(),
2813            lang_registry.clone(),
2814            fs.clone(),
2815            &mut cx_b.to_async(),
2816        )
2817        .await
2818        .unwrap();
2819
2820        let buffer_b1 = cx_b
2821            .background()
2822            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2823            .await
2824            .unwrap();
2825
2826        let definitions;
2827        let buffer_b2;
2828        if rng.gen() {
2829            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2830            buffer_b2 =
2831                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2832        } else {
2833            buffer_b2 =
2834                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2835            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2836        }
2837
2838        let (request_id, _) = fake_language_server
2839            .receive_request::<lsp::request::GotoDefinition>()
2840            .await;
2841        fake_language_server
2842            .respond(
2843                request_id,
2844                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2845                    lsp::Url::from_file_path("/root/b.rs").unwrap(),
2846                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2847                ))),
2848            )
2849            .await;
2850
2851        let buffer_b2 = buffer_b2.await.unwrap();
2852        let definitions = definitions.await.unwrap();
2853        assert_eq!(definitions.len(), 1);
2854        assert_eq!(definitions[0].target_buffer, buffer_b2);
2855    }
2856
2857    #[gpui::test]
2858    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2859        cx_a.foreground().forbid_parking();
2860
2861        // Connect to a server as 2 clients.
2862        let mut server = TestServer::start(cx_a.foreground()).await;
2863        let client_a = server.create_client(&mut cx_a, "user_a").await;
2864        let client_b = server.create_client(&mut cx_b, "user_b").await;
2865
2866        // Create an org that includes these 2 users.
2867        let db = &server.app_state.db;
2868        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2869        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2870            .await
2871            .unwrap();
2872        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2873            .await
2874            .unwrap();
2875
2876        // Create a channel that includes all the users.
2877        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2878        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2879            .await
2880            .unwrap();
2881        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2882            .await
2883            .unwrap();
2884        db.create_channel_message(
2885            channel_id,
2886            client_b.current_user_id(&cx_b),
2887            "hello A, it's B.",
2888            OffsetDateTime::now_utc(),
2889            1,
2890        )
2891        .await
2892        .unwrap();
2893
2894        let channels_a = cx_a
2895            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2896        channels_a
2897            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2898            .await;
2899        channels_a.read_with(&cx_a, |list, _| {
2900            assert_eq!(
2901                list.available_channels().unwrap(),
2902                &[ChannelDetails {
2903                    id: channel_id.to_proto(),
2904                    name: "test-channel".to_string()
2905                }]
2906            )
2907        });
2908        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2909            this.get_channel(channel_id.to_proto(), cx).unwrap()
2910        });
2911        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2912        channel_a
2913            .condition(&cx_a, |channel, _| {
2914                channel_messages(channel)
2915                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2916            })
2917            .await;
2918
2919        let channels_b = cx_b
2920            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2921        channels_b
2922            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2923            .await;
2924        channels_b.read_with(&cx_b, |list, _| {
2925            assert_eq!(
2926                list.available_channels().unwrap(),
2927                &[ChannelDetails {
2928                    id: channel_id.to_proto(),
2929                    name: "test-channel".to_string()
2930                }]
2931            )
2932        });
2933
2934        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2935            this.get_channel(channel_id.to_proto(), cx).unwrap()
2936        });
2937        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2938        channel_b
2939            .condition(&cx_b, |channel, _| {
2940                channel_messages(channel)
2941                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2942            })
2943            .await;
2944
2945        channel_a
2946            .update(&mut cx_a, |channel, cx| {
2947                channel
2948                    .send_message("oh, hi B.".to_string(), cx)
2949                    .unwrap()
2950                    .detach();
2951                let task = channel.send_message("sup".to_string(), cx).unwrap();
2952                assert_eq!(
2953                    channel_messages(channel),
2954                    &[
2955                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2956                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2957                        ("user_a".to_string(), "sup".to_string(), true)
2958                    ]
2959                );
2960                task
2961            })
2962            .await
2963            .unwrap();
2964
2965        channel_b
2966            .condition(&cx_b, |channel, _| {
2967                channel_messages(channel)
2968                    == [
2969                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2970                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2971                        ("user_a".to_string(), "sup".to_string(), false),
2972                    ]
2973            })
2974            .await;
2975
2976        assert_eq!(
2977            server
2978                .state()
2979                .await
2980                .channel(channel_id)
2981                .unwrap()
2982                .connection_ids
2983                .len(),
2984            2
2985        );
2986        cx_b.update(|_| drop(channel_b));
2987        server
2988            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2989            .await;
2990
2991        cx_a.update(|_| drop(channel_a));
2992        server
2993            .condition(|state| state.channel(channel_id).is_none())
2994            .await;
2995    }
2996
2997    #[gpui::test]
2998    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2999        cx_a.foreground().forbid_parking();
3000
3001        let mut server = TestServer::start(cx_a.foreground()).await;
3002        let client_a = server.create_client(&mut cx_a, "user_a").await;
3003
3004        let db = &server.app_state.db;
3005        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3006        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3007        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3008            .await
3009            .unwrap();
3010        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3011            .await
3012            .unwrap();
3013
3014        let channels_a = cx_a
3015            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3016        channels_a
3017            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3018            .await;
3019        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3020            this.get_channel(channel_id.to_proto(), cx).unwrap()
3021        });
3022
3023        // Messages aren't allowed to be too long.
3024        channel_a
3025            .update(&mut cx_a, |channel, cx| {
3026                let long_body = "this is long.\n".repeat(1024);
3027                channel.send_message(long_body, cx).unwrap()
3028            })
3029            .await
3030            .unwrap_err();
3031
3032        // Messages aren't allowed to be blank.
3033        channel_a.update(&mut cx_a, |channel, cx| {
3034            channel.send_message(String::new(), cx).unwrap_err()
3035        });
3036
3037        // Leading and trailing whitespace are trimmed.
3038        channel_a
3039            .update(&mut cx_a, |channel, cx| {
3040                channel
3041                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3042                    .unwrap()
3043            })
3044            .await
3045            .unwrap();
3046        assert_eq!(
3047            db.get_channel_messages(channel_id, 10, None)
3048                .await
3049                .unwrap()
3050                .iter()
3051                .map(|m| &m.body)
3052                .collect::<Vec<_>>(),
3053            &["surrounded by whitespace"]
3054        );
3055    }
3056
3057    #[gpui::test]
3058    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3059        cx_a.foreground().forbid_parking();
3060
3061        // Connect to a server as 2 clients.
3062        let mut server = TestServer::start(cx_a.foreground()).await;
3063        let client_a = server.create_client(&mut cx_a, "user_a").await;
3064        let client_b = server.create_client(&mut cx_b, "user_b").await;
3065        let mut status_b = client_b.status();
3066
3067        // Create an org that includes these 2 users.
3068        let db = &server.app_state.db;
3069        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3070        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3071            .await
3072            .unwrap();
3073        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3074            .await
3075            .unwrap();
3076
3077        // Create a channel that includes all the users.
3078        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3079        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3080            .await
3081            .unwrap();
3082        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3083            .await
3084            .unwrap();
3085        db.create_channel_message(
3086            channel_id,
3087            client_b.current_user_id(&cx_b),
3088            "hello A, it's B.",
3089            OffsetDateTime::now_utc(),
3090            2,
3091        )
3092        .await
3093        .unwrap();
3094
3095        let channels_a = cx_a
3096            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3097        channels_a
3098            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3099            .await;
3100
3101        channels_a.read_with(&cx_a, |list, _| {
3102            assert_eq!(
3103                list.available_channels().unwrap(),
3104                &[ChannelDetails {
3105                    id: channel_id.to_proto(),
3106                    name: "test-channel".to_string()
3107                }]
3108            )
3109        });
3110        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3111            this.get_channel(channel_id.to_proto(), cx).unwrap()
3112        });
3113        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3114        channel_a
3115            .condition(&cx_a, |channel, _| {
3116                channel_messages(channel)
3117                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3118            })
3119            .await;
3120
3121        let channels_b = cx_b
3122            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3123        channels_b
3124            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3125            .await;
3126        channels_b.read_with(&cx_b, |list, _| {
3127            assert_eq!(
3128                list.available_channels().unwrap(),
3129                &[ChannelDetails {
3130                    id: channel_id.to_proto(),
3131                    name: "test-channel".to_string()
3132                }]
3133            )
3134        });
3135
3136        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3137            this.get_channel(channel_id.to_proto(), cx).unwrap()
3138        });
3139        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3140        channel_b
3141            .condition(&cx_b, |channel, _| {
3142                channel_messages(channel)
3143                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3144            })
3145            .await;
3146
3147        // Disconnect client B, ensuring we can still access its cached channel data.
3148        server.forbid_connections();
3149        server.disconnect_client(client_b.current_user_id(&cx_b));
3150        while !matches!(
3151            status_b.next().await,
3152            Some(client::Status::ReconnectionError { .. })
3153        ) {}
3154
3155        channels_b.read_with(&cx_b, |channels, _| {
3156            assert_eq!(
3157                channels.available_channels().unwrap(),
3158                [ChannelDetails {
3159                    id: channel_id.to_proto(),
3160                    name: "test-channel".to_string()
3161                }]
3162            )
3163        });
3164        channel_b.read_with(&cx_b, |channel, _| {
3165            assert_eq!(
3166                channel_messages(channel),
3167                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3168            )
3169        });
3170
3171        // Send a message from client B while it is disconnected.
3172        channel_b
3173            .update(&mut cx_b, |channel, cx| {
3174                let task = channel
3175                    .send_message("can you see this?".to_string(), cx)
3176                    .unwrap();
3177                assert_eq!(
3178                    channel_messages(channel),
3179                    &[
3180                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3181                        ("user_b".to_string(), "can you see this?".to_string(), true)
3182                    ]
3183                );
3184                task
3185            })
3186            .await
3187            .unwrap_err();
3188
3189        // Send a message from client A while B is disconnected.
3190        channel_a
3191            .update(&mut cx_a, |channel, cx| {
3192                channel
3193                    .send_message("oh, hi B.".to_string(), cx)
3194                    .unwrap()
3195                    .detach();
3196                let task = channel.send_message("sup".to_string(), cx).unwrap();
3197                assert_eq!(
3198                    channel_messages(channel),
3199                    &[
3200                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3201                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3202                        ("user_a".to_string(), "sup".to_string(), true)
3203                    ]
3204                );
3205                task
3206            })
3207            .await
3208            .unwrap();
3209
3210        // Give client B a chance to reconnect.
3211        server.allow_connections();
3212        cx_b.foreground().advance_clock(Duration::from_secs(10));
3213
3214        // Verify that B sees the new messages upon reconnection, as well as the message client B
3215        // sent while offline.
3216        channel_b
3217            .condition(&cx_b, |channel, _| {
3218                channel_messages(channel)
3219                    == [
3220                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3221                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3222                        ("user_a".to_string(), "sup".to_string(), false),
3223                        ("user_b".to_string(), "can you see this?".to_string(), false),
3224                    ]
3225            })
3226            .await;
3227
3228        // Ensure client A and B can communicate normally after reconnection.
3229        channel_a
3230            .update(&mut cx_a, |channel, cx| {
3231                channel.send_message("you online?".to_string(), cx).unwrap()
3232            })
3233            .await
3234            .unwrap();
3235        channel_b
3236            .condition(&cx_b, |channel, _| {
3237                channel_messages(channel)
3238                    == [
3239                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3240                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3241                        ("user_a".to_string(), "sup".to_string(), false),
3242                        ("user_b".to_string(), "can you see this?".to_string(), false),
3243                        ("user_a".to_string(), "you online?".to_string(), false),
3244                    ]
3245            })
3246            .await;
3247
3248        channel_b
3249            .update(&mut cx_b, |channel, cx| {
3250                channel.send_message("yep".to_string(), cx).unwrap()
3251            })
3252            .await
3253            .unwrap();
3254        channel_a
3255            .condition(&cx_a, |channel, _| {
3256                channel_messages(channel)
3257                    == [
3258                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3259                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3260                        ("user_a".to_string(), "sup".to_string(), false),
3261                        ("user_b".to_string(), "can you see this?".to_string(), false),
3262                        ("user_a".to_string(), "you online?".to_string(), false),
3263                        ("user_b".to_string(), "yep".to_string(), false),
3264                    ]
3265            })
3266            .await;
3267    }
3268
    // Checks that every client's contact list follows a project through its
    // life cycle: created by the host, joined by a guest, then dropped by the host.
    #[gpui::test]
    async fn test_contacts(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Populate the fake filesystem with a directory "/a" whose `.zed.toml`
        // names user_b and user_c as collaborators.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
            }),
        )
        .await;

        // Open "/a" as a worktree of a local project owned by client A and wait
        // for its scan to complete.
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        // Before `share` is called, all three clients should already list
        // user_a with a project rooted at "a" and no guests.
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;

        // Obtain the project's remote id, then share the project as client A.
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Open the shared project remotely as client B. The handle is kept in
        // `_project_b` until the end of the test.
        let _project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // All three clients should now see user_b listed as a guest of the project.
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;

        // Wait until the host's project lists client B's peer id among its collaborators.
        project_a
            .condition(&cx_a, |project, _| {
                project.collaborators().contains_key(&client_b.peer_id)
            })
            .await;

        // Drop the host's project; every client's contact list should become empty.
        cx_a.update(move |_| drop(project_a));
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
            .await;

        // Flattens the user store's contacts into comparable tuples of
        // (contact login, [(first worktree root name, [guest logins])]).
        // Indexing with `[0]` panics if a project has no worktree root names.
        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
            user_store
                .contacts()
                .iter()
                .map(|contact| {
                    let worktrees = contact
                        .projects
                        .iter()
                        .map(|p| {
                            (
                                p.worktree_root_names[0].as_str(),
                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
                            )
                        })
                        .collect();
                    (contact.user.github_login.as_str(), worktrees)
                })
                .collect()
        }
    }
3410
    /// In-process server fixture used by the tests in this module.
    struct TestServer {
        /// Peer shared with `server`; it is reset when the fixture is dropped.
        peer: Arc<Peer>,
        /// Application state handed to the server; its database is used to create test users.
        app_state: Arc<AppState>,
        /// The server under test; each client connection is served by its `handle_connection`.
        server: Arc<Server>,
        /// Foreground executor, marked as waiting while `condition` awaits a notification.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender was given to the server at construction.
        notifications: mpsc::Receiver<()>,
        /// Per-user kill switches for the in-memory connections created by `create_client`.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// When set, new connection attempts from test clients fail with an error.
        forbid_connections: Arc<AtomicBool>,
        // NOTE(review): never read in the visible code; presumably held so the
        // test database outlives the server — confirm against `TestDb`.
        _test_db: TestDb,
    }
3421
3422    impl TestServer {
3423        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3424            let test_db = TestDb::new();
3425            let app_state = Self::build_app_state(&test_db).await;
3426            let peer = Peer::new();
3427            let notifications = mpsc::channel(128);
3428            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3429            Self {
3430                peer,
3431                app_state,
3432                server,
3433                foreground,
3434                notifications: notifications.1,
3435                connection_killers: Default::default(),
3436                forbid_connections: Default::default(),
3437                _test_db: test_db,
3438            }
3439        }
3440
3441        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3442            let http = FakeHttpClient::with_404_response();
3443            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3444            let client_name = name.to_string();
3445            let mut client = Client::new(http.clone());
3446            let server = self.server.clone();
3447            let connection_killers = self.connection_killers.clone();
3448            let forbid_connections = self.forbid_connections.clone();
3449            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3450
3451            Arc::get_mut(&mut client)
3452                .unwrap()
3453                .override_authenticate(move |cx| {
3454                    cx.spawn(|_| async move {
3455                        let access_token = "the-token".to_string();
3456                        Ok(Credentials {
3457                            user_id: user_id.0 as u64,
3458                            access_token,
3459                        })
3460                    })
3461                })
3462                .override_establish_connection(move |credentials, cx| {
3463                    assert_eq!(credentials.user_id, user_id.0 as u64);
3464                    assert_eq!(credentials.access_token, "the-token");
3465
3466                    let server = server.clone();
3467                    let connection_killers = connection_killers.clone();
3468                    let forbid_connections = forbid_connections.clone();
3469                    let client_name = client_name.clone();
3470                    let connection_id_tx = connection_id_tx.clone();
3471                    cx.spawn(move |cx| async move {
3472                        if forbid_connections.load(SeqCst) {
3473                            Err(EstablishConnectionError::other(anyhow!(
3474                                "server is forbidding connections"
3475                            )))
3476                        } else {
3477                            let (client_conn, server_conn, kill_conn) =
3478                                Connection::in_memory(cx.background());
3479                            connection_killers.lock().insert(user_id, kill_conn);
3480                            cx.background()
3481                                .spawn(server.handle_connection(
3482                                    server_conn,
3483                                    client_name,
3484                                    user_id,
3485                                    Some(connection_id_tx),
3486                                ))
3487                                .detach();
3488                            Ok(client_conn)
3489                        }
3490                    })
3491                });
3492
3493            client
3494                .authenticate_and_connect(&cx.to_async())
3495                .await
3496                .unwrap();
3497
3498            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3499            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3500            let mut authed_user =
3501                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3502            while authed_user.next().await.unwrap().is_none() {}
3503
3504            TestClient {
3505                client,
3506                peer_id,
3507                user_store,
3508            }
3509        }
3510
3511        fn disconnect_client(&self, user_id: UserId) {
3512            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3513                let _ = kill_conn.try_send(Some(()));
3514            }
3515        }
3516
3517        fn forbid_connections(&self) {
3518            self.forbid_connections.store(true, SeqCst);
3519        }
3520
3521        fn allow_connections(&self) {
3522            self.forbid_connections.store(false, SeqCst);
3523        }
3524
3525        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3526            let mut config = Config::default();
3527            config.session_secret = "a".repeat(32);
3528            config.database_url = test_db.url.clone();
3529            let github_client = github::AppClient::test();
3530            Arc::new(AppState {
3531                db: test_db.db().clone(),
3532                handlebars: Default::default(),
3533                auth_client: auth::build_client("", ""),
3534                repo_client: github::RepoClient::test(&github_client),
3535                github_client,
3536                config,
3537            })
3538        }
3539
3540        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3541            self.server.store.read()
3542        }
3543
3544        async fn condition<F>(&mut self, mut predicate: F)
3545        where
3546            F: FnMut(&Store) -> bool,
3547        {
3548            async_std::future::timeout(Duration::from_millis(500), async {
3549                while !(predicate)(&*self.server.store.read()) {
3550                    self.foreground.start_waiting();
3551                    self.notifications.next().await;
3552                    self.foreground.finish_waiting();
3553                }
3554            })
3555            .await
3556            .expect("condition timed out");
3557        }
3558    }
3559
    impl Drop for TestServer {
        /// Resets the peer shared with the server when the fixture goes away.
        // NOTE(review): presumably this tears down the in-memory connections that
        // are still open — confirm against `Peer::reset`.
        fn drop(&mut self) {
            self.peer.reset();
        }
    }
3565
    /// A client connected to a `TestServer`, bundled with the identifiers tests
    /// need to make assertions about it.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id reported while the client connected.
        pub peer_id: PeerId,
        /// User store model created for this client.
        pub user_store: ModelHandle<UserStore>,
    }
3571
    /// Lets a `TestClient` be used wherever the wrapped `Arc<Client>` is expected.
    impl Deref for TestClient {
        type Target = Arc<Client>;

        /// Borrows the wrapped client.
        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
3579
3580    impl TestClient {
3581        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3582            UserId::from_proto(
3583                self.user_store
3584                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3585            )
3586        }
3587    }
3588
3589    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3590        channel
3591            .messages()
3592            .cursor::<()>()
3593            .map(|m| {
3594                (
3595                    m.sender.github_login.clone(),
3596                    m.body.clone(),
3597                    m.is_pending(),
3598                )
3599            })
3600            .collect()
3601    }
3602
    /// Stateless view that renders an empty element.
    struct EmptyView;
3604
    impl gpui::Entity for EmptyView {
        // The view's event type is the unit type.
        type Event = ();
    }
3608
3609    impl gpui::View for EmptyView {
3610        fn ui_name() -> &'static str {
3611            "empty view"
3612        }
3613
3614        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3615            gpui::Element::boxed(gpui::elements::Empty)
3616        }
3617    }
3618}