rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46
  47impl Server {
  48    pub fn new(
  49        app_state: Arc<AppState>,
  50        peer: Arc<Peer>,
  51        notifications: Option<mpsc::Sender<()>>,
  52    ) -> Arc<Self> {
  53        let mut server = Self {
  54            peer,
  55            app_state,
  56            store: Default::default(),
  57            handlers: Default::default(),
  58            notifications,
  59        };
  60
  61        server
  62            .add_request_handler(Server::ping)
  63            .add_request_handler(Server::register_project)
  64            .add_message_handler(Server::unregister_project)
  65            .add_request_handler(Server::share_project)
  66            .add_message_handler(Server::unshare_project)
  67            .add_request_handler(Server::join_project)
  68            .add_message_handler(Server::leave_project)
  69            .add_request_handler(Server::register_worktree)
  70            .add_message_handler(Server::unregister_worktree)
  71            .add_request_handler(Server::share_worktree)
  72            .add_message_handler(Server::update_worktree)
  73            .add_message_handler(Server::update_diagnostic_summary)
  74            .add_message_handler(Server::disk_based_diagnostics_updating)
  75            .add_message_handler(Server::disk_based_diagnostics_updated)
  76            .add_request_handler(Server::get_definition)
  77            .add_request_handler(Server::open_buffer)
  78            .add_message_handler(Server::close_buffer)
  79            .add_request_handler(Server::update_buffer)
  80            .add_message_handler(Server::update_buffer_file)
  81            .add_message_handler(Server::buffer_reloaded)
  82            .add_message_handler(Server::buffer_saved)
  83            .add_request_handler(Server::save_buffer)
  84            .add_request_handler(Server::format_buffers)
  85            .add_request_handler(Server::get_completions)
  86            .add_request_handler(Server::apply_additional_edits_for_completion)
  87            .add_request_handler(Server::get_code_actions)
  88            .add_request_handler(Server::apply_code_action)
  89            .add_request_handler(Server::get_channels)
  90            .add_request_handler(Server::get_users)
  91            .add_request_handler(Server::join_channel)
  92            .add_message_handler(Server::leave_channel)
  93            .add_request_handler(Server::send_channel_message)
  94            .add_request_handler(Server::get_channel_messages);
  95
  96        Arc::new(server)
  97    }
  98
  99    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 100    where
 101        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 102        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 103        M: EnvelopedMessage,
 104    {
 105        let prev_handler = self.handlers.insert(
 106            TypeId::of::<M>(),
 107            Box::new(move |server, envelope| {
 108                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 109                (handler)(server, *envelope).boxed()
 110            }),
 111        );
 112        if prev_handler.is_some() {
 113            panic!("registered a handler for the same message twice");
 114        }
 115        self
 116    }
 117
 118    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 119    where
 120        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 121        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 122        M: RequestMessage,
 123    {
 124        let prev_handler = self.handlers.insert(
 125            TypeId::of::<M>(),
 126            Box::new(move |server, envelope| {
 127                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 128                let receipt = envelope.receipt();
 129                let response = (handler)(server.clone(), *envelope);
 130                async move {
 131                    match response.await {
 132                        Ok(response) => {
 133                            server.peer.respond(receipt, response)?;
 134                            Ok(())
 135                        }
 136                        Err(error) => {
 137                            server.peer.respond_with_error(
 138                                receipt,
 139                                proto::Error {
 140                                    message: error.to_string(),
 141                                },
 142                            )?;
 143                            Err(error)
 144                        }
 145                    }
 146                }
 147                .boxed()
 148            }),
 149        );
 150        if prev_handler.is_some() {
 151            panic!("registered a handler for the same message twice");
 152        }
 153        self
 154    }
 155
 156    pub fn handle_connection(
 157        self: &Arc<Self>,
 158        connection: Connection,
 159        addr: String,
 160        user_id: UserId,
 161        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 162    ) -> impl Future<Output = ()> {
 163        let mut this = self.clone();
 164        async move {
 165            let (connection_id, handle_io, mut incoming_rx) =
 166                this.peer.add_connection(connection).await;
 167
 168            if let Some(send_connection_id) = send_connection_id.as_mut() {
 169                let _ = send_connection_id.send(connection_id).await;
 170            }
 171
 172            this.state_mut().add_connection(connection_id, user_id);
 173            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 174                log::error!("error updating contacts for {:?}: {}", user_id, err);
 175            }
 176
 177            let handle_io = handle_io.fuse();
 178            futures::pin_mut!(handle_io);
 179            loop {
 180                let next_message = incoming_rx.next().fuse();
 181                futures::pin_mut!(next_message);
 182                futures::select_biased! {
 183                    result = handle_io => {
 184                        if let Err(err) = result {
 185                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 186                        }
 187                        break;
 188                    }
 189                    message = next_message => {
 190                        if let Some(message) = message {
 191                            let start_time = Instant::now();
 192                            let type_name = message.payload_type_name();
 193                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 194                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 195                                if let Err(err) = (handler)(this.clone(), message).await {
 196                                    log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 197                                } else {
 198                                    log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 199                                }
 200
 201                                if let Some(mut notifications) = this.notifications.clone() {
 202                                    let _ = notifications.send(()).await;
 203                                }
 204                            } else {
 205                                log::warn!("unhandled message: {}", type_name);
 206                            }
 207                        } else {
 208                            log::info!("rpc connection closed {:?}", addr);
 209                            break;
 210                        }
 211                    }
 212                }
 213            }
 214
 215            if let Err(err) = this.sign_out(connection_id).await {
 216                log::error!("error signing out connection {:?} - {:?}", addr, err);
 217            }
 218        }
 219    }
 220
 221    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 222        self.peer.disconnect(connection_id);
 223        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 224
 225        for (project_id, project) in removed_connection.hosted_projects {
 226            if let Some(share) = project.share {
 227                broadcast(
 228                    connection_id,
 229                    share.guests.keys().copied().collect(),
 230                    |conn_id| {
 231                        self.peer
 232                            .send(conn_id, proto::UnshareProject { project_id })
 233                    },
 234                )?;
 235            }
 236        }
 237
 238        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 239            broadcast(connection_id, peer_ids, |conn_id| {
 240                self.peer.send(
 241                    conn_id,
 242                    proto::RemoveProjectCollaborator {
 243                        project_id,
 244                        peer_id: connection_id.0,
 245                    },
 246                )
 247            })?;
 248        }
 249
 250        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 251        Ok(())
 252    }
 253
 254    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 255        Ok(proto::Ack {})
 256    }
 257
 258    async fn register_project(
 259        mut self: Arc<Server>,
 260        request: TypedEnvelope<proto::RegisterProject>,
 261    ) -> tide::Result<proto::RegisterProjectResponse> {
 262        let project_id = {
 263            let mut state = self.state_mut();
 264            let user_id = state.user_id_for_connection(request.sender_id)?;
 265            state.register_project(request.sender_id, user_id)
 266        };
 267        Ok(proto::RegisterProjectResponse { project_id })
 268    }
 269
 270    async fn unregister_project(
 271        mut self: Arc<Server>,
 272        request: TypedEnvelope<proto::UnregisterProject>,
 273    ) -> tide::Result<()> {
 274        let project = self
 275            .state_mut()
 276            .unregister_project(request.payload.project_id, request.sender_id)?;
 277        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 278        Ok(())
 279    }
 280
 281    async fn share_project(
 282        mut self: Arc<Server>,
 283        request: TypedEnvelope<proto::ShareProject>,
 284    ) -> tide::Result<proto::Ack> {
 285        self.state_mut()
 286            .share_project(request.payload.project_id, request.sender_id);
 287        Ok(proto::Ack {})
 288    }
 289
 290    async fn unshare_project(
 291        mut self: Arc<Server>,
 292        request: TypedEnvelope<proto::UnshareProject>,
 293    ) -> tide::Result<()> {
 294        let project_id = request.payload.project_id;
 295        let project = self
 296            .state_mut()
 297            .unshare_project(project_id, request.sender_id)?;
 298
 299        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 300            self.peer
 301                .send(conn_id, proto::UnshareProject { project_id })
 302        })?;
 303        self.update_contacts_for_users(&project.authorized_user_ids)?;
 304        Ok(())
 305    }
 306
 307    async fn join_project(
 308        mut self: Arc<Server>,
 309        request: TypedEnvelope<proto::JoinProject>,
 310    ) -> tide::Result<proto::JoinProjectResponse> {
 311        let project_id = request.payload.project_id;
 312
 313        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 314        let (response, connection_ids, contact_user_ids) = self
 315            .state_mut()
 316            .join_project(request.sender_id, user_id, project_id)
 317            .and_then(|joined| {
 318                let share = joined.project.share()?;
 319                let peer_count = share.guests.len();
 320                let mut collaborators = Vec::with_capacity(peer_count);
 321                collaborators.push(proto::Collaborator {
 322                    peer_id: joined.project.host_connection_id.0,
 323                    replica_id: 0,
 324                    user_id: joined.project.host_user_id.to_proto(),
 325                });
 326                let worktrees = joined
 327                    .project
 328                    .worktrees
 329                    .iter()
 330                    .filter_map(|(id, worktree)| {
 331                        worktree.share.as_ref().map(|share| proto::Worktree {
 332                            id: *id,
 333                            root_name: worktree.root_name.clone(),
 334                            entries: share.entries.values().cloned().collect(),
 335                            diagnostic_summaries: share
 336                                .diagnostic_summaries
 337                                .values()
 338                                .cloned()
 339                                .collect(),
 340                            weak: worktree.weak,
 341                        })
 342                    })
 343                    .collect();
 344                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 345                    if *peer_conn_id != request.sender_id {
 346                        collaborators.push(proto::Collaborator {
 347                            peer_id: peer_conn_id.0,
 348                            replica_id: *peer_replica_id as u32,
 349                            user_id: peer_user_id.to_proto(),
 350                        });
 351                    }
 352                }
 353                let response = proto::JoinProjectResponse {
 354                    worktrees,
 355                    replica_id: joined.replica_id as u32,
 356                    collaborators,
 357                };
 358                let connection_ids = joined.project.connection_ids();
 359                let contact_user_ids = joined.project.authorized_user_ids();
 360                Ok((response, connection_ids, contact_user_ids))
 361            })?;
 362
 363        broadcast(request.sender_id, connection_ids, |conn_id| {
 364            self.peer.send(
 365                conn_id,
 366                proto::AddProjectCollaborator {
 367                    project_id,
 368                    collaborator: Some(proto::Collaborator {
 369                        peer_id: request.sender_id.0,
 370                        replica_id: response.replica_id,
 371                        user_id: user_id.to_proto(),
 372                    }),
 373                },
 374            )
 375        })?;
 376        self.update_contacts_for_users(&contact_user_ids)?;
 377        Ok(response)
 378    }
 379
 380    async fn leave_project(
 381        mut self: Arc<Server>,
 382        request: TypedEnvelope<proto::LeaveProject>,
 383    ) -> tide::Result<()> {
 384        let sender_id = request.sender_id;
 385        let project_id = request.payload.project_id;
 386        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 387
 388        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 389            self.peer.send(
 390                conn_id,
 391                proto::RemoveProjectCollaborator {
 392                    project_id,
 393                    peer_id: sender_id.0,
 394                },
 395            )
 396        })?;
 397        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 398
 399        Ok(())
 400    }
 401
 402    async fn register_worktree(
 403        mut self: Arc<Server>,
 404        request: TypedEnvelope<proto::RegisterWorktree>,
 405    ) -> tide::Result<proto::Ack> {
 406        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 407
 408        let mut contact_user_ids = HashSet::default();
 409        contact_user_ids.insert(host_user_id);
 410        for github_login in request.payload.authorized_logins {
 411            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 412            contact_user_ids.insert(contact_user_id);
 413        }
 414
 415        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 416        self.state_mut().register_worktree(
 417            request.payload.project_id,
 418            request.payload.worktree_id,
 419            request.sender_id,
 420            Worktree {
 421                authorized_user_ids: contact_user_ids.clone(),
 422                root_name: request.payload.root_name,
 423                share: None,
 424                weak: false,
 425            },
 426        )?;
 427        self.update_contacts_for_users(&contact_user_ids)?;
 428        Ok(proto::Ack {})
 429    }
 430
 431    async fn unregister_worktree(
 432        mut self: Arc<Server>,
 433        request: TypedEnvelope<proto::UnregisterWorktree>,
 434    ) -> tide::Result<()> {
 435        let project_id = request.payload.project_id;
 436        let worktree_id = request.payload.worktree_id;
 437        let (worktree, guest_connection_ids) =
 438            self.state_mut()
 439                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 440        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 441            self.peer.send(
 442                conn_id,
 443                proto::UnregisterWorktree {
 444                    project_id,
 445                    worktree_id,
 446                },
 447            )
 448        })?;
 449        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 450        Ok(())
 451    }
 452
 453    async fn share_worktree(
 454        mut self: Arc<Server>,
 455        mut request: TypedEnvelope<proto::ShareWorktree>,
 456    ) -> tide::Result<proto::Ack> {
 457        let worktree = request
 458            .payload
 459            .worktree
 460            .as_mut()
 461            .ok_or_else(|| anyhow!("missing worktree"))?;
 462        let entries = worktree
 463            .entries
 464            .iter()
 465            .map(|entry| (entry.id, entry.clone()))
 466            .collect();
 467        let diagnostic_summaries = worktree
 468            .diagnostic_summaries
 469            .iter()
 470            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 471            .collect();
 472
 473        let shared_worktree = self.state_mut().share_worktree(
 474            request.payload.project_id,
 475            worktree.id,
 476            request.sender_id,
 477            entries,
 478            diagnostic_summaries,
 479        )?;
 480
 481        broadcast(
 482            request.sender_id,
 483            shared_worktree.connection_ids,
 484            |connection_id| {
 485                self.peer
 486                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 487            },
 488        )?;
 489        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 490
 491        Ok(proto::Ack {})
 492    }
 493
 494    async fn update_worktree(
 495        mut self: Arc<Server>,
 496        request: TypedEnvelope<proto::UpdateWorktree>,
 497    ) -> tide::Result<()> {
 498        let connection_ids = self.state_mut().update_worktree(
 499            request.sender_id,
 500            request.payload.project_id,
 501            request.payload.worktree_id,
 502            &request.payload.removed_entries,
 503            &request.payload.updated_entries,
 504        )?;
 505
 506        broadcast(request.sender_id, connection_ids, |connection_id| {
 507            self.peer
 508                .forward_send(request.sender_id, connection_id, request.payload.clone())
 509        })?;
 510
 511        Ok(())
 512    }
 513
 514    async fn update_diagnostic_summary(
 515        mut self: Arc<Server>,
 516        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 517    ) -> tide::Result<()> {
 518        let summary = request
 519            .payload
 520            .summary
 521            .clone()
 522            .ok_or_else(|| anyhow!("invalid summary"))?;
 523        let receiver_ids = self.state_mut().update_diagnostic_summary(
 524            request.payload.project_id,
 525            request.payload.worktree_id,
 526            request.sender_id,
 527            summary,
 528        )?;
 529
 530        broadcast(request.sender_id, receiver_ids, |connection_id| {
 531            self.peer
 532                .forward_send(request.sender_id, connection_id, request.payload.clone())
 533        })?;
 534        Ok(())
 535    }
 536
 537    async fn disk_based_diagnostics_updating(
 538        self: Arc<Server>,
 539        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 540    ) -> tide::Result<()> {
 541        let receiver_ids = self
 542            .state()
 543            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 544        broadcast(request.sender_id, receiver_ids, |connection_id| {
 545            self.peer
 546                .forward_send(request.sender_id, connection_id, request.payload.clone())
 547        })?;
 548        Ok(())
 549    }
 550
 551    async fn disk_based_diagnostics_updated(
 552        self: Arc<Server>,
 553        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 554    ) -> tide::Result<()> {
 555        let receiver_ids = self
 556            .state()
 557            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 558        broadcast(request.sender_id, receiver_ids, |connection_id| {
 559            self.peer
 560                .forward_send(request.sender_id, connection_id, request.payload.clone())
 561        })?;
 562        Ok(())
 563    }
 564
 565    async fn get_definition(
 566        self: Arc<Server>,
 567        request: TypedEnvelope<proto::GetDefinition>,
 568    ) -> tide::Result<proto::GetDefinitionResponse> {
 569        let host_connection_id = self
 570            .state()
 571            .read_project(request.payload.project_id, request.sender_id)?
 572            .host_connection_id;
 573        Ok(self
 574            .peer
 575            .forward_request(request.sender_id, host_connection_id, request.payload)
 576            .await?)
 577    }
 578
 579    async fn open_buffer(
 580        self: Arc<Server>,
 581        request: TypedEnvelope<proto::OpenBuffer>,
 582    ) -> tide::Result<proto::OpenBufferResponse> {
 583        let host_connection_id = self
 584            .state()
 585            .read_project(request.payload.project_id, request.sender_id)?
 586            .host_connection_id;
 587        Ok(self
 588            .peer
 589            .forward_request(request.sender_id, host_connection_id, request.payload)
 590            .await?)
 591    }
 592
 593    async fn close_buffer(
 594        self: Arc<Server>,
 595        request: TypedEnvelope<proto::CloseBuffer>,
 596    ) -> tide::Result<()> {
 597        let host_connection_id = self
 598            .state()
 599            .read_project(request.payload.project_id, request.sender_id)?
 600            .host_connection_id;
 601        self.peer
 602            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 603        Ok(())
 604    }
 605
 606    async fn save_buffer(
 607        self: Arc<Server>,
 608        request: TypedEnvelope<proto::SaveBuffer>,
 609    ) -> tide::Result<proto::BufferSaved> {
 610        let host;
 611        let mut guests;
 612        {
 613            let state = self.state();
 614            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 615            host = project.host_connection_id;
 616            guests = project.guest_connection_ids()
 617        }
 618
 619        let response = self
 620            .peer
 621            .forward_request(request.sender_id, host, request.payload.clone())
 622            .await?;
 623
 624        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 625        broadcast(host, guests, |conn_id| {
 626            self.peer.forward_send(host, conn_id, response.clone())
 627        })?;
 628
 629        Ok(response)
 630    }
 631
 632    async fn format_buffers(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::FormatBuffers>,
 635    ) -> tide::Result<proto::FormatBuffersResponse> {
 636        let host = self
 637            .state()
 638            .read_project(request.payload.project_id, request.sender_id)?
 639            .host_connection_id;
 640        Ok(self
 641            .peer
 642            .forward_request(request.sender_id, host, request.payload.clone())
 643            .await?)
 644    }
 645
 646    async fn get_completions(
 647        self: Arc<Server>,
 648        request: TypedEnvelope<proto::GetCompletions>,
 649    ) -> tide::Result<proto::GetCompletionsResponse> {
 650        let host = self
 651            .state()
 652            .read_project(request.payload.project_id, request.sender_id)?
 653            .host_connection_id;
 654        Ok(self
 655            .peer
 656            .forward_request(request.sender_id, host, request.payload.clone())
 657            .await?)
 658    }
 659
 660    async fn apply_additional_edits_for_completion(
 661        self: Arc<Server>,
 662        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 663    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 664        let host = self
 665            .state()
 666            .read_project(request.payload.project_id, request.sender_id)?
 667            .host_connection_id;
 668        Ok(self
 669            .peer
 670            .forward_request(request.sender_id, host, request.payload.clone())
 671            .await?)
 672    }
 673
 674    async fn get_code_actions(
 675        self: Arc<Server>,
 676        request: TypedEnvelope<proto::GetCodeActions>,
 677    ) -> tide::Result<proto::GetCodeActionsResponse> {
 678        let host = self
 679            .state()
 680            .read_project(request.payload.project_id, request.sender_id)?
 681            .host_connection_id;
 682        Ok(self
 683            .peer
 684            .forward_request(request.sender_id, host, request.payload.clone())
 685            .await?)
 686    }
 687
 688    async fn apply_code_action(
 689        self: Arc<Server>,
 690        request: TypedEnvelope<proto::ApplyCodeAction>,
 691    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 692        let host = self
 693            .state()
 694            .read_project(request.payload.project_id, request.sender_id)?
 695            .host_connection_id;
 696        Ok(self
 697            .peer
 698            .forward_request(request.sender_id, host, request.payload.clone())
 699            .await?)
 700    }
 701
 702    async fn update_buffer(
 703        self: Arc<Server>,
 704        request: TypedEnvelope<proto::UpdateBuffer>,
 705    ) -> tide::Result<proto::Ack> {
 706        let receiver_ids = self
 707            .state()
 708            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 709        broadcast(request.sender_id, receiver_ids, |connection_id| {
 710            self.peer
 711                .forward_send(request.sender_id, connection_id, request.payload.clone())
 712        })?;
 713        Ok(proto::Ack {})
 714    }
 715
 716    async fn update_buffer_file(
 717        self: Arc<Server>,
 718        request: TypedEnvelope<proto::UpdateBufferFile>,
 719    ) -> tide::Result<()> {
 720        let receiver_ids = self
 721            .state()
 722            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 723        broadcast(request.sender_id, receiver_ids, |connection_id| {
 724            self.peer
 725                .forward_send(request.sender_id, connection_id, request.payload.clone())
 726        })?;
 727        Ok(())
 728    }
 729
 730    async fn buffer_reloaded(
 731        self: Arc<Server>,
 732        request: TypedEnvelope<proto::BufferReloaded>,
 733    ) -> tide::Result<()> {
 734        let receiver_ids = self
 735            .state()
 736            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 737        broadcast(request.sender_id, receiver_ids, |connection_id| {
 738            self.peer
 739                .forward_send(request.sender_id, connection_id, request.payload.clone())
 740        })?;
 741        Ok(())
 742    }
 743
 744    async fn buffer_saved(
 745        self: Arc<Server>,
 746        request: TypedEnvelope<proto::BufferSaved>,
 747    ) -> tide::Result<()> {
 748        let receiver_ids = self
 749            .state()
 750            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 751        broadcast(request.sender_id, receiver_ids, |connection_id| {
 752            self.peer
 753                .forward_send(request.sender_id, connection_id, request.payload.clone())
 754        })?;
 755        Ok(())
 756    }
 757
 758    async fn get_channels(
 759        self: Arc<Server>,
 760        request: TypedEnvelope<proto::GetChannels>,
 761    ) -> tide::Result<proto::GetChannelsResponse> {
 762        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 763        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 764        Ok(proto::GetChannelsResponse {
 765            channels: channels
 766                .into_iter()
 767                .map(|chan| proto::Channel {
 768                    id: chan.id.to_proto(),
 769                    name: chan.name,
 770                })
 771                .collect(),
 772        })
 773    }
 774
 775    async fn get_users(
 776        self: Arc<Server>,
 777        request: TypedEnvelope<proto::GetUsers>,
 778    ) -> tide::Result<proto::GetUsersResponse> {
 779        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 780        let users = self
 781            .app_state
 782            .db
 783            .get_users_by_ids(user_ids)
 784            .await?
 785            .into_iter()
 786            .map(|user| proto::User {
 787                id: user.id.to_proto(),
 788                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 789                github_login: user.github_login,
 790            })
 791            .collect();
 792        Ok(proto::GetUsersResponse { users })
 793    }
 794
 795    fn update_contacts_for_users<'a>(
 796        self: &Arc<Server>,
 797        user_ids: impl IntoIterator<Item = &'a UserId>,
 798    ) -> anyhow::Result<()> {
 799        let mut result = Ok(());
 800        let state = self.state();
 801        for user_id in user_ids {
 802            let contacts = state.contacts_for_user(*user_id);
 803            for connection_id in state.connection_ids_for_user(*user_id) {
 804                if let Err(error) = self.peer.send(
 805                    connection_id,
 806                    proto::UpdateContacts {
 807                        contacts: contacts.clone(),
 808                    },
 809                ) {
 810                    result = Err(error);
 811                }
 812            }
 813        }
 814        result
 815    }
 816
 817    async fn join_channel(
 818        mut self: Arc<Self>,
 819        request: TypedEnvelope<proto::JoinChannel>,
 820    ) -> tide::Result<proto::JoinChannelResponse> {
 821        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 822        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 823        if !self
 824            .app_state
 825            .db
 826            .can_user_access_channel(user_id, channel_id)
 827            .await?
 828        {
 829            Err(anyhow!("access denied"))?;
 830        }
 831
 832        self.state_mut().join_channel(request.sender_id, channel_id);
 833        let messages = self
 834            .app_state
 835            .db
 836            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 837            .await?
 838            .into_iter()
 839            .map(|msg| proto::ChannelMessage {
 840                id: msg.id.to_proto(),
 841                body: msg.body,
 842                timestamp: msg.sent_at.unix_timestamp() as u64,
 843                sender_id: msg.sender_id.to_proto(),
 844                nonce: Some(msg.nonce.as_u128().into()),
 845            })
 846            .collect::<Vec<_>>();
 847        Ok(proto::JoinChannelResponse {
 848            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 849            messages,
 850        })
 851    }
 852
 853    async fn leave_channel(
 854        mut self: Arc<Self>,
 855        request: TypedEnvelope<proto::LeaveChannel>,
 856    ) -> tide::Result<()> {
 857        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 858        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 859        if !self
 860            .app_state
 861            .db
 862            .can_user_access_channel(user_id, channel_id)
 863            .await?
 864        {
 865            Err(anyhow!("access denied"))?;
 866        }
 867
 868        self.state_mut()
 869            .leave_channel(request.sender_id, channel_id);
 870
 871        Ok(())
 872    }
 873
 874    async fn send_channel_message(
 875        self: Arc<Self>,
 876        request: TypedEnvelope<proto::SendChannelMessage>,
 877    ) -> tide::Result<proto::SendChannelMessageResponse> {
 878        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 879        let user_id;
 880        let connection_ids;
 881        {
 882            let state = self.state();
 883            user_id = state.user_id_for_connection(request.sender_id)?;
 884            connection_ids = state.channel_connection_ids(channel_id)?;
 885        }
 886
 887        // Validate the message body.
 888        let body = request.payload.body.trim().to_string();
 889        if body.len() > MAX_MESSAGE_LEN {
 890            return Err(anyhow!("message is too long"))?;
 891        }
 892        if body.is_empty() {
 893            return Err(anyhow!("message can't be blank"))?;
 894        }
 895
 896        let timestamp = OffsetDateTime::now_utc();
 897        let nonce = request
 898            .payload
 899            .nonce
 900            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 901
 902        let message_id = self
 903            .app_state
 904            .db
 905            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 906            .await?
 907            .to_proto();
 908        let message = proto::ChannelMessage {
 909            sender_id: user_id.to_proto(),
 910            id: message_id,
 911            body,
 912            timestamp: timestamp.unix_timestamp() as u64,
 913            nonce: Some(nonce),
 914        };
 915        broadcast(request.sender_id, connection_ids, |conn_id| {
 916            self.peer.send(
 917                conn_id,
 918                proto::ChannelMessageSent {
 919                    channel_id: channel_id.to_proto(),
 920                    message: Some(message.clone()),
 921                },
 922            )
 923        })?;
 924        Ok(proto::SendChannelMessageResponse {
 925            message: Some(message),
 926        })
 927    }
 928
 929    async fn get_channel_messages(
 930        self: Arc<Self>,
 931        request: TypedEnvelope<proto::GetChannelMessages>,
 932    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 933        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 934        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 935        if !self
 936            .app_state
 937            .db
 938            .can_user_access_channel(user_id, channel_id)
 939            .await?
 940        {
 941            Err(anyhow!("access denied"))?;
 942        }
 943
 944        let messages = self
 945            .app_state
 946            .db
 947            .get_channel_messages(
 948                channel_id,
 949                MESSAGE_COUNT_PER_PAGE,
 950                Some(MessageId::from_proto(request.payload.before_message_id)),
 951            )
 952            .await?
 953            .into_iter()
 954            .map(|msg| proto::ChannelMessage {
 955                id: msg.id.to_proto(),
 956                body: msg.body,
 957                timestamp: msg.sent_at.unix_timestamp() as u64,
 958                sender_id: msg.sender_id.to_proto(),
 959                nonce: Some(msg.nonce.as_u128().into()),
 960            })
 961            .collect::<Vec<_>>();
 962
 963        Ok(proto::GetChannelMessagesResponse {
 964            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 965            messages,
 966        })
 967    }
 968
 969    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 970        self.store.read()
 971    }
 972
 973    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 974        self.store.write()
 975    }
 976}
 977
 978fn broadcast<F>(
 979    sender_id: ConnectionId,
 980    receiver_ids: Vec<ConnectionId>,
 981    mut f: F,
 982) -> anyhow::Result<()>
 983where
 984    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 985{
 986    let mut result = Ok(());
 987    for receiver_id in receiver_ids {
 988        if receiver_id != sender_id {
 989            if let Err(error) = f(receiver_id) {
 990                if result.is_ok() {
 991                    result = Err(error);
 992                }
 993            }
 994        }
 995    }
 996    result
 997}
 998
 999pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1000    let server = Server::new(app.state().clone(), rpc.clone(), None);
1001    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1002        let server = server.clone();
1003        async move {
1004            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1005
1006            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1007            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1008            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1009            let client_protocol_version: Option<u32> = request
1010                .header("X-Zed-Protocol-Version")
1011                .and_then(|v| v.as_str().parse().ok());
1012
1013            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1014                return Ok(Response::new(StatusCode::UpgradeRequired));
1015            }
1016
1017            let header = match request.header("Sec-Websocket-Key") {
1018                Some(h) => h.as_str(),
1019                None => return Err(anyhow!("expected sec-websocket-key"))?,
1020            };
1021
1022            let user_id = process_auth_header(&request).await?;
1023
1024            let mut response = Response::new(StatusCode::SwitchingProtocols);
1025            response.insert_header(UPGRADE, "websocket");
1026            response.insert_header(CONNECTION, "Upgrade");
1027            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1028            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1029            response.insert_header("Sec-Websocket-Version", "13");
1030
1031            let http_res: &mut tide::http::Response = response.as_mut();
1032            let upgrade_receiver = http_res.recv_upgrade().await;
1033            let addr = request.remote().unwrap_or("unknown").to_string();
1034            task::spawn(async move {
1035                if let Some(stream) = upgrade_receiver.await {
1036                    server
1037                        .handle_connection(
1038                            Connection::new(
1039                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1040                            ),
1041                            addr,
1042                            user_id,
1043                            None,
1044                        )
1045                        .await;
1046                }
1047            });
1048
1049            Ok(response)
1050        }
1051    });
1052}
1053
1054fn header_contains_ignore_case<T>(
1055    request: &tide::Request<T>,
1056    header_name: HeaderName,
1057    value: &str,
1058) -> bool {
1059    request
1060        .header(header_name)
1061        .map(|h| {
1062            h.as_str()
1063                .split(',')
1064                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1065        })
1066        .unwrap_or(false)
1067}
1068
1069#[cfg(test)]
1070mod tests {
1071    use super::*;
1072    use crate::{
1073        auth,
1074        db::{tests::TestDb, UserId},
1075        github, AppState, Config,
1076    };
1077    use ::rpc::Peer;
1078    use async_std::task;
1079    use gpui::{executor, ModelHandle, TestAppContext};
1080    use parking_lot::Mutex;
1081    use postage::{mpsc, watch};
1082    use rand::prelude::*;
1083    use rpc::PeerId;
1084    use serde_json::json;
1085    use sqlx::types::time::OffsetDateTime;
1086    use std::{
1087        ops::Deref,
1088        path::Path,
1089        rc::Rc,
1090        sync::{
1091            atomic::{AtomicBool, Ordering::SeqCst},
1092            Arc,
1093        },
1094        time::Duration,
1095    };
1096    use zed::{
1097        client::{
1098            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1099            EstablishConnectionError, UserStore,
1100        },
1101        editor::{ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer},
1102        fs::{FakeFs, Fs as _},
1103        language::{
1104            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1105            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1106        },
1107        lsp,
1108        project::{worktree::WorktreeHandle, DiagnosticSummary, Project, ProjectPath},
1109    };
1110
1111    #[cfg(test)]
1112    #[ctor::ctor]
1113    fn init_logger() {
1114        if std::env::var("RUST_LOG").is_ok() {
1115            env_logger::init();
1116        }
1117    }
1118
1119    #[gpui::test(iterations = 10)]
1120    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1121        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1122        let lang_registry = Arc::new(LanguageRegistry::new());
1123        let fs = Arc::new(FakeFs::new(cx_a.background()));
1124        cx_a.foreground().forbid_parking();
1125
1126        // Connect to a server as 2 clients.
1127        let mut server = TestServer::start(cx_a.foreground()).await;
1128        let client_a = server.create_client(&mut cx_a, "user_a").await;
1129        let client_b = server.create_client(&mut cx_b, "user_b").await;
1130
1131        // Share a project as client A
1132        fs.insert_tree(
1133            "/a",
1134            json!({
1135                ".zed.toml": r#"collaborators = ["user_b"]"#,
1136                "a.txt": "a-contents",
1137                "b.txt": "b-contents",
1138            }),
1139        )
1140        .await;
1141        let project_a = cx_a.update(|cx| {
1142            Project::local(
1143                client_a.clone(),
1144                client_a.user_store.clone(),
1145                lang_registry.clone(),
1146                fs.clone(),
1147                cx,
1148            )
1149        });
1150        let (worktree_a, _) = project_a
1151            .update(&mut cx_a, |p, cx| {
1152                p.find_or_create_local_worktree("/a", false, cx)
1153            })
1154            .await
1155            .unwrap();
1156        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1157        worktree_a
1158            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1159            .await;
1160        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1161        project_a
1162            .update(&mut cx_a, |p, cx| p.share(cx))
1163            .await
1164            .unwrap();
1165
1166        // Join that project as client B
1167        let project_b = Project::remote(
1168            project_id,
1169            client_b.clone(),
1170            client_b.user_store.clone(),
1171            lang_registry.clone(),
1172            fs.clone(),
1173            &mut cx_b.to_async(),
1174        )
1175        .await
1176        .unwrap();
1177
1178        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1179            assert_eq!(
1180                project
1181                    .collaborators()
1182                    .get(&client_a.peer_id)
1183                    .unwrap()
1184                    .user
1185                    .github_login,
1186                "user_a"
1187            );
1188            project.replica_id()
1189        });
1190        project_a
1191            .condition(&cx_a, |tree, _| {
1192                tree.collaborators()
1193                    .get(&client_b.peer_id)
1194                    .map_or(false, |collaborator| {
1195                        collaborator.replica_id == replica_id_b
1196                            && collaborator.user.github_login == "user_b"
1197                    })
1198            })
1199            .await;
1200
1201        // Open the same file as client B and client A.
1202        let buffer_b = project_b
1203            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1204            .await
1205            .unwrap();
1206        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1207        buffer_b.read_with(&cx_b, |buf, cx| {
1208            assert_eq!(buf.read(cx).text(), "b-contents")
1209        });
1210        project_a.read_with(&cx_a, |project, cx| {
1211            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1212        });
1213        let buffer_a = project_a
1214            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1215            .await
1216            .unwrap();
1217
1218        let editor_b = cx_b.add_view(window_b, |cx| {
1219            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1220        });
1221
1222        // TODO
1223        // // Create a selection set as client B and see that selection set as client A.
1224        // buffer_a
1225        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1226        //     .await;
1227
1228        // Edit the buffer as client B and see that edit as client A.
1229        editor_b.update(&mut cx_b, |editor, cx| {
1230            editor.handle_input(&Input("ok, ".into()), cx)
1231        });
1232        buffer_a
1233            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1234            .await;
1235
1236        // TODO
1237        // // Remove the selection set as client B, see those selections disappear as client A.
1238        cx_b.update(move |_| drop(editor_b));
1239        // buffer_a
1240        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1241        //     .await;
1242
1243        // Close the buffer as client A, see that the buffer is closed.
1244        cx_a.update(move |_| drop(buffer_a));
1245        project_a
1246            .condition(&cx_a, |project, cx| {
1247                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1248            })
1249            .await;
1250
1251        // Dropping the client B's project removes client B from client A's collaborators.
1252        cx_b.update(move |_| drop(project_b));
1253        project_a
1254            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1255            .await;
1256    }
1257
1258    #[gpui::test(iterations = 10)]
1259    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1260        let lang_registry = Arc::new(LanguageRegistry::new());
1261        let fs = Arc::new(FakeFs::new(cx_a.background()));
1262        cx_a.foreground().forbid_parking();
1263
1264        // Connect to a server as 2 clients.
1265        let mut server = TestServer::start(cx_a.foreground()).await;
1266        let client_a = server.create_client(&mut cx_a, "user_a").await;
1267        let client_b = server.create_client(&mut cx_b, "user_b").await;
1268
1269        // Share a project as client A
1270        fs.insert_tree(
1271            "/a",
1272            json!({
1273                ".zed.toml": r#"collaborators = ["user_b"]"#,
1274                "a.txt": "a-contents",
1275                "b.txt": "b-contents",
1276            }),
1277        )
1278        .await;
1279        let project_a = cx_a.update(|cx| {
1280            Project::local(
1281                client_a.clone(),
1282                client_a.user_store.clone(),
1283                lang_registry.clone(),
1284                fs.clone(),
1285                cx,
1286            )
1287        });
1288        let (worktree_a, _) = project_a
1289            .update(&mut cx_a, |p, cx| {
1290                p.find_or_create_local_worktree("/a", false, cx)
1291            })
1292            .await
1293            .unwrap();
1294        worktree_a
1295            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1296            .await;
1297        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1298        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1299        project_a
1300            .update(&mut cx_a, |p, cx| p.share(cx))
1301            .await
1302            .unwrap();
1303        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1304
1305        // Join that project as client B
1306        let project_b = Project::remote(
1307            project_id,
1308            client_b.clone(),
1309            client_b.user_store.clone(),
1310            lang_registry.clone(),
1311            fs.clone(),
1312            &mut cx_b.to_async(),
1313        )
1314        .await
1315        .unwrap();
1316        project_b
1317            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1318            .await
1319            .unwrap();
1320
1321        // Unshare the project as client A
1322        project_a
1323            .update(&mut cx_a, |project, cx| project.unshare(cx))
1324            .await
1325            .unwrap();
1326        project_b
1327            .condition(&mut cx_b, |project, _| project.is_read_only())
1328            .await;
1329        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1330        drop(project_b);
1331
1332        // Share the project again and ensure guests can still join.
1333        project_a
1334            .update(&mut cx_a, |project, cx| project.share(cx))
1335            .await
1336            .unwrap();
1337        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1338
1339        let project_c = Project::remote(
1340            project_id,
1341            client_b.clone(),
1342            client_b.user_store.clone(),
1343            lang_registry.clone(),
1344            fs.clone(),
1345            &mut cx_b.to_async(),
1346        )
1347        .await
1348        .unwrap();
1349        project_c
1350            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1351            .await
1352            .unwrap();
1353    }
1354
1355    #[gpui::test(iterations = 10)]
1356    async fn test_propagate_saves_and_fs_changes(
1357        mut cx_a: TestAppContext,
1358        mut cx_b: TestAppContext,
1359        mut cx_c: TestAppContext,
1360    ) {
1361        let lang_registry = Arc::new(LanguageRegistry::new());
1362        let fs = Arc::new(FakeFs::new(cx_a.background()));
1363        cx_a.foreground().forbid_parking();
1364
1365        // Connect to a server as 3 clients.
1366        let mut server = TestServer::start(cx_a.foreground()).await;
1367        let client_a = server.create_client(&mut cx_a, "user_a").await;
1368        let client_b = server.create_client(&mut cx_b, "user_b").await;
1369        let client_c = server.create_client(&mut cx_c, "user_c").await;
1370
1371        // Share a worktree as client A.
1372        fs.insert_tree(
1373            "/a",
1374            json!({
1375                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1376                "file1": "",
1377                "file2": ""
1378            }),
1379        )
1380        .await;
1381        let project_a = cx_a.update(|cx| {
1382            Project::local(
1383                client_a.clone(),
1384                client_a.user_store.clone(),
1385                lang_registry.clone(),
1386                fs.clone(),
1387                cx,
1388            )
1389        });
1390        let (worktree_a, _) = project_a
1391            .update(&mut cx_a, |p, cx| {
1392                p.find_or_create_local_worktree("/a", false, cx)
1393            })
1394            .await
1395            .unwrap();
1396        worktree_a
1397            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1398            .await;
1399        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1400        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1401        project_a
1402            .update(&mut cx_a, |p, cx| p.share(cx))
1403            .await
1404            .unwrap();
1405
1406        // Join that worktree as clients B and C.
1407        let project_b = Project::remote(
1408            project_id,
1409            client_b.clone(),
1410            client_b.user_store.clone(),
1411            lang_registry.clone(),
1412            fs.clone(),
1413            &mut cx_b.to_async(),
1414        )
1415        .await
1416        .unwrap();
1417        let project_c = Project::remote(
1418            project_id,
1419            client_c.clone(),
1420            client_c.user_store.clone(),
1421            lang_registry.clone(),
1422            fs.clone(),
1423            &mut cx_c.to_async(),
1424        )
1425        .await
1426        .unwrap();
1427        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1428        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1429
1430        // Open and edit a buffer as both guests B and C.
1431        let buffer_b = project_b
1432            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1433            .await
1434            .unwrap();
1435        let buffer_c = project_c
1436            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1437            .await
1438            .unwrap();
1439        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1440        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1441
1442        // Open and edit that buffer as the host.
1443        let buffer_a = project_a
1444            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1445            .await
1446            .unwrap();
1447
1448        buffer_a
1449            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1450            .await;
1451        buffer_a.update(&mut cx_a, |buf, cx| {
1452            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1453        });
1454
1455        // Wait for edits to propagate
1456        buffer_a
1457            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1458            .await;
1459        buffer_b
1460            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1461            .await;
1462        buffer_c
1463            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1464            .await;
1465
1466        // Edit the buffer as the host and concurrently save as guest B.
1467        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1468        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1469        save_b.await.unwrap();
1470        assert_eq!(
1471            fs.load("/a/file1".as_ref()).await.unwrap(),
1472            "hi-a, i-am-c, i-am-b, i-am-a"
1473        );
1474        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1475        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1476        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1477
1478        // Ensure worktree observes a/file1's change event *before* the rename occurs, otherwise
1479        // when interpreting the change event it will mistakenly think that the file has been
1480        // deleted (because its path has changed) and will subsequently fail to detect the rename.
1481        worktree_a.flush_fs_events(&cx_a).await;
1482
1483        // Make changes on host's file system, see those changes on guest worktrees.
1484        fs.rename(
1485            "/a/file1".as_ref(),
1486            "/a/file1-renamed".as_ref(),
1487            Default::default(),
1488        )
1489        .await
1490        .unwrap();
1491        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1492            .await
1493            .unwrap();
1494        fs.insert_file(Path::new("/a/file4"), "4".into())
1495            .await
1496            .unwrap();
1497
1498        worktree_a
1499            .condition(&cx_a, |tree, _| {
1500                tree.paths()
1501                    .map(|p| p.to_string_lossy())
1502                    .collect::<Vec<_>>()
1503                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1504            })
1505            .await;
1506        worktree_b
1507            .condition(&cx_b, |tree, _| {
1508                tree.paths()
1509                    .map(|p| p.to_string_lossy())
1510                    .collect::<Vec<_>>()
1511                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1512            })
1513            .await;
1514        worktree_c
1515            .condition(&cx_c, |tree, _| {
1516                tree.paths()
1517                    .map(|p| p.to_string_lossy())
1518                    .collect::<Vec<_>>()
1519                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1520            })
1521            .await;
1522
1523        // Ensure buffer files are updated as well.
1524        buffer_a
1525            .condition(&cx_a, |buf, _| {
1526                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1527            })
1528            .await;
1529        buffer_b
1530            .condition(&cx_b, |buf, _| {
1531                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1532            })
1533            .await;
1534        buffer_c
1535            .condition(&cx_c, |buf, _| {
1536                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1537            })
1538            .await;
1539    }
1540
1541    #[gpui::test(iterations = 10)]
1542    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1543        cx_a.foreground().forbid_parking();
1544        let lang_registry = Arc::new(LanguageRegistry::new());
1545        let fs = Arc::new(FakeFs::new(cx_a.background()));
1546
1547        // Connect to a server as 2 clients.
1548        let mut server = TestServer::start(cx_a.foreground()).await;
1549        let client_a = server.create_client(&mut cx_a, "user_a").await;
1550        let client_b = server.create_client(&mut cx_b, "user_b").await;
1551
1552        // Share a project as client A
1553        fs.insert_tree(
1554            "/dir",
1555            json!({
1556                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1557                "a.txt": "a-contents",
1558            }),
1559        )
1560        .await;
1561
1562        let project_a = cx_a.update(|cx| {
1563            Project::local(
1564                client_a.clone(),
1565                client_a.user_store.clone(),
1566                lang_registry.clone(),
1567                fs.clone(),
1568                cx,
1569            )
1570        });
1571        let (worktree_a, _) = project_a
1572            .update(&mut cx_a, |p, cx| {
1573                p.find_or_create_local_worktree("/dir", false, cx)
1574            })
1575            .await
1576            .unwrap();
1577        worktree_a
1578            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1579            .await;
1580        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1581        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1582        project_a
1583            .update(&mut cx_a, |p, cx| p.share(cx))
1584            .await
1585            .unwrap();
1586
1587        // Join that project as client B
1588        let project_b = Project::remote(
1589            project_id,
1590            client_b.clone(),
1591            client_b.user_store.clone(),
1592            lang_registry.clone(),
1593            fs.clone(),
1594            &mut cx_b.to_async(),
1595        )
1596        .await
1597        .unwrap();
1598        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1599
1600        // Open a buffer as client B
1601        let buffer_b = project_b
1602            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1603            .await
1604            .unwrap();
1605        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1606
1607        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1608        buffer_b.read_with(&cx_b, |buf, _| {
1609            assert!(buf.is_dirty());
1610            assert!(!buf.has_conflict());
1611        });
1612
1613        buffer_b
1614            .update(&mut cx_b, |buf, cx| buf.save(cx))
1615            .await
1616            .unwrap();
1617        worktree_b
1618            .condition(&cx_b, |_, cx| {
1619                buffer_b.read(cx).file().unwrap().mtime() != mtime
1620            })
1621            .await;
1622        buffer_b.read_with(&cx_b, |buf, _| {
1623            assert!(!buf.is_dirty());
1624            assert!(!buf.has_conflict());
1625        });
1626
1627        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1628        buffer_b.read_with(&cx_b, |buf, _| {
1629            assert!(buf.is_dirty());
1630            assert!(!buf.has_conflict());
1631        });
1632    }
1633
1634    #[gpui::test(iterations = 10)]
1635    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1636        cx_a.foreground().forbid_parking();
1637        let lang_registry = Arc::new(LanguageRegistry::new());
1638        let fs = Arc::new(FakeFs::new(cx_a.background()));
1639
1640        // Connect to a server as 2 clients.
1641        let mut server = TestServer::start(cx_a.foreground()).await;
1642        let client_a = server.create_client(&mut cx_a, "user_a").await;
1643        let client_b = server.create_client(&mut cx_b, "user_b").await;
1644
1645        // Share a project as client A
1646        fs.insert_tree(
1647            "/dir",
1648            json!({
1649                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1650                "a.txt": "a-contents",
1651            }),
1652        )
1653        .await;
1654
1655        let project_a = cx_a.update(|cx| {
1656            Project::local(
1657                client_a.clone(),
1658                client_a.user_store.clone(),
1659                lang_registry.clone(),
1660                fs.clone(),
1661                cx,
1662            )
1663        });
1664        let (worktree_a, _) = project_a
1665            .update(&mut cx_a, |p, cx| {
1666                p.find_or_create_local_worktree("/dir", false, cx)
1667            })
1668            .await
1669            .unwrap();
1670        worktree_a
1671            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1672            .await;
1673        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1674        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1675        project_a
1676            .update(&mut cx_a, |p, cx| p.share(cx))
1677            .await
1678            .unwrap();
1679
1680        // Join that project as client B
1681        let project_b = Project::remote(
1682            project_id,
1683            client_b.clone(),
1684            client_b.user_store.clone(),
1685            lang_registry.clone(),
1686            fs.clone(),
1687            &mut cx_b.to_async(),
1688        )
1689        .await
1690        .unwrap();
1691        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1692
1693        // Open a buffer as client B
1694        let buffer_b = project_b
1695            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1696            .await
1697            .unwrap();
1698        buffer_b.read_with(&cx_b, |buf, _| {
1699            assert!(!buf.is_dirty());
1700            assert!(!buf.has_conflict());
1701        });
1702
1703        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1704            .await
1705            .unwrap();
1706        buffer_b
1707            .condition(&cx_b, |buf, _| {
1708                buf.text() == "new contents" && !buf.is_dirty()
1709            })
1710            .await;
1711        buffer_b.read_with(&cx_b, |buf, _| {
1712            assert!(!buf.has_conflict());
1713        });
1714    }
1715
1716    #[gpui::test(iterations = 10)]
1717    async fn test_editing_while_guest_opens_buffer(
1718        mut cx_a: TestAppContext,
1719        mut cx_b: TestAppContext,
1720    ) {
1721        cx_a.foreground().forbid_parking();
1722        let lang_registry = Arc::new(LanguageRegistry::new());
1723        let fs = Arc::new(FakeFs::new(cx_a.background()));
1724
1725        // Connect to a server as 2 clients.
1726        let mut server = TestServer::start(cx_a.foreground()).await;
1727        let client_a = server.create_client(&mut cx_a, "user_a").await;
1728        let client_b = server.create_client(&mut cx_b, "user_b").await;
1729
1730        // Share a project as client A
1731        fs.insert_tree(
1732            "/dir",
1733            json!({
1734                ".zed.toml": r#"collaborators = ["user_b"]"#,
1735                "a.txt": "a-contents",
1736            }),
1737        )
1738        .await;
1739        let project_a = cx_a.update(|cx| {
1740            Project::local(
1741                client_a.clone(),
1742                client_a.user_store.clone(),
1743                lang_registry.clone(),
1744                fs.clone(),
1745                cx,
1746            )
1747        });
1748        let (worktree_a, _) = project_a
1749            .update(&mut cx_a, |p, cx| {
1750                p.find_or_create_local_worktree("/dir", false, cx)
1751            })
1752            .await
1753            .unwrap();
1754        worktree_a
1755            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1756            .await;
1757        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1758        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1759        project_a
1760            .update(&mut cx_a, |p, cx| p.share(cx))
1761            .await
1762            .unwrap();
1763
1764        // Join that project as client B
1765        let project_b = Project::remote(
1766            project_id,
1767            client_b.clone(),
1768            client_b.user_store.clone(),
1769            lang_registry.clone(),
1770            fs.clone(),
1771            &mut cx_b.to_async(),
1772        )
1773        .await
1774        .unwrap();
1775
1776        // Open a buffer as client A
1777        let buffer_a = project_a
1778            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1779            .await
1780            .unwrap();
1781
1782        // Start opening the same buffer as client B
1783        let buffer_b = cx_b
1784            .background()
1785            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1786        task::yield_now().await;
1787
1788        // Edit the buffer as client A while client B is still opening it.
1789        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1790
1791        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1792        let buffer_b = buffer_b.await.unwrap();
1793        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1794    }
1795
1796    #[gpui::test(iterations = 10)]
1797    async fn test_leaving_worktree_while_opening_buffer(
1798        mut cx_a: TestAppContext,
1799        mut cx_b: TestAppContext,
1800    ) {
1801        cx_a.foreground().forbid_parking();
1802        let lang_registry = Arc::new(LanguageRegistry::new());
1803        let fs = Arc::new(FakeFs::new(cx_a.background()));
1804
1805        // Connect to a server as 2 clients.
1806        let mut server = TestServer::start(cx_a.foreground()).await;
1807        let client_a = server.create_client(&mut cx_a, "user_a").await;
1808        let client_b = server.create_client(&mut cx_b, "user_b").await;
1809
1810        // Share a project as client A
1811        fs.insert_tree(
1812            "/dir",
1813            json!({
1814                ".zed.toml": r#"collaborators = ["user_b"]"#,
1815                "a.txt": "a-contents",
1816            }),
1817        )
1818        .await;
1819        let project_a = cx_a.update(|cx| {
1820            Project::local(
1821                client_a.clone(),
1822                client_a.user_store.clone(),
1823                lang_registry.clone(),
1824                fs.clone(),
1825                cx,
1826            )
1827        });
1828        let (worktree_a, _) = project_a
1829            .update(&mut cx_a, |p, cx| {
1830                p.find_or_create_local_worktree("/dir", false, cx)
1831            })
1832            .await
1833            .unwrap();
1834        worktree_a
1835            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1836            .await;
1837        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1838        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1839        project_a
1840            .update(&mut cx_a, |p, cx| p.share(cx))
1841            .await
1842            .unwrap();
1843
1844        // Join that project as client B
1845        let project_b = Project::remote(
1846            project_id,
1847            client_b.clone(),
1848            client_b.user_store.clone(),
1849            lang_registry.clone(),
1850            fs.clone(),
1851            &mut cx_b.to_async(),
1852        )
1853        .await
1854        .unwrap();
1855
1856        // See that a guest has joined as client A.
1857        project_a
1858            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1859            .await;
1860
1861        // Begin opening a buffer as client B, but leave the project before the open completes.
1862        let buffer_b = cx_b
1863            .background()
1864            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1865        cx_b.update(|_| drop(project_b));
1866        drop(buffer_b);
1867
1868        // See that the guest has left.
1869        project_a
1870            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1871            .await;
1872    }
1873
1874    #[gpui::test(iterations = 10)]
1875    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1876        cx_a.foreground().forbid_parking();
1877        let lang_registry = Arc::new(LanguageRegistry::new());
1878        let fs = Arc::new(FakeFs::new(cx_a.background()));
1879
1880        // Connect to a server as 2 clients.
1881        let mut server = TestServer::start(cx_a.foreground()).await;
1882        let client_a = server.create_client(&mut cx_a, "user_a").await;
1883        let client_b = server.create_client(&mut cx_b, "user_b").await;
1884
1885        // Share a project as client A
1886        fs.insert_tree(
1887            "/a",
1888            json!({
1889                ".zed.toml": r#"collaborators = ["user_b"]"#,
1890                "a.txt": "a-contents",
1891                "b.txt": "b-contents",
1892            }),
1893        )
1894        .await;
1895        let project_a = cx_a.update(|cx| {
1896            Project::local(
1897                client_a.clone(),
1898                client_a.user_store.clone(),
1899                lang_registry.clone(),
1900                fs.clone(),
1901                cx,
1902            )
1903        });
1904        let (worktree_a, _) = project_a
1905            .update(&mut cx_a, |p, cx| {
1906                p.find_or_create_local_worktree("/a", false, cx)
1907            })
1908            .await
1909            .unwrap();
1910        worktree_a
1911            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1912            .await;
1913        let project_id = project_a
1914            .update(&mut cx_a, |project, _| project.next_remote_id())
1915            .await;
1916        project_a
1917            .update(&mut cx_a, |project, cx| project.share(cx))
1918            .await
1919            .unwrap();
1920
1921        // Join that project as client B
1922        let _project_b = Project::remote(
1923            project_id,
1924            client_b.clone(),
1925            client_b.user_store.clone(),
1926            lang_registry.clone(),
1927            fs.clone(),
1928            &mut cx_b.to_async(),
1929        )
1930        .await
1931        .unwrap();
1932
1933        // See that a guest has joined as client A.
1934        project_a
1935            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1936            .await;
1937
1938        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1939        client_b.disconnect(&cx_b.to_async()).unwrap();
1940        project_a
1941            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1942            .await;
1943    }
1944
1945    #[gpui::test(iterations = 10)]
1946    async fn test_collaborating_with_diagnostics(
1947        mut cx_a: TestAppContext,
1948        mut cx_b: TestAppContext,
1949    ) {
1950        cx_a.foreground().forbid_parking();
1951        let mut lang_registry = Arc::new(LanguageRegistry::new());
1952        let fs = Arc::new(FakeFs::new(cx_a.background()));
1953
1954        // Set up a fake language server.
1955        let (language_server_config, mut fake_language_server) =
1956            LanguageServerConfig::fake(&cx_a).await;
1957        Arc::get_mut(&mut lang_registry)
1958            .unwrap()
1959            .add(Arc::new(Language::new(
1960                LanguageConfig {
1961                    name: "Rust".to_string(),
1962                    path_suffixes: vec!["rs".to_string()],
1963                    language_server: Some(language_server_config),
1964                    ..Default::default()
1965                },
1966                Some(tree_sitter_rust::language()),
1967            )));
1968
1969        // Connect to a server as 2 clients.
1970        let mut server = TestServer::start(cx_a.foreground()).await;
1971        let client_a = server.create_client(&mut cx_a, "user_a").await;
1972        let client_b = server.create_client(&mut cx_b, "user_b").await;
1973
1974        // Share a project as client A
1975        fs.insert_tree(
1976            "/a",
1977            json!({
1978                ".zed.toml": r#"collaborators = ["user_b"]"#,
1979                "a.rs": "let one = two",
1980                "other.rs": "",
1981            }),
1982        )
1983        .await;
1984        let project_a = cx_a.update(|cx| {
1985            Project::local(
1986                client_a.clone(),
1987                client_a.user_store.clone(),
1988                lang_registry.clone(),
1989                fs.clone(),
1990                cx,
1991            )
1992        });
1993        let (worktree_a, _) = project_a
1994            .update(&mut cx_a, |p, cx| {
1995                p.find_or_create_local_worktree("/a", false, cx)
1996            })
1997            .await
1998            .unwrap();
1999        worktree_a
2000            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2001            .await;
2002        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2003        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2004        project_a
2005            .update(&mut cx_a, |p, cx| p.share(cx))
2006            .await
2007            .unwrap();
2008
2009        // Cause the language server to start.
2010        let _ = cx_a
2011            .background()
2012            .spawn(project_a.update(&mut cx_a, |project, cx| {
2013                project.open_buffer(
2014                    ProjectPath {
2015                        worktree_id,
2016                        path: Path::new("other.rs").into(),
2017                    },
2018                    cx,
2019                )
2020            }))
2021            .await
2022            .unwrap();
2023
2024        // Simulate a language server reporting errors for a file.
2025        fake_language_server
2026            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2027                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2028                version: None,
2029                diagnostics: vec![lsp::Diagnostic {
2030                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2031                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2032                    message: "message 1".to_string(),
2033                    ..Default::default()
2034                }],
2035            })
2036            .await;
2037
2038        // Wait for server to see the diagnostics update.
2039        server
2040            .condition(|store| {
2041                let worktree = store
2042                    .project(project_id)
2043                    .unwrap()
2044                    .worktrees
2045                    .get(&worktree_id.to_proto())
2046                    .unwrap();
2047
2048                !worktree
2049                    .share
2050                    .as_ref()
2051                    .unwrap()
2052                    .diagnostic_summaries
2053                    .is_empty()
2054            })
2055            .await;
2056
2057        // Join the worktree as client B.
2058        let project_b = Project::remote(
2059            project_id,
2060            client_b.clone(),
2061            client_b.user_store.clone(),
2062            lang_registry.clone(),
2063            fs.clone(),
2064            &mut cx_b.to_async(),
2065        )
2066        .await
2067        .unwrap();
2068
2069        project_b.read_with(&cx_b, |project, cx| {
2070            assert_eq!(
2071                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2072                &[(
2073                    ProjectPath {
2074                        worktree_id,
2075                        path: Arc::from(Path::new("a.rs")),
2076                    },
2077                    DiagnosticSummary {
2078                        error_count: 1,
2079                        warning_count: 0,
2080                        ..Default::default()
2081                    },
2082                )]
2083            )
2084        });
2085
2086        // Simulate a language server reporting more errors for a file.
2087        fake_language_server
2088            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2089                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2090                version: None,
2091                diagnostics: vec![
2092                    lsp::Diagnostic {
2093                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2094                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2095                        message: "message 1".to_string(),
2096                        ..Default::default()
2097                    },
2098                    lsp::Diagnostic {
2099                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2100                        range: lsp::Range::new(
2101                            lsp::Position::new(0, 10),
2102                            lsp::Position::new(0, 13),
2103                        ),
2104                        message: "message 2".to_string(),
2105                        ..Default::default()
2106                    },
2107                ],
2108            })
2109            .await;
2110
2111        // Client b gets the updated summaries
2112        project_b
2113            .condition(&cx_b, |project, cx| {
2114                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2115                    == &[(
2116                        ProjectPath {
2117                            worktree_id,
2118                            path: Arc::from(Path::new("a.rs")),
2119                        },
2120                        DiagnosticSummary {
2121                            error_count: 1,
2122                            warning_count: 1,
2123                            ..Default::default()
2124                        },
2125                    )]
2126            })
2127            .await;
2128
2129        // Open the file with the errors on client B. They should be present.
2130        let buffer_b = cx_b
2131            .background()
2132            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2133            .await
2134            .unwrap();
2135
2136        buffer_b.read_with(&cx_b, |buffer, _| {
2137            assert_eq!(
2138                buffer
2139                    .snapshot()
2140                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2141                    .map(|entry| entry)
2142                    .collect::<Vec<_>>(),
2143                &[
2144                    DiagnosticEntry {
2145                        range: Point::new(0, 4)..Point::new(0, 7),
2146                        diagnostic: Diagnostic {
2147                            group_id: 0,
2148                            message: "message 1".to_string(),
2149                            severity: lsp::DiagnosticSeverity::ERROR,
2150                            is_primary: true,
2151                            ..Default::default()
2152                        }
2153                    },
2154                    DiagnosticEntry {
2155                        range: Point::new(0, 10)..Point::new(0, 13),
2156                        diagnostic: Diagnostic {
2157                            group_id: 1,
2158                            severity: lsp::DiagnosticSeverity::WARNING,
2159                            message: "message 2".to_string(),
2160                            is_primary: true,
2161                            ..Default::default()
2162                        }
2163                    }
2164                ]
2165            );
2166        });
2167    }
2168
2169    #[gpui::test(iterations = 10)]
2170    async fn test_collaborating_with_completion(
2171        mut cx_a: TestAppContext,
2172        mut cx_b: TestAppContext,
2173    ) {
2174        cx_a.foreground().forbid_parking();
2175        let mut lang_registry = Arc::new(LanguageRegistry::new());
2176        let fs = Arc::new(FakeFs::new(cx_a.background()));
2177
2178        // Set up a fake language server.
2179        let (language_server_config, mut fake_language_server) =
2180            LanguageServerConfig::fake_with_capabilities(
2181                lsp::ServerCapabilities {
2182                    completion_provider: Some(lsp::CompletionOptions {
2183                        trigger_characters: Some(vec![".".to_string()]),
2184                        ..Default::default()
2185                    }),
2186                    ..Default::default()
2187                },
2188                &cx_a,
2189            )
2190            .await;
2191        Arc::get_mut(&mut lang_registry)
2192            .unwrap()
2193            .add(Arc::new(Language::new(
2194                LanguageConfig {
2195                    name: "Rust".to_string(),
2196                    path_suffixes: vec!["rs".to_string()],
2197                    language_server: Some(language_server_config),
2198                    ..Default::default()
2199                },
2200                Some(tree_sitter_rust::language()),
2201            )));
2202
2203        // Connect to a server as 2 clients.
2204        let mut server = TestServer::start(cx_a.foreground()).await;
2205        let client_a = server.create_client(&mut cx_a, "user_a").await;
2206        let client_b = server.create_client(&mut cx_b, "user_b").await;
2207
2208        // Share a project as client A
2209        fs.insert_tree(
2210            "/a",
2211            json!({
2212                ".zed.toml": r#"collaborators = ["user_b"]"#,
2213                "main.rs": "fn main() { a }",
2214                "other.rs": "",
2215            }),
2216        )
2217        .await;
2218        let project_a = cx_a.update(|cx| {
2219            Project::local(
2220                client_a.clone(),
2221                client_a.user_store.clone(),
2222                lang_registry.clone(),
2223                fs.clone(),
2224                cx,
2225            )
2226        });
2227        let (worktree_a, _) = project_a
2228            .update(&mut cx_a, |p, cx| {
2229                p.find_or_create_local_worktree("/a", false, cx)
2230            })
2231            .await
2232            .unwrap();
2233        worktree_a
2234            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2235            .await;
2236        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2237        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2238        project_a
2239            .update(&mut cx_a, |p, cx| p.share(cx))
2240            .await
2241            .unwrap();
2242
2243        // Join the worktree as client B.
2244        let project_b = Project::remote(
2245            project_id,
2246            client_b.clone(),
2247            client_b.user_store.clone(),
2248            lang_registry.clone(),
2249            fs.clone(),
2250            &mut cx_b.to_async(),
2251        )
2252        .await
2253        .unwrap();
2254
2255        // Open a file in an editor as the guest.
2256        let buffer_b = project_b
2257            .update(&mut cx_b, |p, cx| {
2258                p.open_buffer((worktree_id, "main.rs"), cx)
2259            })
2260            .await
2261            .unwrap();
2262        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2263        let editor_b = cx_b.add_view(window_b, |cx| {
2264            Editor::for_buffer(
2265                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2266                Arc::new(|cx| EditorSettings::test(cx)),
2267                Some(project_b.clone()),
2268                cx,
2269            )
2270        });
2271
2272        // Type a completion trigger character as the guest.
2273        editor_b.update(&mut cx_b, |editor, cx| {
2274            editor.select_ranges([13..13], None, cx);
2275            editor.handle_input(&Input(".".into()), cx);
2276            cx.focus(&editor_b);
2277        });
2278
2279        // Receive a completion request as the host's language server.
2280        // Return some completions from the host's language server.
2281        fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
2282            assert_eq!(
2283                params.text_document_position.text_document.uri,
2284                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2285            );
2286            assert_eq!(
2287                params.text_document_position.position,
2288                lsp::Position::new(0, 14),
2289            );
2290
2291            Some(lsp::CompletionResponse::Array(vec![
2292                lsp::CompletionItem {
2293                    label: "first_method(…)".into(),
2294                    detail: Some("fn(&mut self, B) -> C".into()),
2295                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2296                        new_text: "first_method($1)".to_string(),
2297                        range: lsp::Range::new(
2298                            lsp::Position::new(0, 14),
2299                            lsp::Position::new(0, 14),
2300                        ),
2301                    })),
2302                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2303                    ..Default::default()
2304                },
2305                lsp::CompletionItem {
2306                    label: "second_method(…)".into(),
2307                    detail: Some("fn(&mut self, C) -> D<E>".into()),
2308                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2309                        new_text: "second_method()".to_string(),
2310                        range: lsp::Range::new(
2311                            lsp::Position::new(0, 14),
2312                            lsp::Position::new(0, 14),
2313                        ),
2314                    })),
2315                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2316                    ..Default::default()
2317                },
2318            ]))
2319        });
2320
2321        // Open the buffer on the host.
2322        let buffer_a = project_a
2323            .update(&mut cx_a, |p, cx| {
2324                p.open_buffer((worktree_id, "main.rs"), cx)
2325            })
2326            .await
2327            .unwrap();
2328        buffer_a
2329            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2330            .await;
2331
2332        // Confirm a completion on the guest.
2333        editor_b.next_notification(&cx_b).await;
2334        editor_b.update(&mut cx_b, |editor, cx| {
2335            assert!(editor.context_menu_visible());
2336            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2337            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2338        });
2339
2340        // Return a resolved completion from the host's language server.
2341        // The resolved completion has an additional text edit.
2342        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2343            assert_eq!(params.label, "first_method(…)");
2344            lsp::CompletionItem {
2345                label: "first_method(…)".into(),
2346                detail: Some("fn(&mut self, B) -> C".into()),
2347                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2348                    new_text: "first_method($1)".to_string(),
2349                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2350                })),
2351                additional_text_edits: Some(vec![lsp::TextEdit {
2352                    new_text: "use d::SomeTrait;\n".to_string(),
2353                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2354                }]),
2355                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2356                ..Default::default()
2357            }
2358        });
2359
2360        buffer_a
2361            .condition(&cx_a, |buffer, _| {
2362                buffer.text() == "fn main() { a.first_method() }"
2363            })
2364            .await;
2365
2366        // The additional edit is applied.
2367        buffer_b
2368            .condition(&cx_b, |buffer, _| {
2369                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2370            })
2371            .await;
2372        assert_eq!(
2373            buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2374            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2375        );
2376    }
2377
2378    #[gpui::test(iterations = 10)]
2379    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2380        cx_a.foreground().forbid_parking();
2381        let mut lang_registry = Arc::new(LanguageRegistry::new());
2382        let fs = Arc::new(FakeFs::new(cx_a.background()));
2383
2384        // Set up a fake language server.
2385        let (language_server_config, mut fake_language_server) =
2386            LanguageServerConfig::fake(&cx_a).await;
2387        Arc::get_mut(&mut lang_registry)
2388            .unwrap()
2389            .add(Arc::new(Language::new(
2390                LanguageConfig {
2391                    name: "Rust".to_string(),
2392                    path_suffixes: vec!["rs".to_string()],
2393                    language_server: Some(language_server_config),
2394                    ..Default::default()
2395                },
2396                Some(tree_sitter_rust::language()),
2397            )));
2398
2399        // Connect to a server as 2 clients.
2400        let mut server = TestServer::start(cx_a.foreground()).await;
2401        let client_a = server.create_client(&mut cx_a, "user_a").await;
2402        let client_b = server.create_client(&mut cx_b, "user_b").await;
2403
2404        // Share a project as client A
2405        fs.insert_tree(
2406            "/a",
2407            json!({
2408                ".zed.toml": r#"collaborators = ["user_b"]"#,
2409                "a.rs": "let one = two",
2410            }),
2411        )
2412        .await;
2413        let project_a = cx_a.update(|cx| {
2414            Project::local(
2415                client_a.clone(),
2416                client_a.user_store.clone(),
2417                lang_registry.clone(),
2418                fs.clone(),
2419                cx,
2420            )
2421        });
2422        let (worktree_a, _) = project_a
2423            .update(&mut cx_a, |p, cx| {
2424                p.find_or_create_local_worktree("/a", false, cx)
2425            })
2426            .await
2427            .unwrap();
2428        worktree_a
2429            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2430            .await;
2431        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2432        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2433        project_a
2434            .update(&mut cx_a, |p, cx| p.share(cx))
2435            .await
2436            .unwrap();
2437
2438        // Join the worktree as client B.
2439        let project_b = Project::remote(
2440            project_id,
2441            client_b.clone(),
2442            client_b.user_store.clone(),
2443            lang_registry.clone(),
2444            fs.clone(),
2445            &mut cx_b.to_async(),
2446        )
2447        .await
2448        .unwrap();
2449
2450        let buffer_b = cx_b
2451            .background()
2452            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2453            .await
2454            .unwrap();
2455
2456        let format = project_b.update(&mut cx_b, |project, cx| {
2457            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2458        });
2459
2460        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2461            Some(vec![
2462                lsp::TextEdit {
2463                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2464                    new_text: "h".to_string(),
2465                },
2466                lsp::TextEdit {
2467                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2468                    new_text: "y".to_string(),
2469                },
2470            ])
2471        });
2472
2473        format.await.unwrap();
2474        assert_eq!(
2475            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2476            "let honey = two"
2477        );
2478    }
2479
2480    #[gpui::test(iterations = 10)]
2481    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2482        cx_a.foreground().forbid_parking();
2483        let mut lang_registry = Arc::new(LanguageRegistry::new());
2484        let fs = Arc::new(FakeFs::new(cx_a.background()));
2485        fs.insert_tree(
2486            "/root-1",
2487            json!({
2488                ".zed.toml": r#"collaborators = ["user_b"]"#,
2489                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2490            }),
2491        )
2492        .await;
2493        fs.insert_tree(
2494            "/root-2",
2495            json!({
2496                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2497            }),
2498        )
2499        .await;
2500
2501        // Set up a fake language server.
2502        let (language_server_config, mut fake_language_server) =
2503            LanguageServerConfig::fake(&cx_a).await;
2504        Arc::get_mut(&mut lang_registry)
2505            .unwrap()
2506            .add(Arc::new(Language::new(
2507                LanguageConfig {
2508                    name: "Rust".to_string(),
2509                    path_suffixes: vec!["rs".to_string()],
2510                    language_server: Some(language_server_config),
2511                    ..Default::default()
2512                },
2513                Some(tree_sitter_rust::language()),
2514            )));
2515
2516        // Connect to a server as 2 clients.
2517        let mut server = TestServer::start(cx_a.foreground()).await;
2518        let client_a = server.create_client(&mut cx_a, "user_a").await;
2519        let client_b = server.create_client(&mut cx_b, "user_b").await;
2520
2521        // Share a project as client A
2522        let project_a = cx_a.update(|cx| {
2523            Project::local(
2524                client_a.clone(),
2525                client_a.user_store.clone(),
2526                lang_registry.clone(),
2527                fs.clone(),
2528                cx,
2529            )
2530        });
2531        let (worktree_a, _) = project_a
2532            .update(&mut cx_a, |p, cx| {
2533                p.find_or_create_local_worktree("/root-1", false, cx)
2534            })
2535            .await
2536            .unwrap();
2537        worktree_a
2538            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2539            .await;
2540        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2541        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2542        project_a
2543            .update(&mut cx_a, |p, cx| p.share(cx))
2544            .await
2545            .unwrap();
2546
2547        // Join the worktree as client B.
2548        let project_b = Project::remote(
2549            project_id,
2550            client_b.clone(),
2551            client_b.user_store.clone(),
2552            lang_registry.clone(),
2553            fs.clone(),
2554            &mut cx_b.to_async(),
2555        )
2556        .await
2557        .unwrap();
2558
2559        // Open the file on client B.
2560        let buffer_b = cx_b
2561            .background()
2562            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2563            .await
2564            .unwrap();
2565
2566        // Request the definition of a symbol as the guest.
2567        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2568        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2569            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2570                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2571                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2572            )))
2573        });
2574
2575        let definitions_1 = definitions_1.await.unwrap();
2576        cx_b.read(|cx| {
2577            assert_eq!(definitions_1.len(), 1);
2578            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2579            let target_buffer = definitions_1[0].target_buffer.read(cx);
2580            assert_eq!(
2581                target_buffer.text(),
2582                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2583            );
2584            assert_eq!(
2585                definitions_1[0].target_range.to_point(target_buffer),
2586                Point::new(0, 6)..Point::new(0, 9)
2587            );
2588        });
2589
2590        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2591        // the previous call to `definition`.
2592        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2593        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2594            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2595                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2596                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2597            )))
2598        });
2599
2600        let definitions_2 = definitions_2.await.unwrap();
2601        cx_b.read(|cx| {
2602            assert_eq!(definitions_2.len(), 1);
2603            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2604            let target_buffer = definitions_2[0].target_buffer.read(cx);
2605            assert_eq!(
2606                target_buffer.text(),
2607                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2608            );
2609            assert_eq!(
2610                definitions_2[0].target_range.to_point(target_buffer),
2611                Point::new(1, 6)..Point::new(1, 11)
2612            );
2613        });
2614        assert_eq!(
2615            definitions_1[0].target_buffer,
2616            definitions_2[0].target_buffer
2617        );
2618
2619        cx_b.update(|_| {
2620            drop(definitions_1);
2621            drop(definitions_2);
2622        });
2623        project_b
2624            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2625            .await;
2626    }
2627
2628    #[gpui::test(iterations = 10)]
2629    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2630        mut cx_a: TestAppContext,
2631        mut cx_b: TestAppContext,
2632        mut rng: StdRng,
2633    ) {
2634        cx_a.foreground().forbid_parking();
2635        let mut lang_registry = Arc::new(LanguageRegistry::new());
2636        let fs = Arc::new(FakeFs::new(cx_a.background()));
2637        fs.insert_tree(
2638            "/root",
2639            json!({
2640                ".zed.toml": r#"collaborators = ["user_b"]"#,
2641                "a.rs": "const ONE: usize = b::TWO;",
2642                "b.rs": "const TWO: usize = 2",
2643            }),
2644        )
2645        .await;
2646
2647        // Set up a fake language server.
2648        let (language_server_config, mut fake_language_server) =
2649            LanguageServerConfig::fake(&cx_a).await;
2650        Arc::get_mut(&mut lang_registry)
2651            .unwrap()
2652            .add(Arc::new(Language::new(
2653                LanguageConfig {
2654                    name: "Rust".to_string(),
2655                    path_suffixes: vec!["rs".to_string()],
2656                    language_server: Some(language_server_config),
2657                    ..Default::default()
2658                },
2659                Some(tree_sitter_rust::language()),
2660            )));
2661
2662        // Connect to a server as 2 clients.
2663        let mut server = TestServer::start(cx_a.foreground()).await;
2664        let client_a = server.create_client(&mut cx_a, "user_a").await;
2665        let client_b = server.create_client(&mut cx_b, "user_b").await;
2666
2667        // Share a project as client A
2668        let project_a = cx_a.update(|cx| {
2669            Project::local(
2670                client_a.clone(),
2671                client_a.user_store.clone(),
2672                lang_registry.clone(),
2673                fs.clone(),
2674                cx,
2675            )
2676        });
2677        let (worktree_a, _) = project_a
2678            .update(&mut cx_a, |p, cx| {
2679                p.find_or_create_local_worktree("/root", false, cx)
2680            })
2681            .await
2682            .unwrap();
2683        worktree_a
2684            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2685            .await;
2686        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2687        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2688        project_a
2689            .update(&mut cx_a, |p, cx| p.share(cx))
2690            .await
2691            .unwrap();
2692
2693        // Join the worktree as client B.
2694        let project_b = Project::remote(
2695            project_id,
2696            client_b.clone(),
2697            client_b.user_store.clone(),
2698            lang_registry.clone(),
2699            fs.clone(),
2700            &mut cx_b.to_async(),
2701        )
2702        .await
2703        .unwrap();
2704
2705        let buffer_b1 = cx_b
2706            .background()
2707            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2708            .await
2709            .unwrap();
2710
2711        let definitions;
2712        let buffer_b2;
2713        if rng.gen() {
2714            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2715            buffer_b2 =
2716                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2717        } else {
2718            buffer_b2 =
2719                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2720            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2721        }
2722
2723        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2724            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2725                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2726                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2727            )))
2728        });
2729
2730        let buffer_b2 = buffer_b2.await.unwrap();
2731        let definitions = definitions.await.unwrap();
2732        assert_eq!(definitions.len(), 1);
2733        assert_eq!(definitions[0].target_buffer, buffer_b2);
2734    }
2735
2736    #[gpui::test(iterations = 10)]
2737    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2738        cx_a.foreground().forbid_parking();
2739
2740        // Connect to a server as 2 clients.
2741        let mut server = TestServer::start(cx_a.foreground()).await;
2742        let client_a = server.create_client(&mut cx_a, "user_a").await;
2743        let client_b = server.create_client(&mut cx_b, "user_b").await;
2744
2745        // Create an org that includes these 2 users.
2746        let db = &server.app_state.db;
2747        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2748        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2749            .await
2750            .unwrap();
2751        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2752            .await
2753            .unwrap();
2754
2755        // Create a channel that includes all the users.
2756        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2757        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2758            .await
2759            .unwrap();
2760        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2761            .await
2762            .unwrap();
2763        db.create_channel_message(
2764            channel_id,
2765            client_b.current_user_id(&cx_b),
2766            "hello A, it's B.",
2767            OffsetDateTime::now_utc(),
2768            1,
2769        )
2770        .await
2771        .unwrap();
2772
2773        let channels_a = cx_a
2774            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2775        channels_a
2776            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2777            .await;
2778        channels_a.read_with(&cx_a, |list, _| {
2779            assert_eq!(
2780                list.available_channels().unwrap(),
2781                &[ChannelDetails {
2782                    id: channel_id.to_proto(),
2783                    name: "test-channel".to_string()
2784                }]
2785            )
2786        });
2787        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2788            this.get_channel(channel_id.to_proto(), cx).unwrap()
2789        });
2790        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2791        channel_a
2792            .condition(&cx_a, |channel, _| {
2793                channel_messages(channel)
2794                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2795            })
2796            .await;
2797
2798        let channels_b = cx_b
2799            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2800        channels_b
2801            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2802            .await;
2803        channels_b.read_with(&cx_b, |list, _| {
2804            assert_eq!(
2805                list.available_channels().unwrap(),
2806                &[ChannelDetails {
2807                    id: channel_id.to_proto(),
2808                    name: "test-channel".to_string()
2809                }]
2810            )
2811        });
2812
2813        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2814            this.get_channel(channel_id.to_proto(), cx).unwrap()
2815        });
2816        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2817        channel_b
2818            .condition(&cx_b, |channel, _| {
2819                channel_messages(channel)
2820                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2821            })
2822            .await;
2823
2824        channel_a
2825            .update(&mut cx_a, |channel, cx| {
2826                channel
2827                    .send_message("oh, hi B.".to_string(), cx)
2828                    .unwrap()
2829                    .detach();
2830                let task = channel.send_message("sup".to_string(), cx).unwrap();
2831                assert_eq!(
2832                    channel_messages(channel),
2833                    &[
2834                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2835                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2836                        ("user_a".to_string(), "sup".to_string(), true)
2837                    ]
2838                );
2839                task
2840            })
2841            .await
2842            .unwrap();
2843
2844        channel_b
2845            .condition(&cx_b, |channel, _| {
2846                channel_messages(channel)
2847                    == [
2848                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2849                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2850                        ("user_a".to_string(), "sup".to_string(), false),
2851                    ]
2852            })
2853            .await;
2854
2855        assert_eq!(
2856            server
2857                .state()
2858                .await
2859                .channel(channel_id)
2860                .unwrap()
2861                .connection_ids
2862                .len(),
2863            2
2864        );
2865        cx_b.update(|_| drop(channel_b));
2866        server
2867            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2868            .await;
2869
2870        cx_a.update(|_| drop(channel_a));
2871        server
2872            .condition(|state| state.channel(channel_id).is_none())
2873            .await;
2874    }
2875
2876    #[gpui::test(iterations = 10)]
2877    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2878        cx_a.foreground().forbid_parking();
2879
2880        let mut server = TestServer::start(cx_a.foreground()).await;
2881        let client_a = server.create_client(&mut cx_a, "user_a").await;
2882
2883        let db = &server.app_state.db;
2884        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2885        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2886        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2887            .await
2888            .unwrap();
2889        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2890            .await
2891            .unwrap();
2892
2893        let channels_a = cx_a
2894            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2895        channels_a
2896            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2897            .await;
2898        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2899            this.get_channel(channel_id.to_proto(), cx).unwrap()
2900        });
2901
2902        // Messages aren't allowed to be too long.
2903        channel_a
2904            .update(&mut cx_a, |channel, cx| {
2905                let long_body = "this is long.\n".repeat(1024);
2906                channel.send_message(long_body, cx).unwrap()
2907            })
2908            .await
2909            .unwrap_err();
2910
2911        // Messages aren't allowed to be blank.
2912        channel_a.update(&mut cx_a, |channel, cx| {
2913            channel.send_message(String::new(), cx).unwrap_err()
2914        });
2915
2916        // Leading and trailing whitespace are trimmed.
2917        channel_a
2918            .update(&mut cx_a, |channel, cx| {
2919                channel
2920                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
2921                    .unwrap()
2922            })
2923            .await
2924            .unwrap();
2925        assert_eq!(
2926            db.get_channel_messages(channel_id, 10, None)
2927                .await
2928                .unwrap()
2929                .iter()
2930                .map(|m| &m.body)
2931                .collect::<Vec<_>>(),
2932            &["surrounded by whitespace"]
2933        );
2934    }
2935
2936    #[gpui::test(iterations = 10)]
2937    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2938        cx_a.foreground().forbid_parking();
2939
2940        // Connect to a server as 2 clients.
2941        let mut server = TestServer::start(cx_a.foreground()).await;
2942        let client_a = server.create_client(&mut cx_a, "user_a").await;
2943        let client_b = server.create_client(&mut cx_b, "user_b").await;
2944        let mut status_b = client_b.status();
2945
2946        // Create an org that includes these 2 users.
2947        let db = &server.app_state.db;
2948        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2949        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2950            .await
2951            .unwrap();
2952        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2953            .await
2954            .unwrap();
2955
2956        // Create a channel that includes all the users.
2957        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2958        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2959            .await
2960            .unwrap();
2961        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2962            .await
2963            .unwrap();
2964        db.create_channel_message(
2965            channel_id,
2966            client_b.current_user_id(&cx_b),
2967            "hello A, it's B.",
2968            OffsetDateTime::now_utc(),
2969            2,
2970        )
2971        .await
2972        .unwrap();
2973
2974        let channels_a = cx_a
2975            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2976        channels_a
2977            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2978            .await;
2979
2980        channels_a.read_with(&cx_a, |list, _| {
2981            assert_eq!(
2982                list.available_channels().unwrap(),
2983                &[ChannelDetails {
2984                    id: channel_id.to_proto(),
2985                    name: "test-channel".to_string()
2986                }]
2987            )
2988        });
2989        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2990            this.get_channel(channel_id.to_proto(), cx).unwrap()
2991        });
2992        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2993        channel_a
2994            .condition(&cx_a, |channel, _| {
2995                channel_messages(channel)
2996                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2997            })
2998            .await;
2999
3000        let channels_b = cx_b
3001            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3002        channels_b
3003            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3004            .await;
3005        channels_b.read_with(&cx_b, |list, _| {
3006            assert_eq!(
3007                list.available_channels().unwrap(),
3008                &[ChannelDetails {
3009                    id: channel_id.to_proto(),
3010                    name: "test-channel".to_string()
3011                }]
3012            )
3013        });
3014
3015        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3016            this.get_channel(channel_id.to_proto(), cx).unwrap()
3017        });
3018        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3019        channel_b
3020            .condition(&cx_b, |channel, _| {
3021                channel_messages(channel)
3022                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3023            })
3024            .await;
3025
3026        // Disconnect client B, ensuring we can still access its cached channel data.
3027        server.forbid_connections();
3028        server.disconnect_client(client_b.current_user_id(&cx_b));
3029        while !matches!(
3030            status_b.next().await,
3031            Some(client::Status::ReconnectionError { .. })
3032        ) {}
3033
3034        channels_b.read_with(&cx_b, |channels, _| {
3035            assert_eq!(
3036                channels.available_channels().unwrap(),
3037                [ChannelDetails {
3038                    id: channel_id.to_proto(),
3039                    name: "test-channel".to_string()
3040                }]
3041            )
3042        });
3043        channel_b.read_with(&cx_b, |channel, _| {
3044            assert_eq!(
3045                channel_messages(channel),
3046                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3047            )
3048        });
3049
3050        // Send a message from client B while it is disconnected.
3051        channel_b
3052            .update(&mut cx_b, |channel, cx| {
3053                let task = channel
3054                    .send_message("can you see this?".to_string(), cx)
3055                    .unwrap();
3056                assert_eq!(
3057                    channel_messages(channel),
3058                    &[
3059                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3060                        ("user_b".to_string(), "can you see this?".to_string(), true)
3061                    ]
3062                );
3063                task
3064            })
3065            .await
3066            .unwrap_err();
3067
3068        // Send a message from client A while B is disconnected.
3069        channel_a
3070            .update(&mut cx_a, |channel, cx| {
3071                channel
3072                    .send_message("oh, hi B.".to_string(), cx)
3073                    .unwrap()
3074                    .detach();
3075                let task = channel.send_message("sup".to_string(), cx).unwrap();
3076                assert_eq!(
3077                    channel_messages(channel),
3078                    &[
3079                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3080                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3081                        ("user_a".to_string(), "sup".to_string(), true)
3082                    ]
3083                );
3084                task
3085            })
3086            .await
3087            .unwrap();
3088
3089        // Give client B a chance to reconnect.
3090        server.allow_connections();
3091        cx_b.foreground().advance_clock(Duration::from_secs(10));
3092
3093        // Verify that B sees the new messages upon reconnection, as well as the message client B
3094        // sent while offline.
3095        channel_b
3096            .condition(&cx_b, |channel, _| {
3097                channel_messages(channel)
3098                    == [
3099                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3100                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3101                        ("user_a".to_string(), "sup".to_string(), false),
3102                        ("user_b".to_string(), "can you see this?".to_string(), false),
3103                    ]
3104            })
3105            .await;
3106
3107        // Ensure client A and B can communicate normally after reconnection.
3108        channel_a
3109            .update(&mut cx_a, |channel, cx| {
3110                channel.send_message("you online?".to_string(), cx).unwrap()
3111            })
3112            .await
3113            .unwrap();
3114        channel_b
3115            .condition(&cx_b, |channel, _| {
3116                channel_messages(channel)
3117                    == [
3118                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3119                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3120                        ("user_a".to_string(), "sup".to_string(), false),
3121                        ("user_b".to_string(), "can you see this?".to_string(), false),
3122                        ("user_a".to_string(), "you online?".to_string(), false),
3123                    ]
3124            })
3125            .await;
3126
3127        channel_b
3128            .update(&mut cx_b, |channel, cx| {
3129                channel.send_message("yep".to_string(), cx).unwrap()
3130            })
3131            .await
3132            .unwrap();
3133        channel_a
3134            .condition(&cx_a, |channel, _| {
3135                channel_messages(channel)
3136                    == [
3137                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3138                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3139                        ("user_a".to_string(), "sup".to_string(), false),
3140                        ("user_b".to_string(), "can you see this?".to_string(), false),
3141                        ("user_a".to_string(), "you online?".to_string(), false),
3142                        ("user_b".to_string(), "yep".to_string(), false),
3143                    ]
3144            })
3145            .await;
3146    }
3147
3148    #[gpui::test(iterations = 10)]
3149    async fn test_contacts(
3150        mut cx_a: TestAppContext,
3151        mut cx_b: TestAppContext,
3152        mut cx_c: TestAppContext,
3153    ) {
3154        cx_a.foreground().forbid_parking();
3155        let lang_registry = Arc::new(LanguageRegistry::new());
3156        let fs = Arc::new(FakeFs::new(cx_a.background()));
3157
3158        // Connect to a server as 3 clients.
3159        let mut server = TestServer::start(cx_a.foreground()).await;
3160        let client_a = server.create_client(&mut cx_a, "user_a").await;
3161        let client_b = server.create_client(&mut cx_b, "user_b").await;
3162        let client_c = server.create_client(&mut cx_c, "user_c").await;
3163
3164        // Share a worktree as client A.
3165        fs.insert_tree(
3166            "/a",
3167            json!({
3168                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3169            }),
3170        )
3171        .await;
3172
3173        let project_a = cx_a.update(|cx| {
3174            Project::local(
3175                client_a.clone(),
3176                client_a.user_store.clone(),
3177                lang_registry.clone(),
3178                fs.clone(),
3179                cx,
3180            )
3181        });
3182        let (worktree_a, _) = project_a
3183            .update(&mut cx_a, |p, cx| {
3184                p.find_or_create_local_worktree("/a", false, cx)
3185            })
3186            .await
3187            .unwrap();
3188        worktree_a
3189            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3190            .await;
3191
3192        client_a
3193            .user_store
3194            .condition(&cx_a, |user_store, _| {
3195                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3196            })
3197            .await;
3198        client_b
3199            .user_store
3200            .condition(&cx_b, |user_store, _| {
3201                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3202            })
3203            .await;
3204        client_c
3205            .user_store
3206            .condition(&cx_c, |user_store, _| {
3207                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3208            })
3209            .await;
3210
3211        let project_id = project_a
3212            .update(&mut cx_a, |project, _| project.next_remote_id())
3213            .await;
3214        project_a
3215            .update(&mut cx_a, |project, cx| project.share(cx))
3216            .await
3217            .unwrap();
3218
3219        let _project_b = Project::remote(
3220            project_id,
3221            client_b.clone(),
3222            client_b.user_store.clone(),
3223            lang_registry.clone(),
3224            fs.clone(),
3225            &mut cx_b.to_async(),
3226        )
3227        .await
3228        .unwrap();
3229
3230        client_a
3231            .user_store
3232            .condition(&cx_a, |user_store, _| {
3233                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3234            })
3235            .await;
3236        client_b
3237            .user_store
3238            .condition(&cx_b, |user_store, _| {
3239                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3240            })
3241            .await;
3242        client_c
3243            .user_store
3244            .condition(&cx_c, |user_store, _| {
3245                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3246            })
3247            .await;
3248
3249        project_a
3250            .condition(&cx_a, |project, _| {
3251                project.collaborators().contains_key(&client_b.peer_id)
3252            })
3253            .await;
3254
3255        cx_a.update(move |_| drop(project_a));
3256        client_a
3257            .user_store
3258            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3259            .await;
3260        client_b
3261            .user_store
3262            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3263            .await;
3264        client_c
3265            .user_store
3266            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3267            .await;
3268
3269        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3270            user_store
3271                .contacts()
3272                .iter()
3273                .map(|contact| {
3274                    let worktrees = contact
3275                        .projects
3276                        .iter()
3277                        .map(|p| {
3278                            (
3279                                p.worktree_root_names[0].as_str(),
3280                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3281                            )
3282                        })
3283                        .collect();
3284                    (contact.user.github_login.as_str(), worktrees)
3285                })
3286                .collect()
3287        }
3288    }
3289
3290    struct TestServer {
3291        peer: Arc<Peer>,
3292        app_state: Arc<AppState>,
3293        server: Arc<Server>,
3294        foreground: Rc<executor::Foreground>,
3295        notifications: mpsc::Receiver<()>,
3296        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3297        forbid_connections: Arc<AtomicBool>,
3298        _test_db: TestDb,
3299    }
3300
3301    impl TestServer {
3302        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3303            let test_db = TestDb::new();
3304            let app_state = Self::build_app_state(&test_db).await;
3305            let peer = Peer::new();
3306            let notifications = mpsc::channel(128);
3307            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3308            Self {
3309                peer,
3310                app_state,
3311                server,
3312                foreground,
3313                notifications: notifications.1,
3314                connection_killers: Default::default(),
3315                forbid_connections: Default::default(),
3316                _test_db: test_db,
3317            }
3318        }
3319
3320        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3321            let http = FakeHttpClient::with_404_response();
3322            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3323            let client_name = name.to_string();
3324            let mut client = Client::new(http.clone());
3325            let server = self.server.clone();
3326            let connection_killers = self.connection_killers.clone();
3327            let forbid_connections = self.forbid_connections.clone();
3328            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3329
3330            Arc::get_mut(&mut client)
3331                .unwrap()
3332                .override_authenticate(move |cx| {
3333                    cx.spawn(|_| async move {
3334                        let access_token = "the-token".to_string();
3335                        Ok(Credentials {
3336                            user_id: user_id.0 as u64,
3337                            access_token,
3338                        })
3339                    })
3340                })
3341                .override_establish_connection(move |credentials, cx| {
3342                    assert_eq!(credentials.user_id, user_id.0 as u64);
3343                    assert_eq!(credentials.access_token, "the-token");
3344
3345                    let server = server.clone();
3346                    let connection_killers = connection_killers.clone();
3347                    let forbid_connections = forbid_connections.clone();
3348                    let client_name = client_name.clone();
3349                    let connection_id_tx = connection_id_tx.clone();
3350                    cx.spawn(move |cx| async move {
3351                        if forbid_connections.load(SeqCst) {
3352                            Err(EstablishConnectionError::other(anyhow!(
3353                                "server is forbidding connections"
3354                            )))
3355                        } else {
3356                            let (client_conn, server_conn, kill_conn) =
3357                                Connection::in_memory(cx.background());
3358                            connection_killers.lock().insert(user_id, kill_conn);
3359                            cx.background()
3360                                .spawn(server.handle_connection(
3361                                    server_conn,
3362                                    client_name,
3363                                    user_id,
3364                                    Some(connection_id_tx),
3365                                ))
3366                                .detach();
3367                            Ok(client_conn)
3368                        }
3369                    })
3370                });
3371
3372            client
3373                .authenticate_and_connect(&cx.to_async())
3374                .await
3375                .unwrap();
3376
3377            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3378            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3379            let mut authed_user =
3380                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3381            while authed_user.next().await.unwrap().is_none() {}
3382
3383            TestClient {
3384                client,
3385                peer_id,
3386                user_store,
3387            }
3388        }
3389
3390        fn disconnect_client(&self, user_id: UserId) {
3391            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3392                let _ = kill_conn.try_send(Some(()));
3393            }
3394        }
3395
3396        fn forbid_connections(&self) {
3397            self.forbid_connections.store(true, SeqCst);
3398        }
3399
3400        fn allow_connections(&self) {
3401            self.forbid_connections.store(false, SeqCst);
3402        }
3403
3404        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3405            let mut config = Config::default();
3406            config.session_secret = "a".repeat(32);
3407            config.database_url = test_db.url.clone();
3408            let github_client = github::AppClient::test();
3409            Arc::new(AppState {
3410                db: test_db.db().clone(),
3411                handlebars: Default::default(),
3412                auth_client: auth::build_client("", ""),
3413                repo_client: github::RepoClient::test(&github_client),
3414                github_client,
3415                config,
3416            })
3417        }
3418
3419        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3420            self.server.store.read()
3421        }
3422
3423        async fn condition<F>(&mut self, mut predicate: F)
3424        where
3425            F: FnMut(&Store) -> bool,
3426        {
3427            async_std::future::timeout(Duration::from_millis(500), async {
3428                while !(predicate)(&*self.server.store.read()) {
3429                    self.foreground.start_waiting();
3430                    self.notifications.next().await;
3431                    self.foreground.finish_waiting();
3432                }
3433            })
3434            .await
3435            .expect("condition timed out");
3436        }
3437    }
3438
3439    impl Drop for TestServer {
3440        fn drop(&mut self) {
3441            self.peer.reset();
3442        }
3443    }
3444
3445    struct TestClient {
3446        client: Arc<Client>,
3447        pub peer_id: PeerId,
3448        pub user_store: ModelHandle<UserStore>,
3449    }
3450
3451    impl Deref for TestClient {
3452        type Target = Arc<Client>;
3453
3454        fn deref(&self) -> &Self::Target {
3455            &self.client
3456        }
3457    }
3458
3459    impl TestClient {
3460        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3461            UserId::from_proto(
3462                self.user_store
3463                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3464            )
3465        }
3466    }
3467
3468    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3469        channel
3470            .messages()
3471            .cursor::<()>()
3472            .map(|m| {
3473                (
3474                    m.sender.github_login.clone(),
3475                    m.body.clone(),
3476                    m.is_pending(),
3477                )
3478            })
3479            .collect()
3480    }
3481
3482    struct EmptyView;
3483
3484    impl gpui::Entity for EmptyView {
3485        type Event = ();
3486    }
3487
3488    impl gpui::View for EmptyView {
3489        fn ui_name() -> &'static str {
3490            "empty view"
3491        }
3492
3493        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3494            gpui::Element::boxed(gpui::elements::Empty)
3495        }
3496    }
3497}