rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44pub trait Executor {
  45    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  46}
  47
  48pub struct RealExecutor;
  49
  50const MESSAGE_COUNT_PER_PAGE: usize = 100;
  51const MAX_MESSAGE_LEN: usize = 1024;
  52
  53impl Server {
  54    pub fn new(
  55        app_state: Arc<AppState>,
  56        peer: Arc<Peer>,
  57        notifications: Option<mpsc::Sender<()>>,
  58    ) -> Arc<Self> {
  59        let mut server = Self {
  60            peer,
  61            app_state,
  62            store: Default::default(),
  63            handlers: Default::default(),
  64            notifications,
  65        };
  66
  67        server
  68            .add_request_handler(Server::ping)
  69            .add_request_handler(Server::register_project)
  70            .add_message_handler(Server::unregister_project)
  71            .add_request_handler(Server::share_project)
  72            .add_message_handler(Server::unshare_project)
  73            .add_request_handler(Server::join_project)
  74            .add_message_handler(Server::leave_project)
  75            .add_request_handler(Server::register_worktree)
  76            .add_message_handler(Server::unregister_worktree)
  77            .add_request_handler(Server::share_worktree)
  78            .add_message_handler(Server::update_worktree)
  79            .add_message_handler(Server::update_diagnostic_summary)
  80            .add_message_handler(Server::disk_based_diagnostics_updating)
  81            .add_message_handler(Server::disk_based_diagnostics_updated)
  82            .add_request_handler(Server::get_definition)
  83            .add_request_handler(Server::open_buffer)
  84            .add_message_handler(Server::close_buffer)
  85            .add_request_handler(Server::update_buffer)
  86            .add_message_handler(Server::update_buffer_file)
  87            .add_message_handler(Server::buffer_reloaded)
  88            .add_message_handler(Server::buffer_saved)
  89            .add_request_handler(Server::save_buffer)
  90            .add_request_handler(Server::format_buffers)
  91            .add_request_handler(Server::get_completions)
  92            .add_request_handler(Server::apply_additional_edits_for_completion)
  93            .add_request_handler(Server::get_code_actions)
  94            .add_request_handler(Server::apply_code_action)
  95            .add_request_handler(Server::get_channels)
  96            .add_request_handler(Server::get_users)
  97            .add_request_handler(Server::join_channel)
  98            .add_message_handler(Server::leave_channel)
  99            .add_request_handler(Server::send_channel_message)
 100            .add_request_handler(Server::get_channel_messages);
 101
 102        Arc::new(server)
 103    }
 104
 105    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 106    where
 107        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 108        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 109        M: EnvelopedMessage,
 110    {
 111        let prev_handler = self.handlers.insert(
 112            TypeId::of::<M>(),
 113            Box::new(move |server, envelope| {
 114                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 115                (handler)(server, *envelope).boxed()
 116            }),
 117        );
 118        if prev_handler.is_some() {
 119            panic!("registered a handler for the same message twice");
 120        }
 121        self
 122    }
 123
 124    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 125    where
 126        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 127        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 128        M: RequestMessage,
 129    {
 130        self.add_message_handler(move |server, envelope| {
 131            let receipt = envelope.receipt();
 132            let response = (handler)(server.clone(), envelope);
 133            async move {
 134                match response.await {
 135                    Ok(response) => {
 136                        server.peer.respond(receipt, response)?;
 137                        Ok(())
 138                    }
 139                    Err(error) => {
 140                        server.peer.respond_with_error(
 141                            receipt,
 142                            proto::Error {
 143                                message: error.to_string(),
 144                            },
 145                        )?;
 146                        Err(error)
 147                    }
 148                }
 149            }
 150        })
 151    }
 152
 153    pub fn handle_connection<E: Executor>(
 154        self: &Arc<Self>,
 155        connection: Connection,
 156        addr: String,
 157        user_id: UserId,
 158        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 159        executor: E,
 160    ) -> impl Future<Output = ()> {
 161        let mut this = self.clone();
 162        async move {
 163            let (connection_id, handle_io, mut incoming_rx) =
 164                this.peer.add_connection(connection).await;
 165
 166            if let Some(send_connection_id) = send_connection_id.as_mut() {
 167                let _ = send_connection_id.send(connection_id).await;
 168            }
 169
 170            this.state_mut().add_connection(connection_id, user_id);
 171            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 172                log::error!("error updating contacts for {:?}: {}", user_id, err);
 173            }
 174
 175            let handle_io = handle_io.fuse();
 176            futures::pin_mut!(handle_io);
 177            loop {
 178                let next_message = incoming_rx.next().fuse();
 179                futures::pin_mut!(next_message);
 180                futures::select_biased! {
 181                    result = handle_io => {
 182                        if let Err(err) = result {
 183                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 184                        }
 185                        break;
 186                    }
 187                    message = next_message => {
 188                        if let Some(message) = message {
 189                            let start_time = Instant::now();
 190                            let type_name = message.payload_type_name();
 191                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 192                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 193                                let handle_message = (handler)(this.clone(), message);
 194                                executor.spawn_detached(async move {
 195                                    if let Err(err) = handle_message.await {
 196                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 197                                    } else {
 198                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 199                                    }
 200                                });
 201                                if let Some(mut notifications) = this.notifications.clone() {
 202                                    let _ = notifications.send(()).await;
 203                                }
 204                            } else {
 205                                log::warn!("unhandled message: {}", type_name);
 206                            }
 207                        } else {
 208                            log::info!("rpc connection closed {:?}", addr);
 209                            break;
 210                        }
 211                    }
 212                }
 213            }
 214
 215            if let Err(err) = this.sign_out(connection_id).await {
 216                log::error!("error signing out connection {:?} - {:?}", addr, err);
 217            }
 218        }
 219    }
 220
 221    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 222        self.peer.disconnect(connection_id);
 223        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 224
 225        for (project_id, project) in removed_connection.hosted_projects {
 226            if let Some(share) = project.share {
 227                broadcast(
 228                    connection_id,
 229                    share.guests.keys().copied().collect(),
 230                    |conn_id| {
 231                        self.peer
 232                            .send(conn_id, proto::UnshareProject { project_id })
 233                    },
 234                )?;
 235            }
 236        }
 237
 238        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 239            broadcast(connection_id, peer_ids, |conn_id| {
 240                self.peer.send(
 241                    conn_id,
 242                    proto::RemoveProjectCollaborator {
 243                        project_id,
 244                        peer_id: connection_id.0,
 245                    },
 246                )
 247            })?;
 248        }
 249
 250        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 251        Ok(())
 252    }
 253
 254    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 255        Ok(proto::Ack {})
 256    }
 257
 258    async fn register_project(
 259        mut self: Arc<Server>,
 260        request: TypedEnvelope<proto::RegisterProject>,
 261    ) -> tide::Result<proto::RegisterProjectResponse> {
 262        let project_id = {
 263            let mut state = self.state_mut();
 264            let user_id = state.user_id_for_connection(request.sender_id)?;
 265            state.register_project(request.sender_id, user_id)
 266        };
 267        Ok(proto::RegisterProjectResponse { project_id })
 268    }
 269
 270    async fn unregister_project(
 271        mut self: Arc<Server>,
 272        request: TypedEnvelope<proto::UnregisterProject>,
 273    ) -> tide::Result<()> {
 274        let project = self
 275            .state_mut()
 276            .unregister_project(request.payload.project_id, request.sender_id)?;
 277        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 278        Ok(())
 279    }
 280
 281    async fn share_project(
 282        mut self: Arc<Server>,
 283        request: TypedEnvelope<proto::ShareProject>,
 284    ) -> tide::Result<proto::Ack> {
 285        self.state_mut()
 286            .share_project(request.payload.project_id, request.sender_id);
 287        Ok(proto::Ack {})
 288    }
 289
 290    async fn unshare_project(
 291        mut self: Arc<Server>,
 292        request: TypedEnvelope<proto::UnshareProject>,
 293    ) -> tide::Result<()> {
 294        let project_id = request.payload.project_id;
 295        let project = self
 296            .state_mut()
 297            .unshare_project(project_id, request.sender_id)?;
 298
 299        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 300            self.peer
 301                .send(conn_id, proto::UnshareProject { project_id })
 302        })?;
 303        self.update_contacts_for_users(&project.authorized_user_ids)?;
 304        Ok(())
 305    }
 306
 307    async fn join_project(
 308        mut self: Arc<Server>,
 309        request: TypedEnvelope<proto::JoinProject>,
 310    ) -> tide::Result<proto::JoinProjectResponse> {
 311        let project_id = request.payload.project_id;
 312
 313        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 314        let (response, connection_ids, contact_user_ids) = self
 315            .state_mut()
 316            .join_project(request.sender_id, user_id, project_id)
 317            .and_then(|joined| {
 318                let share = joined.project.share()?;
 319                let peer_count = share.guests.len();
 320                let mut collaborators = Vec::with_capacity(peer_count);
 321                collaborators.push(proto::Collaborator {
 322                    peer_id: joined.project.host_connection_id.0,
 323                    replica_id: 0,
 324                    user_id: joined.project.host_user_id.to_proto(),
 325                });
 326                let worktrees = joined
 327                    .project
 328                    .worktrees
 329                    .iter()
 330                    .filter_map(|(id, worktree)| {
 331                        worktree.share.as_ref().map(|share| proto::Worktree {
 332                            id: *id,
 333                            root_name: worktree.root_name.clone(),
 334                            entries: share.entries.values().cloned().collect(),
 335                            diagnostic_summaries: share
 336                                .diagnostic_summaries
 337                                .values()
 338                                .cloned()
 339                                .collect(),
 340                            weak: worktree.weak,
 341                        })
 342                    })
 343                    .collect();
 344                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 345                    if *peer_conn_id != request.sender_id {
 346                        collaborators.push(proto::Collaborator {
 347                            peer_id: peer_conn_id.0,
 348                            replica_id: *peer_replica_id as u32,
 349                            user_id: peer_user_id.to_proto(),
 350                        });
 351                    }
 352                }
 353                let response = proto::JoinProjectResponse {
 354                    worktrees,
 355                    replica_id: joined.replica_id as u32,
 356                    collaborators,
 357                };
 358                let connection_ids = joined.project.connection_ids();
 359                let contact_user_ids = joined.project.authorized_user_ids();
 360                Ok((response, connection_ids, contact_user_ids))
 361            })?;
 362
 363        broadcast(request.sender_id, connection_ids, |conn_id| {
 364            self.peer.send(
 365                conn_id,
 366                proto::AddProjectCollaborator {
 367                    project_id,
 368                    collaborator: Some(proto::Collaborator {
 369                        peer_id: request.sender_id.0,
 370                        replica_id: response.replica_id,
 371                        user_id: user_id.to_proto(),
 372                    }),
 373                },
 374            )
 375        })?;
 376        self.update_contacts_for_users(&contact_user_ids)?;
 377        Ok(response)
 378    }
 379
 380    async fn leave_project(
 381        mut self: Arc<Server>,
 382        request: TypedEnvelope<proto::LeaveProject>,
 383    ) -> tide::Result<()> {
 384        let sender_id = request.sender_id;
 385        let project_id = request.payload.project_id;
 386        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 387
 388        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 389            self.peer.send(
 390                conn_id,
 391                proto::RemoveProjectCollaborator {
 392                    project_id,
 393                    peer_id: sender_id.0,
 394                },
 395            )
 396        })?;
 397        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 398
 399        Ok(())
 400    }
 401
 402    async fn register_worktree(
 403        mut self: Arc<Server>,
 404        request: TypedEnvelope<proto::RegisterWorktree>,
 405    ) -> tide::Result<proto::Ack> {
 406        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 407
 408        let mut contact_user_ids = HashSet::default();
 409        contact_user_ids.insert(host_user_id);
 410        for github_login in request.payload.authorized_logins {
 411            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 412            contact_user_ids.insert(contact_user_id);
 413        }
 414
 415        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 416        self.state_mut().register_worktree(
 417            request.payload.project_id,
 418            request.payload.worktree_id,
 419            request.sender_id,
 420            Worktree {
 421                authorized_user_ids: contact_user_ids.clone(),
 422                root_name: request.payload.root_name,
 423                share: None,
 424                weak: false,
 425            },
 426        )?;
 427        self.update_contacts_for_users(&contact_user_ids)?;
 428        Ok(proto::Ack {})
 429    }
 430
 431    async fn unregister_worktree(
 432        mut self: Arc<Server>,
 433        request: TypedEnvelope<proto::UnregisterWorktree>,
 434    ) -> tide::Result<()> {
 435        let project_id = request.payload.project_id;
 436        let worktree_id = request.payload.worktree_id;
 437        let (worktree, guest_connection_ids) =
 438            self.state_mut()
 439                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 440        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 441            self.peer.send(
 442                conn_id,
 443                proto::UnregisterWorktree {
 444                    project_id,
 445                    worktree_id,
 446                },
 447            )
 448        })?;
 449        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 450        Ok(())
 451    }
 452
 453    async fn share_worktree(
 454        mut self: Arc<Server>,
 455        mut request: TypedEnvelope<proto::ShareWorktree>,
 456    ) -> tide::Result<proto::Ack> {
 457        let worktree = request
 458            .payload
 459            .worktree
 460            .as_mut()
 461            .ok_or_else(|| anyhow!("missing worktree"))?;
 462        let entries = worktree
 463            .entries
 464            .iter()
 465            .map(|entry| (entry.id, entry.clone()))
 466            .collect();
 467        let diagnostic_summaries = worktree
 468            .diagnostic_summaries
 469            .iter()
 470            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 471            .collect();
 472
 473        let shared_worktree = self.state_mut().share_worktree(
 474            request.payload.project_id,
 475            worktree.id,
 476            request.sender_id,
 477            entries,
 478            diagnostic_summaries,
 479        )?;
 480
 481        broadcast(
 482            request.sender_id,
 483            shared_worktree.connection_ids,
 484            |connection_id| {
 485                self.peer
 486                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 487            },
 488        )?;
 489        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 490
 491        Ok(proto::Ack {})
 492    }
 493
 494    async fn update_worktree(
 495        mut self: Arc<Server>,
 496        request: TypedEnvelope<proto::UpdateWorktree>,
 497    ) -> tide::Result<()> {
 498        let connection_ids = self.state_mut().update_worktree(
 499            request.sender_id,
 500            request.payload.project_id,
 501            request.payload.worktree_id,
 502            &request.payload.removed_entries,
 503            &request.payload.updated_entries,
 504        )?;
 505
 506        broadcast(request.sender_id, connection_ids, |connection_id| {
 507            self.peer
 508                .forward_send(request.sender_id, connection_id, request.payload.clone())
 509        })?;
 510
 511        Ok(())
 512    }
 513
 514    async fn update_diagnostic_summary(
 515        mut self: Arc<Server>,
 516        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 517    ) -> tide::Result<()> {
 518        let summary = request
 519            .payload
 520            .summary
 521            .clone()
 522            .ok_or_else(|| anyhow!("invalid summary"))?;
 523        let receiver_ids = self.state_mut().update_diagnostic_summary(
 524            request.payload.project_id,
 525            request.payload.worktree_id,
 526            request.sender_id,
 527            summary,
 528        )?;
 529
 530        broadcast(request.sender_id, receiver_ids, |connection_id| {
 531            self.peer
 532                .forward_send(request.sender_id, connection_id, request.payload.clone())
 533        })?;
 534        Ok(())
 535    }
 536
 537    async fn disk_based_diagnostics_updating(
 538        self: Arc<Server>,
 539        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 540    ) -> tide::Result<()> {
 541        let receiver_ids = self
 542            .state()
 543            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 544        broadcast(request.sender_id, receiver_ids, |connection_id| {
 545            self.peer
 546                .forward_send(request.sender_id, connection_id, request.payload.clone())
 547        })?;
 548        Ok(())
 549    }
 550
 551    async fn disk_based_diagnostics_updated(
 552        self: Arc<Server>,
 553        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 554    ) -> tide::Result<()> {
 555        let receiver_ids = self
 556            .state()
 557            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 558        broadcast(request.sender_id, receiver_ids, |connection_id| {
 559            self.peer
 560                .forward_send(request.sender_id, connection_id, request.payload.clone())
 561        })?;
 562        Ok(())
 563    }
 564
 565    async fn get_definition(
 566        self: Arc<Server>,
 567        request: TypedEnvelope<proto::GetDefinition>,
 568    ) -> tide::Result<proto::GetDefinitionResponse> {
 569        let host_connection_id = self
 570            .state()
 571            .read_project(request.payload.project_id, request.sender_id)?
 572            .host_connection_id;
 573        Ok(self
 574            .peer
 575            .forward_request(request.sender_id, host_connection_id, request.payload)
 576            .await?)
 577    }
 578
 579    async fn open_buffer(
 580        self: Arc<Server>,
 581        request: TypedEnvelope<proto::OpenBuffer>,
 582    ) -> tide::Result<proto::OpenBufferResponse> {
 583        let host_connection_id = self
 584            .state()
 585            .read_project(request.payload.project_id, request.sender_id)?
 586            .host_connection_id;
 587        Ok(self
 588            .peer
 589            .forward_request(request.sender_id, host_connection_id, request.payload)
 590            .await?)
 591    }
 592
 593    async fn close_buffer(
 594        self: Arc<Server>,
 595        request: TypedEnvelope<proto::CloseBuffer>,
 596    ) -> tide::Result<()> {
 597        let host_connection_id = self
 598            .state()
 599            .read_project(request.payload.project_id, request.sender_id)?
 600            .host_connection_id;
 601        self.peer
 602            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 603        Ok(())
 604    }
 605
 606    async fn save_buffer(
 607        self: Arc<Server>,
 608        request: TypedEnvelope<proto::SaveBuffer>,
 609    ) -> tide::Result<proto::BufferSaved> {
 610        let host;
 611        let mut guests;
 612        {
 613            let state = self.state();
 614            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 615            host = project.host_connection_id;
 616            guests = project.guest_connection_ids()
 617        }
 618
 619        let response = self
 620            .peer
 621            .forward_request(request.sender_id, host, request.payload.clone())
 622            .await?;
 623
 624        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 625        broadcast(host, guests, |conn_id| {
 626            self.peer.forward_send(host, conn_id, response.clone())
 627        })?;
 628
 629        Ok(response)
 630    }
 631
 632    async fn format_buffers(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::FormatBuffers>,
 635    ) -> tide::Result<proto::FormatBuffersResponse> {
 636        let host = self
 637            .state()
 638            .read_project(request.payload.project_id, request.sender_id)?
 639            .host_connection_id;
 640        Ok(self
 641            .peer
 642            .forward_request(request.sender_id, host, request.payload.clone())
 643            .await?)
 644    }
 645
 646    async fn get_completions(
 647        self: Arc<Server>,
 648        request: TypedEnvelope<proto::GetCompletions>,
 649    ) -> tide::Result<proto::GetCompletionsResponse> {
 650        let host = self
 651            .state()
 652            .read_project(request.payload.project_id, request.sender_id)?
 653            .host_connection_id;
 654        Ok(self
 655            .peer
 656            .forward_request(request.sender_id, host, request.payload.clone())
 657            .await?)
 658    }
 659
 660    async fn apply_additional_edits_for_completion(
 661        self: Arc<Server>,
 662        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 663    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 664        let host = self
 665            .state()
 666            .read_project(request.payload.project_id, request.sender_id)?
 667            .host_connection_id;
 668        Ok(self
 669            .peer
 670            .forward_request(request.sender_id, host, request.payload.clone())
 671            .await?)
 672    }
 673
 674    async fn get_code_actions(
 675        self: Arc<Server>,
 676        request: TypedEnvelope<proto::GetCodeActions>,
 677    ) -> tide::Result<proto::GetCodeActionsResponse> {
 678        let host = self
 679            .state()
 680            .read_project(request.payload.project_id, request.sender_id)?
 681            .host_connection_id;
 682        Ok(self
 683            .peer
 684            .forward_request(request.sender_id, host, request.payload.clone())
 685            .await?)
 686    }
 687
 688    async fn apply_code_action(
 689        self: Arc<Server>,
 690        request: TypedEnvelope<proto::ApplyCodeAction>,
 691    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 692        let host = self
 693            .state()
 694            .read_project(request.payload.project_id, request.sender_id)?
 695            .host_connection_id;
 696        Ok(self
 697            .peer
 698            .forward_request(request.sender_id, host, request.payload.clone())
 699            .await?)
 700    }
 701
 702    async fn update_buffer(
 703        self: Arc<Server>,
 704        request: TypedEnvelope<proto::UpdateBuffer>,
 705    ) -> tide::Result<proto::Ack> {
 706        let receiver_ids = self
 707            .state()
 708            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 709        broadcast(request.sender_id, receiver_ids, |connection_id| {
 710            self.peer
 711                .forward_send(request.sender_id, connection_id, request.payload.clone())
 712        })?;
 713        Ok(proto::Ack {})
 714    }
 715
 716    async fn update_buffer_file(
 717        self: Arc<Server>,
 718        request: TypedEnvelope<proto::UpdateBufferFile>,
 719    ) -> tide::Result<()> {
 720        let receiver_ids = self
 721            .state()
 722            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 723        broadcast(request.sender_id, receiver_ids, |connection_id| {
 724            self.peer
 725                .forward_send(request.sender_id, connection_id, request.payload.clone())
 726        })?;
 727        Ok(())
 728    }
 729
 730    async fn buffer_reloaded(
 731        self: Arc<Server>,
 732        request: TypedEnvelope<proto::BufferReloaded>,
 733    ) -> tide::Result<()> {
 734        let receiver_ids = self
 735            .state()
 736            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 737        broadcast(request.sender_id, receiver_ids, |connection_id| {
 738            self.peer
 739                .forward_send(request.sender_id, connection_id, request.payload.clone())
 740        })?;
 741        Ok(())
 742    }
 743
 744    async fn buffer_saved(
 745        self: Arc<Server>,
 746        request: TypedEnvelope<proto::BufferSaved>,
 747    ) -> tide::Result<()> {
 748        let receiver_ids = self
 749            .state()
 750            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 751        broadcast(request.sender_id, receiver_ids, |connection_id| {
 752            self.peer
 753                .forward_send(request.sender_id, connection_id, request.payload.clone())
 754        })?;
 755        Ok(())
 756    }
 757
 758    async fn get_channels(
 759        self: Arc<Server>,
 760        request: TypedEnvelope<proto::GetChannels>,
 761    ) -> tide::Result<proto::GetChannelsResponse> {
 762        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 763        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 764        Ok(proto::GetChannelsResponse {
 765            channels: channels
 766                .into_iter()
 767                .map(|chan| proto::Channel {
 768                    id: chan.id.to_proto(),
 769                    name: chan.name,
 770                })
 771                .collect(),
 772        })
 773    }
 774
 775    async fn get_users(
 776        self: Arc<Server>,
 777        request: TypedEnvelope<proto::GetUsers>,
 778    ) -> tide::Result<proto::GetUsersResponse> {
 779        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 780        let users = self
 781            .app_state
 782            .db
 783            .get_users_by_ids(user_ids)
 784            .await?
 785            .into_iter()
 786            .map(|user| proto::User {
 787                id: user.id.to_proto(),
 788                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 789                github_login: user.github_login,
 790            })
 791            .collect();
 792        Ok(proto::GetUsersResponse { users })
 793    }
 794
 795    fn update_contacts_for_users<'a>(
 796        self: &Arc<Server>,
 797        user_ids: impl IntoIterator<Item = &'a UserId>,
 798    ) -> anyhow::Result<()> {
 799        let mut result = Ok(());
 800        let state = self.state();
 801        for user_id in user_ids {
 802            let contacts = state.contacts_for_user(*user_id);
 803            for connection_id in state.connection_ids_for_user(*user_id) {
 804                if let Err(error) = self.peer.send(
 805                    connection_id,
 806                    proto::UpdateContacts {
 807                        contacts: contacts.clone(),
 808                    },
 809                ) {
 810                    result = Err(error);
 811                }
 812            }
 813        }
 814        result
 815    }
 816
 817    async fn join_channel(
 818        mut self: Arc<Self>,
 819        request: TypedEnvelope<proto::JoinChannel>,
 820    ) -> tide::Result<proto::JoinChannelResponse> {
 821        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 822        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 823        if !self
 824            .app_state
 825            .db
 826            .can_user_access_channel(user_id, channel_id)
 827            .await?
 828        {
 829            Err(anyhow!("access denied"))?;
 830        }
 831
 832        self.state_mut().join_channel(request.sender_id, channel_id);
 833        let messages = self
 834            .app_state
 835            .db
 836            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 837            .await?
 838            .into_iter()
 839            .map(|msg| proto::ChannelMessage {
 840                id: msg.id.to_proto(),
 841                body: msg.body,
 842                timestamp: msg.sent_at.unix_timestamp() as u64,
 843                sender_id: msg.sender_id.to_proto(),
 844                nonce: Some(msg.nonce.as_u128().into()),
 845            })
 846            .collect::<Vec<_>>();
 847        Ok(proto::JoinChannelResponse {
 848            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 849            messages,
 850        })
 851    }
 852
 853    async fn leave_channel(
 854        mut self: Arc<Self>,
 855        request: TypedEnvelope<proto::LeaveChannel>,
 856    ) -> tide::Result<()> {
 857        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 858        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 859        if !self
 860            .app_state
 861            .db
 862            .can_user_access_channel(user_id, channel_id)
 863            .await?
 864        {
 865            Err(anyhow!("access denied"))?;
 866        }
 867
 868        self.state_mut()
 869            .leave_channel(request.sender_id, channel_id);
 870
 871        Ok(())
 872    }
 873
 874    async fn send_channel_message(
 875        self: Arc<Self>,
 876        request: TypedEnvelope<proto::SendChannelMessage>,
 877    ) -> tide::Result<proto::SendChannelMessageResponse> {
 878        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 879        let user_id;
 880        let connection_ids;
 881        {
 882            let state = self.state();
 883            user_id = state.user_id_for_connection(request.sender_id)?;
 884            connection_ids = state.channel_connection_ids(channel_id)?;
 885        }
 886
 887        // Validate the message body.
 888        let body = request.payload.body.trim().to_string();
 889        if body.len() > MAX_MESSAGE_LEN {
 890            return Err(anyhow!("message is too long"))?;
 891        }
 892        if body.is_empty() {
 893            return Err(anyhow!("message can't be blank"))?;
 894        }
 895
 896        let timestamp = OffsetDateTime::now_utc();
 897        let nonce = request
 898            .payload
 899            .nonce
 900            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 901
 902        let message_id = self
 903            .app_state
 904            .db
 905            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 906            .await?
 907            .to_proto();
 908        let message = proto::ChannelMessage {
 909            sender_id: user_id.to_proto(),
 910            id: message_id,
 911            body,
 912            timestamp: timestamp.unix_timestamp() as u64,
 913            nonce: Some(nonce),
 914        };
 915        broadcast(request.sender_id, connection_ids, |conn_id| {
 916            self.peer.send(
 917                conn_id,
 918                proto::ChannelMessageSent {
 919                    channel_id: channel_id.to_proto(),
 920                    message: Some(message.clone()),
 921                },
 922            )
 923        })?;
 924        Ok(proto::SendChannelMessageResponse {
 925            message: Some(message),
 926        })
 927    }
 928
 929    async fn get_channel_messages(
 930        self: Arc<Self>,
 931        request: TypedEnvelope<proto::GetChannelMessages>,
 932    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 933        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 934        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 935        if !self
 936            .app_state
 937            .db
 938            .can_user_access_channel(user_id, channel_id)
 939            .await?
 940        {
 941            Err(anyhow!("access denied"))?;
 942        }
 943
 944        let messages = self
 945            .app_state
 946            .db
 947            .get_channel_messages(
 948                channel_id,
 949                MESSAGE_COUNT_PER_PAGE,
 950                Some(MessageId::from_proto(request.payload.before_message_id)),
 951            )
 952            .await?
 953            .into_iter()
 954            .map(|msg| proto::ChannelMessage {
 955                id: msg.id.to_proto(),
 956                body: msg.body,
 957                timestamp: msg.sent_at.unix_timestamp() as u64,
 958                sender_id: msg.sender_id.to_proto(),
 959                nonce: Some(msg.nonce.as_u128().into()),
 960            })
 961            .collect::<Vec<_>>();
 962
 963        Ok(proto::GetChannelMessagesResponse {
 964            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 965            messages,
 966        })
 967    }
 968
 969    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 970        self.store.read()
 971    }
 972
 973    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 974        self.store.write()
 975    }
 976}
 977
 978impl Executor for RealExecutor {
 979    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 980        task::spawn(future);
 981    }
 982}
 983
 984fn broadcast<F>(
 985    sender_id: ConnectionId,
 986    receiver_ids: Vec<ConnectionId>,
 987    mut f: F,
 988) -> anyhow::Result<()>
 989where
 990    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 991{
 992    let mut result = Ok(());
 993    for receiver_id in receiver_ids {
 994        if receiver_id != sender_id {
 995            if let Err(error) = f(receiver_id) {
 996                if result.is_ok() {
 997                    result = Err(error);
 998                }
 999            }
1000        }
1001    }
1002    result
1003}
1004
1005pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1006    let server = Server::new(app.state().clone(), rpc.clone(), None);
1007    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1008        let server = server.clone();
1009        async move {
1010            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1011
1012            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1013            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1014            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1015            let client_protocol_version: Option<u32> = request
1016                .header("X-Zed-Protocol-Version")
1017                .and_then(|v| v.as_str().parse().ok());
1018
1019            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1020                return Ok(Response::new(StatusCode::UpgradeRequired));
1021            }
1022
1023            let header = match request.header("Sec-Websocket-Key") {
1024                Some(h) => h.as_str(),
1025                None => return Err(anyhow!("expected sec-websocket-key"))?,
1026            };
1027
1028            let user_id = process_auth_header(&request).await?;
1029
1030            let mut response = Response::new(StatusCode::SwitchingProtocols);
1031            response.insert_header(UPGRADE, "websocket");
1032            response.insert_header(CONNECTION, "Upgrade");
1033            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1034            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1035            response.insert_header("Sec-Websocket-Version", "13");
1036
1037            let http_res: &mut tide::http::Response = response.as_mut();
1038            let upgrade_receiver = http_res.recv_upgrade().await;
1039            let addr = request.remote().unwrap_or("unknown").to_string();
1040            task::spawn(async move {
1041                if let Some(stream) = upgrade_receiver.await {
1042                    server
1043                        .handle_connection(
1044                            Connection::new(
1045                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1046                            ),
1047                            addr,
1048                            user_id,
1049                            None,
1050                            RealExecutor,
1051                        )
1052                        .await;
1053                }
1054            });
1055
1056            Ok(response)
1057        }
1058    });
1059}
1060
1061fn header_contains_ignore_case<T>(
1062    request: &tide::Request<T>,
1063    header_name: HeaderName,
1064    value: &str,
1065) -> bool {
1066    request
1067        .header(header_name)
1068        .map(|h| {
1069            h.as_str()
1070                .split(',')
1071                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1072        })
1073        .unwrap_or(false)
1074}
1075
1076#[cfg(test)]
1077mod tests {
1078    use super::*;
1079    use crate::{
1080        auth,
1081        db::{tests::TestDb, UserId},
1082        github, AppState, Config,
1083    };
1084    use ::rpc::Peer;
1085    use async_std::task;
1086    use gpui::{executor, ModelHandle, TestAppContext};
1087    use parking_lot::Mutex;
1088    use postage::{mpsc, watch};
1089    use rand::prelude::*;
1090    use rpc::PeerId;
1091    use serde_json::json;
1092    use sqlx::types::time::OffsetDateTime;
1093    use std::{
1094        ops::Deref,
1095        path::Path,
1096        rc::Rc,
1097        sync::{
1098            atomic::{AtomicBool, Ordering::SeqCst},
1099            Arc,
1100        },
1101        time::Duration,
1102    };
1103    use zed::{
1104        client::{
1105            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1106            EstablishConnectionError, UserStore,
1107        },
1108        editor::{
1109            self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1110            Redo, ToggleCodeActions, Undo,
1111        },
1112        fs::{FakeFs, Fs as _},
1113        language::{
1114            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1115            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1116        },
1117        lsp,
1118        project::{worktree::WorktreeHandle, DiagnosticSummary, Project, ProjectPath},
1119        workspace::{Workspace, WorkspaceParams},
1120    };
1121
1122    #[cfg(test)]
1123    #[ctor::ctor]
1124    fn init_logger() {
1125        if std::env::var("RUST_LOG").is_ok() {
1126            env_logger::init();
1127        }
1128    }
1129
1130    #[gpui::test(iterations = 10)]
1131    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1132        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1133        let lang_registry = Arc::new(LanguageRegistry::new());
1134        let fs = Arc::new(FakeFs::new(cx_a.background()));
1135        cx_a.foreground().forbid_parking();
1136
1137        // Connect to a server as 2 clients.
1138        let mut server = TestServer::start(cx_a.foreground()).await;
1139        let client_a = server.create_client(&mut cx_a, "user_a").await;
1140        let client_b = server.create_client(&mut cx_b, "user_b").await;
1141
1142        // Share a project as client A
1143        fs.insert_tree(
1144            "/a",
1145            json!({
1146                ".zed.toml": r#"collaborators = ["user_b"]"#,
1147                "a.txt": "a-contents",
1148                "b.txt": "b-contents",
1149            }),
1150        )
1151        .await;
1152        let project_a = cx_a.update(|cx| {
1153            Project::local(
1154                client_a.clone(),
1155                client_a.user_store.clone(),
1156                lang_registry.clone(),
1157                fs.clone(),
1158                cx,
1159            )
1160        });
1161        let (worktree_a, _) = project_a
1162            .update(&mut cx_a, |p, cx| {
1163                p.find_or_create_local_worktree("/a", false, cx)
1164            })
1165            .await
1166            .unwrap();
1167        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1168        worktree_a
1169            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1170            .await;
1171        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1172        project_a
1173            .update(&mut cx_a, |p, cx| p.share(cx))
1174            .await
1175            .unwrap();
1176
1177        // Join that project as client B
1178        let project_b = Project::remote(
1179            project_id,
1180            client_b.clone(),
1181            client_b.user_store.clone(),
1182            lang_registry.clone(),
1183            fs.clone(),
1184            &mut cx_b.to_async(),
1185        )
1186        .await
1187        .unwrap();
1188
1189        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1190            assert_eq!(
1191                project
1192                    .collaborators()
1193                    .get(&client_a.peer_id)
1194                    .unwrap()
1195                    .user
1196                    .github_login,
1197                "user_a"
1198            );
1199            project.replica_id()
1200        });
1201        project_a
1202            .condition(&cx_a, |tree, _| {
1203                tree.collaborators()
1204                    .get(&client_b.peer_id)
1205                    .map_or(false, |collaborator| {
1206                        collaborator.replica_id == replica_id_b
1207                            && collaborator.user.github_login == "user_b"
1208                    })
1209            })
1210            .await;
1211
1212        // Open the same file as client B and client A.
1213        let buffer_b = project_b
1214            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1215            .await
1216            .unwrap();
1217        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1218        buffer_b.read_with(&cx_b, |buf, cx| {
1219            assert_eq!(buf.read(cx).text(), "b-contents")
1220        });
1221        project_a.read_with(&cx_a, |project, cx| {
1222            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1223        });
1224        let buffer_a = project_a
1225            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1226            .await
1227            .unwrap();
1228
1229        let editor_b = cx_b.add_view(window_b, |cx| {
1230            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1231        });
1232
1233        // TODO
1234        // // Create a selection set as client B and see that selection set as client A.
1235        // buffer_a
1236        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1237        //     .await;
1238
1239        // Edit the buffer as client B and see that edit as client A.
1240        editor_b.update(&mut cx_b, |editor, cx| {
1241            editor.handle_input(&Input("ok, ".into()), cx)
1242        });
1243        buffer_a
1244            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1245            .await;
1246
1247        // TODO
1248        // // Remove the selection set as client B, see those selections disappear as client A.
1249        cx_b.update(move |_| drop(editor_b));
1250        // buffer_a
1251        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1252        //     .await;
1253
1254        // Close the buffer as client A, see that the buffer is closed.
1255        cx_a.update(move |_| drop(buffer_a));
1256        project_a
1257            .condition(&cx_a, |project, cx| {
1258                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1259            })
1260            .await;
1261
1262        // Dropping the client B's project removes client B from client A's collaborators.
1263        cx_b.update(move |_| drop(project_b));
1264        project_a
1265            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1266            .await;
1267    }
1268
1269    #[gpui::test(iterations = 10)]
1270    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1271        let lang_registry = Arc::new(LanguageRegistry::new());
1272        let fs = Arc::new(FakeFs::new(cx_a.background()));
1273        cx_a.foreground().forbid_parking();
1274
1275        // Connect to a server as 2 clients.
1276        let mut server = TestServer::start(cx_a.foreground()).await;
1277        let client_a = server.create_client(&mut cx_a, "user_a").await;
1278        let client_b = server.create_client(&mut cx_b, "user_b").await;
1279
1280        // Share a project as client A
1281        fs.insert_tree(
1282            "/a",
1283            json!({
1284                ".zed.toml": r#"collaborators = ["user_b"]"#,
1285                "a.txt": "a-contents",
1286                "b.txt": "b-contents",
1287            }),
1288        )
1289        .await;
1290        let project_a = cx_a.update(|cx| {
1291            Project::local(
1292                client_a.clone(),
1293                client_a.user_store.clone(),
1294                lang_registry.clone(),
1295                fs.clone(),
1296                cx,
1297            )
1298        });
1299        let (worktree_a, _) = project_a
1300            .update(&mut cx_a, |p, cx| {
1301                p.find_or_create_local_worktree("/a", false, cx)
1302            })
1303            .await
1304            .unwrap();
1305        worktree_a
1306            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1307            .await;
1308        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1309        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1310        project_a
1311            .update(&mut cx_a, |p, cx| p.share(cx))
1312            .await
1313            .unwrap();
1314        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1315
1316        // Join that project as client B
1317        let project_b = Project::remote(
1318            project_id,
1319            client_b.clone(),
1320            client_b.user_store.clone(),
1321            lang_registry.clone(),
1322            fs.clone(),
1323            &mut cx_b.to_async(),
1324        )
1325        .await
1326        .unwrap();
1327        project_b
1328            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1329            .await
1330            .unwrap();
1331
1332        // Unshare the project as client A
1333        project_a
1334            .update(&mut cx_a, |project, cx| project.unshare(cx))
1335            .await
1336            .unwrap();
1337        project_b
1338            .condition(&mut cx_b, |project, _| project.is_read_only())
1339            .await;
1340        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1341        drop(project_b);
1342
1343        // Share the project again and ensure guests can still join.
1344        project_a
1345            .update(&mut cx_a, |project, cx| project.share(cx))
1346            .await
1347            .unwrap();
1348        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1349
1350        let project_c = Project::remote(
1351            project_id,
1352            client_b.clone(),
1353            client_b.user_store.clone(),
1354            lang_registry.clone(),
1355            fs.clone(),
1356            &mut cx_b.to_async(),
1357        )
1358        .await
1359        .unwrap();
1360        project_c
1361            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1362            .await
1363            .unwrap();
1364    }
1365
1366    #[gpui::test(iterations = 10)]
1367    async fn test_propagate_saves_and_fs_changes(
1368        mut cx_a: TestAppContext,
1369        mut cx_b: TestAppContext,
1370        mut cx_c: TestAppContext,
1371    ) {
1372        let lang_registry = Arc::new(LanguageRegistry::new());
1373        let fs = Arc::new(FakeFs::new(cx_a.background()));
1374        cx_a.foreground().forbid_parking();
1375
1376        // Connect to a server as 3 clients.
1377        let mut server = TestServer::start(cx_a.foreground()).await;
1378        let client_a = server.create_client(&mut cx_a, "user_a").await;
1379        let client_b = server.create_client(&mut cx_b, "user_b").await;
1380        let client_c = server.create_client(&mut cx_c, "user_c").await;
1381
1382        // Share a worktree as client A.
1383        fs.insert_tree(
1384            "/a",
1385            json!({
1386                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1387                "file1": "",
1388                "file2": ""
1389            }),
1390        )
1391        .await;
1392        let project_a = cx_a.update(|cx| {
1393            Project::local(
1394                client_a.clone(),
1395                client_a.user_store.clone(),
1396                lang_registry.clone(),
1397                fs.clone(),
1398                cx,
1399            )
1400        });
1401        let (worktree_a, _) = project_a
1402            .update(&mut cx_a, |p, cx| {
1403                p.find_or_create_local_worktree("/a", false, cx)
1404            })
1405            .await
1406            .unwrap();
1407        worktree_a
1408            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1409            .await;
1410        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1411        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1412        project_a
1413            .update(&mut cx_a, |p, cx| p.share(cx))
1414            .await
1415            .unwrap();
1416
1417        // Join that worktree as clients B and C.
1418        let project_b = Project::remote(
1419            project_id,
1420            client_b.clone(),
1421            client_b.user_store.clone(),
1422            lang_registry.clone(),
1423            fs.clone(),
1424            &mut cx_b.to_async(),
1425        )
1426        .await
1427        .unwrap();
1428        let project_c = Project::remote(
1429            project_id,
1430            client_c.clone(),
1431            client_c.user_store.clone(),
1432            lang_registry.clone(),
1433            fs.clone(),
1434            &mut cx_c.to_async(),
1435        )
1436        .await
1437        .unwrap();
1438        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1439        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1440
1441        // Open and edit a buffer as both guests B and C.
1442        let buffer_b = project_b
1443            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1444            .await
1445            .unwrap();
1446        let buffer_c = project_c
1447            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1448            .await
1449            .unwrap();
1450        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1451        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1452
1453        // Open and edit that buffer as the host.
1454        let buffer_a = project_a
1455            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1456            .await
1457            .unwrap();
1458
1459        buffer_a
1460            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1461            .await;
1462        buffer_a.update(&mut cx_a, |buf, cx| {
1463            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1464        });
1465
1466        // Wait for edits to propagate
1467        buffer_a
1468            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1469            .await;
1470        buffer_b
1471            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1472            .await;
1473        buffer_c
1474            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1475            .await;
1476
1477        // Edit the buffer as the host and concurrently save as guest B.
1478        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1479        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1480        save_b.await.unwrap();
1481        assert_eq!(
1482            fs.load("/a/file1".as_ref()).await.unwrap(),
1483            "hi-a, i-am-c, i-am-b, i-am-a"
1484        );
1485        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1486        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1487        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1488
1489        // Ensure worktree observes a/file1's change event *before* the rename occurs, otherwise
1490        // when interpreting the change event it will mistakenly think that the file has been
1491        // deleted (because its path has changed) and will subsequently fail to detect the rename.
1492        worktree_a.flush_fs_events(&cx_a).await;
1493
1494        // Make changes on host's file system, see those changes on guest worktrees.
1495        fs.rename(
1496            "/a/file1".as_ref(),
1497            "/a/file1-renamed".as_ref(),
1498            Default::default(),
1499        )
1500        .await
1501        .unwrap();
1502        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1503            .await
1504            .unwrap();
1505        fs.insert_file(Path::new("/a/file4"), "4".into())
1506            .await
1507            .unwrap();
1508
1509        worktree_a
1510            .condition(&cx_a, |tree, _| {
1511                tree.paths()
1512                    .map(|p| p.to_string_lossy())
1513                    .collect::<Vec<_>>()
1514                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1515            })
1516            .await;
1517        worktree_b
1518            .condition(&cx_b, |tree, _| {
1519                tree.paths()
1520                    .map(|p| p.to_string_lossy())
1521                    .collect::<Vec<_>>()
1522                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1523            })
1524            .await;
1525        worktree_c
1526            .condition(&cx_c, |tree, _| {
1527                tree.paths()
1528                    .map(|p| p.to_string_lossy())
1529                    .collect::<Vec<_>>()
1530                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1531            })
1532            .await;
1533
1534        // Ensure buffer files are updated as well.
1535        buffer_a
1536            .condition(&cx_a, |buf, _| {
1537                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1538            })
1539            .await;
1540        buffer_b
1541            .condition(&cx_b, |buf, _| {
1542                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1543            })
1544            .await;
1545        buffer_c
1546            .condition(&cx_c, |buf, _| {
1547                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1548            })
1549            .await;
1550    }
1551
1552    #[gpui::test(iterations = 10)]
1553    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1554        cx_a.foreground().forbid_parking();
1555        let lang_registry = Arc::new(LanguageRegistry::new());
1556        let fs = Arc::new(FakeFs::new(cx_a.background()));
1557
1558        // Connect to a server as 2 clients.
1559        let mut server = TestServer::start(cx_a.foreground()).await;
1560        let client_a = server.create_client(&mut cx_a, "user_a").await;
1561        let client_b = server.create_client(&mut cx_b, "user_b").await;
1562
1563        // Share a project as client A
1564        fs.insert_tree(
1565            "/dir",
1566            json!({
1567                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1568                "a.txt": "a-contents",
1569            }),
1570        )
1571        .await;
1572
1573        let project_a = cx_a.update(|cx| {
1574            Project::local(
1575                client_a.clone(),
1576                client_a.user_store.clone(),
1577                lang_registry.clone(),
1578                fs.clone(),
1579                cx,
1580            )
1581        });
1582        let (worktree_a, _) = project_a
1583            .update(&mut cx_a, |p, cx| {
1584                p.find_or_create_local_worktree("/dir", false, cx)
1585            })
1586            .await
1587            .unwrap();
1588        worktree_a
1589            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1590            .await;
1591        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1592        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1593        project_a
1594            .update(&mut cx_a, |p, cx| p.share(cx))
1595            .await
1596            .unwrap();
1597
1598        // Join that project as client B
1599        let project_b = Project::remote(
1600            project_id,
1601            client_b.clone(),
1602            client_b.user_store.clone(),
1603            lang_registry.clone(),
1604            fs.clone(),
1605            &mut cx_b.to_async(),
1606        )
1607        .await
1608        .unwrap();
1609        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1610
1611        // Open a buffer as client B
1612        let buffer_b = project_b
1613            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1614            .await
1615            .unwrap();
1616        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1617
1618        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1619        buffer_b.read_with(&cx_b, |buf, _| {
1620            assert!(buf.is_dirty());
1621            assert!(!buf.has_conflict());
1622        });
1623
1624        buffer_b
1625            .update(&mut cx_b, |buf, cx| buf.save(cx))
1626            .await
1627            .unwrap();
1628        worktree_b
1629            .condition(&cx_b, |_, cx| {
1630                buffer_b.read(cx).file().unwrap().mtime() != mtime
1631            })
1632            .await;
1633        buffer_b.read_with(&cx_b, |buf, _| {
1634            assert!(!buf.is_dirty());
1635            assert!(!buf.has_conflict());
1636        });
1637
1638        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1639        buffer_b.read_with(&cx_b, |buf, _| {
1640            assert!(buf.is_dirty());
1641            assert!(!buf.has_conflict());
1642        });
1643    }
1644
1645    #[gpui::test(iterations = 10)]
1646    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1647        cx_a.foreground().forbid_parking();
1648        let lang_registry = Arc::new(LanguageRegistry::new());
1649        let fs = Arc::new(FakeFs::new(cx_a.background()));
1650
1651        // Connect to a server as 2 clients.
1652        let mut server = TestServer::start(cx_a.foreground()).await;
1653        let client_a = server.create_client(&mut cx_a, "user_a").await;
1654        let client_b = server.create_client(&mut cx_b, "user_b").await;
1655
1656        // Share a project as client A
1657        fs.insert_tree(
1658            "/dir",
1659            json!({
1660                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1661                "a.txt": "a-contents",
1662            }),
1663        )
1664        .await;
1665
1666        let project_a = cx_a.update(|cx| {
1667            Project::local(
1668                client_a.clone(),
1669                client_a.user_store.clone(),
1670                lang_registry.clone(),
1671                fs.clone(),
1672                cx,
1673            )
1674        });
1675        let (worktree_a, _) = project_a
1676            .update(&mut cx_a, |p, cx| {
1677                p.find_or_create_local_worktree("/dir", false, cx)
1678            })
1679            .await
1680            .unwrap();
1681        worktree_a
1682            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1683            .await;
1684        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1685        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1686        project_a
1687            .update(&mut cx_a, |p, cx| p.share(cx))
1688            .await
1689            .unwrap();
1690
1691        // Join that project as client B
1692        let project_b = Project::remote(
1693            project_id,
1694            client_b.clone(),
1695            client_b.user_store.clone(),
1696            lang_registry.clone(),
1697            fs.clone(),
1698            &mut cx_b.to_async(),
1699        )
1700        .await
1701        .unwrap();
1702        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1703
1704        // Open a buffer as client B
1705        let buffer_b = project_b
1706            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1707            .await
1708            .unwrap();
1709        buffer_b.read_with(&cx_b, |buf, _| {
1710            assert!(!buf.is_dirty());
1711            assert!(!buf.has_conflict());
1712        });
1713
1714        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1715            .await
1716            .unwrap();
1717        buffer_b
1718            .condition(&cx_b, |buf, _| {
1719                buf.text() == "new contents" && !buf.is_dirty()
1720            })
1721            .await;
1722        buffer_b.read_with(&cx_b, |buf, _| {
1723            assert!(!buf.has_conflict());
1724        });
1725    }
1726
1727    #[gpui::test(iterations = 10)]
1728    async fn test_editing_while_guest_opens_buffer(
1729        mut cx_a: TestAppContext,
1730        mut cx_b: TestAppContext,
1731    ) {
1732        cx_a.foreground().forbid_parking();
1733        let lang_registry = Arc::new(LanguageRegistry::new());
1734        let fs = Arc::new(FakeFs::new(cx_a.background()));
1735
1736        // Connect to a server as 2 clients.
1737        let mut server = TestServer::start(cx_a.foreground()).await;
1738        let client_a = server.create_client(&mut cx_a, "user_a").await;
1739        let client_b = server.create_client(&mut cx_b, "user_b").await;
1740
1741        // Share a project as client A
1742        fs.insert_tree(
1743            "/dir",
1744            json!({
1745                ".zed.toml": r#"collaborators = ["user_b"]"#,
1746                "a.txt": "a-contents",
1747            }),
1748        )
1749        .await;
1750        let project_a = cx_a.update(|cx| {
1751            Project::local(
1752                client_a.clone(),
1753                client_a.user_store.clone(),
1754                lang_registry.clone(),
1755                fs.clone(),
1756                cx,
1757            )
1758        });
1759        let (worktree_a, _) = project_a
1760            .update(&mut cx_a, |p, cx| {
1761                p.find_or_create_local_worktree("/dir", false, cx)
1762            })
1763            .await
1764            .unwrap();
1765        worktree_a
1766            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1767            .await;
1768        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1769        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1770        project_a
1771            .update(&mut cx_a, |p, cx| p.share(cx))
1772            .await
1773            .unwrap();
1774
1775        // Join that project as client B
1776        let project_b = Project::remote(
1777            project_id,
1778            client_b.clone(),
1779            client_b.user_store.clone(),
1780            lang_registry.clone(),
1781            fs.clone(),
1782            &mut cx_b.to_async(),
1783        )
1784        .await
1785        .unwrap();
1786
1787        // Open a buffer as client A
1788        let buffer_a = project_a
1789            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1790            .await
1791            .unwrap();
1792
1793        // Start opening the same buffer as client B
1794        let buffer_b = cx_b
1795            .background()
1796            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1797
1798        // Edit the buffer as client A while client B is still opening it.
1799        cx_b.background().simulate_random_delay().await;
1800        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1801        cx_b.background().simulate_random_delay().await;
1802        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1803
1804        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1805        let buffer_b = buffer_b.await.unwrap();
1806        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1807    }
1808
1809    #[gpui::test(iterations = 10)]
1810    async fn test_leaving_worktree_while_opening_buffer(
1811        mut cx_a: TestAppContext,
1812        mut cx_b: TestAppContext,
1813    ) {
1814        cx_a.foreground().forbid_parking();
1815        let lang_registry = Arc::new(LanguageRegistry::new());
1816        let fs = Arc::new(FakeFs::new(cx_a.background()));
1817
1818        // Connect to a server as 2 clients.
1819        let mut server = TestServer::start(cx_a.foreground()).await;
1820        let client_a = server.create_client(&mut cx_a, "user_a").await;
1821        let client_b = server.create_client(&mut cx_b, "user_b").await;
1822
1823        // Share a project as client A
1824        fs.insert_tree(
1825            "/dir",
1826            json!({
1827                ".zed.toml": r#"collaborators = ["user_b"]"#,
1828                "a.txt": "a-contents",
1829            }),
1830        )
1831        .await;
1832        let project_a = cx_a.update(|cx| {
1833            Project::local(
1834                client_a.clone(),
1835                client_a.user_store.clone(),
1836                lang_registry.clone(),
1837                fs.clone(),
1838                cx,
1839            )
1840        });
1841        let (worktree_a, _) = project_a
1842            .update(&mut cx_a, |p, cx| {
1843                p.find_or_create_local_worktree("/dir", false, cx)
1844            })
1845            .await
1846            .unwrap();
1847        worktree_a
1848            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1849            .await;
1850        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1851        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1852        project_a
1853            .update(&mut cx_a, |p, cx| p.share(cx))
1854            .await
1855            .unwrap();
1856
1857        // Join that project as client B
1858        let project_b = Project::remote(
1859            project_id,
1860            client_b.clone(),
1861            client_b.user_store.clone(),
1862            lang_registry.clone(),
1863            fs.clone(),
1864            &mut cx_b.to_async(),
1865        )
1866        .await
1867        .unwrap();
1868
1869        // See that a guest has joined as client A.
1870        project_a
1871            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1872            .await;
1873
1874        // Begin opening a buffer as client B, but leave the project before the open completes.
1875        let buffer_b = cx_b
1876            .background()
1877            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1878        cx_b.update(|_| drop(project_b));
1879        drop(buffer_b);
1880
1881        // See that the guest has left.
1882        project_a
1883            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1884            .await;
1885    }
1886
1887    #[gpui::test(iterations = 10)]
1888    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1889        cx_a.foreground().forbid_parking();
1890        let lang_registry = Arc::new(LanguageRegistry::new());
1891        let fs = Arc::new(FakeFs::new(cx_a.background()));
1892
1893        // Connect to a server as 2 clients.
1894        let mut server = TestServer::start(cx_a.foreground()).await;
1895        let client_a = server.create_client(&mut cx_a, "user_a").await;
1896        let client_b = server.create_client(&mut cx_b, "user_b").await;
1897
1898        // Share a project as client A
1899        fs.insert_tree(
1900            "/a",
1901            json!({
1902                ".zed.toml": r#"collaborators = ["user_b"]"#,
1903                "a.txt": "a-contents",
1904                "b.txt": "b-contents",
1905            }),
1906        )
1907        .await;
1908        let project_a = cx_a.update(|cx| {
1909            Project::local(
1910                client_a.clone(),
1911                client_a.user_store.clone(),
1912                lang_registry.clone(),
1913                fs.clone(),
1914                cx,
1915            )
1916        });
1917        let (worktree_a, _) = project_a
1918            .update(&mut cx_a, |p, cx| {
1919                p.find_or_create_local_worktree("/a", false, cx)
1920            })
1921            .await
1922            .unwrap();
1923        worktree_a
1924            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1925            .await;
1926        let project_id = project_a
1927            .update(&mut cx_a, |project, _| project.next_remote_id())
1928            .await;
1929        project_a
1930            .update(&mut cx_a, |project, cx| project.share(cx))
1931            .await
1932            .unwrap();
1933
1934        // Join that project as client B
1935        let _project_b = Project::remote(
1936            project_id,
1937            client_b.clone(),
1938            client_b.user_store.clone(),
1939            lang_registry.clone(),
1940            fs.clone(),
1941            &mut cx_b.to_async(),
1942        )
1943        .await
1944        .unwrap();
1945
1946        // See that a guest has joined as client A.
1947        project_a
1948            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1949            .await;
1950
1951        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1952        client_b.disconnect(&cx_b.to_async()).unwrap();
1953        project_a
1954            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1955            .await;
1956    }
1957
1958    #[gpui::test(iterations = 10)]
1959    async fn test_collaborating_with_diagnostics(
1960        mut cx_a: TestAppContext,
1961        mut cx_b: TestAppContext,
1962    ) {
1963        cx_a.foreground().forbid_parking();
1964        let mut lang_registry = Arc::new(LanguageRegistry::new());
1965        let fs = Arc::new(FakeFs::new(cx_a.background()));
1966
1967        // Set up a fake language server.
1968        let (language_server_config, mut fake_language_server) =
1969            LanguageServerConfig::fake(&cx_a).await;
1970        Arc::get_mut(&mut lang_registry)
1971            .unwrap()
1972            .add(Arc::new(Language::new(
1973                LanguageConfig {
1974                    name: "Rust".to_string(),
1975                    path_suffixes: vec!["rs".to_string()],
1976                    language_server: Some(language_server_config),
1977                    ..Default::default()
1978                },
1979                Some(tree_sitter_rust::language()),
1980            )));
1981
1982        // Connect to a server as 2 clients.
1983        let mut server = TestServer::start(cx_a.foreground()).await;
1984        let client_a = server.create_client(&mut cx_a, "user_a").await;
1985        let client_b = server.create_client(&mut cx_b, "user_b").await;
1986
1987        // Share a project as client A
1988        fs.insert_tree(
1989            "/a",
1990            json!({
1991                ".zed.toml": r#"collaborators = ["user_b"]"#,
1992                "a.rs": "let one = two",
1993                "other.rs": "",
1994            }),
1995        )
1996        .await;
1997        let project_a = cx_a.update(|cx| {
1998            Project::local(
1999                client_a.clone(),
2000                client_a.user_store.clone(),
2001                lang_registry.clone(),
2002                fs.clone(),
2003                cx,
2004            )
2005        });
2006        let (worktree_a, _) = project_a
2007            .update(&mut cx_a, |p, cx| {
2008                p.find_or_create_local_worktree("/a", false, cx)
2009            })
2010            .await
2011            .unwrap();
2012        worktree_a
2013            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2014            .await;
2015        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2016        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2017        project_a
2018            .update(&mut cx_a, |p, cx| p.share(cx))
2019            .await
2020            .unwrap();
2021
2022        // Cause the language server to start.
2023        let _ = cx_a
2024            .background()
2025            .spawn(project_a.update(&mut cx_a, |project, cx| {
2026                project.open_buffer(
2027                    ProjectPath {
2028                        worktree_id,
2029                        path: Path::new("other.rs").into(),
2030                    },
2031                    cx,
2032                )
2033            }))
2034            .await
2035            .unwrap();
2036
2037        // Simulate a language server reporting errors for a file.
2038        fake_language_server
2039            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2040                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2041                version: None,
2042                diagnostics: vec![lsp::Diagnostic {
2043                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2044                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2045                    message: "message 1".to_string(),
2046                    ..Default::default()
2047                }],
2048            })
2049            .await;
2050
2051        // Wait for server to see the diagnostics update.
2052        server
2053            .condition(|store| {
2054                let worktree = store
2055                    .project(project_id)
2056                    .unwrap()
2057                    .worktrees
2058                    .get(&worktree_id.to_proto())
2059                    .unwrap();
2060
2061                !worktree
2062                    .share
2063                    .as_ref()
2064                    .unwrap()
2065                    .diagnostic_summaries
2066                    .is_empty()
2067            })
2068            .await;
2069
2070        // Join the worktree as client B.
2071        let project_b = Project::remote(
2072            project_id,
2073            client_b.clone(),
2074            client_b.user_store.clone(),
2075            lang_registry.clone(),
2076            fs.clone(),
2077            &mut cx_b.to_async(),
2078        )
2079        .await
2080        .unwrap();
2081
2082        project_b.read_with(&cx_b, |project, cx| {
2083            assert_eq!(
2084                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2085                &[(
2086                    ProjectPath {
2087                        worktree_id,
2088                        path: Arc::from(Path::new("a.rs")),
2089                    },
2090                    DiagnosticSummary {
2091                        error_count: 1,
2092                        warning_count: 0,
2093                        ..Default::default()
2094                    },
2095                )]
2096            )
2097        });
2098
2099        // Simulate a language server reporting more errors for a file.
2100        fake_language_server
2101            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2102                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2103                version: None,
2104                diagnostics: vec![
2105                    lsp::Diagnostic {
2106                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2107                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2108                        message: "message 1".to_string(),
2109                        ..Default::default()
2110                    },
2111                    lsp::Diagnostic {
2112                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2113                        range: lsp::Range::new(
2114                            lsp::Position::new(0, 10),
2115                            lsp::Position::new(0, 13),
2116                        ),
2117                        message: "message 2".to_string(),
2118                        ..Default::default()
2119                    },
2120                ],
2121            })
2122            .await;
2123
2124        // Client b gets the updated summaries
2125        project_b
2126            .condition(&cx_b, |project, cx| {
2127                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2128                    == &[(
2129                        ProjectPath {
2130                            worktree_id,
2131                            path: Arc::from(Path::new("a.rs")),
2132                        },
2133                        DiagnosticSummary {
2134                            error_count: 1,
2135                            warning_count: 1,
2136                            ..Default::default()
2137                        },
2138                    )]
2139            })
2140            .await;
2141
2142        // Open the file with the errors on client B. They should be present.
2143        let buffer_b = cx_b
2144            .background()
2145            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2146            .await
2147            .unwrap();
2148
2149        buffer_b.read_with(&cx_b, |buffer, _| {
2150            assert_eq!(
2151                buffer
2152                    .snapshot()
2153                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2154                    .map(|entry| entry)
2155                    .collect::<Vec<_>>(),
2156                &[
2157                    DiagnosticEntry {
2158                        range: Point::new(0, 4)..Point::new(0, 7),
2159                        diagnostic: Diagnostic {
2160                            group_id: 0,
2161                            message: "message 1".to_string(),
2162                            severity: lsp::DiagnosticSeverity::ERROR,
2163                            is_primary: true,
2164                            ..Default::default()
2165                        }
2166                    },
2167                    DiagnosticEntry {
2168                        range: Point::new(0, 10)..Point::new(0, 13),
2169                        diagnostic: Diagnostic {
2170                            group_id: 1,
2171                            severity: lsp::DiagnosticSeverity::WARNING,
2172                            message: "message 2".to_string(),
2173                            is_primary: true,
2174                            ..Default::default()
2175                        }
2176                    }
2177                ]
2178            );
2179        });
2180    }
2181
2182    #[gpui::test(iterations = 10)]
2183    async fn test_collaborating_with_completion(
2184        mut cx_a: TestAppContext,
2185        mut cx_b: TestAppContext,
2186    ) {
2187        cx_a.foreground().forbid_parking();
2188        let mut lang_registry = Arc::new(LanguageRegistry::new());
2189        let fs = Arc::new(FakeFs::new(cx_a.background()));
2190
2191        // Set up a fake language server.
2192        let (language_server_config, mut fake_language_server) =
2193            LanguageServerConfig::fake_with_capabilities(
2194                lsp::ServerCapabilities {
2195                    completion_provider: Some(lsp::CompletionOptions {
2196                        trigger_characters: Some(vec![".".to_string()]),
2197                        ..Default::default()
2198                    }),
2199                    ..Default::default()
2200                },
2201                &cx_a,
2202            )
2203            .await;
2204        Arc::get_mut(&mut lang_registry)
2205            .unwrap()
2206            .add(Arc::new(Language::new(
2207                LanguageConfig {
2208                    name: "Rust".to_string(),
2209                    path_suffixes: vec!["rs".to_string()],
2210                    language_server: Some(language_server_config),
2211                    ..Default::default()
2212                },
2213                Some(tree_sitter_rust::language()),
2214            )));
2215
2216        // Connect to a server as 2 clients.
2217        let mut server = TestServer::start(cx_a.foreground()).await;
2218        let client_a = server.create_client(&mut cx_a, "user_a").await;
2219        let client_b = server.create_client(&mut cx_b, "user_b").await;
2220
2221        // Share a project as client A
2222        fs.insert_tree(
2223            "/a",
2224            json!({
2225                ".zed.toml": r#"collaborators = ["user_b"]"#,
2226                "main.rs": "fn main() { a }",
2227                "other.rs": "",
2228            }),
2229        )
2230        .await;
2231        let project_a = cx_a.update(|cx| {
2232            Project::local(
2233                client_a.clone(),
2234                client_a.user_store.clone(),
2235                lang_registry.clone(),
2236                fs.clone(),
2237                cx,
2238            )
2239        });
2240        let (worktree_a, _) = project_a
2241            .update(&mut cx_a, |p, cx| {
2242                p.find_or_create_local_worktree("/a", false, cx)
2243            })
2244            .await
2245            .unwrap();
2246        worktree_a
2247            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2248            .await;
2249        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2250        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2251        project_a
2252            .update(&mut cx_a, |p, cx| p.share(cx))
2253            .await
2254            .unwrap();
2255
2256        // Join the worktree as client B.
2257        let project_b = Project::remote(
2258            project_id,
2259            client_b.clone(),
2260            client_b.user_store.clone(),
2261            lang_registry.clone(),
2262            fs.clone(),
2263            &mut cx_b.to_async(),
2264        )
2265        .await
2266        .unwrap();
2267
2268        // Open a file in an editor as the guest.
2269        let buffer_b = project_b
2270            .update(&mut cx_b, |p, cx| {
2271                p.open_buffer((worktree_id, "main.rs"), cx)
2272            })
2273            .await
2274            .unwrap();
2275        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2276        let editor_b = cx_b.add_view(window_b, |cx| {
2277            Editor::for_buffer(
2278                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2279                Arc::new(|cx| EditorSettings::test(cx)),
2280                Some(project_b.clone()),
2281                cx,
2282            )
2283        });
2284
2285        // Type a completion trigger character as the guest.
2286        editor_b.update(&mut cx_b, |editor, cx| {
2287            editor.select_ranges([13..13], None, cx);
2288            editor.handle_input(&Input(".".into()), cx);
2289            cx.focus(&editor_b);
2290        });
2291
2292        // Receive a completion request as the host's language server.
2293        // Return some completions from the host's language server.
2294        fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
2295            assert_eq!(
2296                params.text_document_position.text_document.uri,
2297                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2298            );
2299            assert_eq!(
2300                params.text_document_position.position,
2301                lsp::Position::new(0, 14),
2302            );
2303
2304            Some(lsp::CompletionResponse::Array(vec![
2305                lsp::CompletionItem {
2306                    label: "first_method(…)".into(),
2307                    detail: Some("fn(&mut self, B) -> C".into()),
2308                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2309                        new_text: "first_method($1)".to_string(),
2310                        range: lsp::Range::new(
2311                            lsp::Position::new(0, 14),
2312                            lsp::Position::new(0, 14),
2313                        ),
2314                    })),
2315                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2316                    ..Default::default()
2317                },
2318                lsp::CompletionItem {
2319                    label: "second_method(…)".into(),
2320                    detail: Some("fn(&mut self, C) -> D<E>".into()),
2321                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2322                        new_text: "second_method()".to_string(),
2323                        range: lsp::Range::new(
2324                            lsp::Position::new(0, 14),
2325                            lsp::Position::new(0, 14),
2326                        ),
2327                    })),
2328                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2329                    ..Default::default()
2330                },
2331            ]))
2332        });
2333
2334        // Open the buffer on the host.
2335        let buffer_a = project_a
2336            .update(&mut cx_a, |p, cx| {
2337                p.open_buffer((worktree_id, "main.rs"), cx)
2338            })
2339            .await
2340            .unwrap();
2341        buffer_a
2342            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2343            .await;
2344
2345        // Confirm a completion on the guest.
2346        editor_b.next_notification(&cx_b).await;
2347        editor_b.update(&mut cx_b, |editor, cx| {
2348            assert!(editor.context_menu_visible());
2349            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2350            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2351        });
2352
2353        // Return a resolved completion from the host's language server.
2354        // The resolved completion has an additional text edit.
2355        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2356            assert_eq!(params.label, "first_method(…)");
2357            lsp::CompletionItem {
2358                label: "first_method(…)".into(),
2359                detail: Some("fn(&mut self, B) -> C".into()),
2360                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2361                    new_text: "first_method($1)".to_string(),
2362                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2363                })),
2364                additional_text_edits: Some(vec![lsp::TextEdit {
2365                    new_text: "use d::SomeTrait;\n".to_string(),
2366                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2367                }]),
2368                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2369                ..Default::default()
2370            }
2371        });
2372
2373        buffer_a
2374            .condition(&cx_a, |buffer, _| {
2375                buffer.text() == "fn main() { a.first_method() }"
2376            })
2377            .await;
2378
2379        // The additional edit is applied.
2380        buffer_b
2381            .condition(&cx_b, |buffer, _| {
2382                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2383            })
2384            .await;
2385        assert_eq!(
2386            buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2387            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2388        );
2389    }
2390
2391    #[gpui::test(iterations = 10)]
2392    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2393        cx_a.foreground().forbid_parking();
2394        let mut lang_registry = Arc::new(LanguageRegistry::new());
2395        let fs = Arc::new(FakeFs::new(cx_a.background()));
2396
2397        // Set up a fake language server.
2398        let (language_server_config, mut fake_language_server) =
2399            LanguageServerConfig::fake(&cx_a).await;
2400        Arc::get_mut(&mut lang_registry)
2401            .unwrap()
2402            .add(Arc::new(Language::new(
2403                LanguageConfig {
2404                    name: "Rust".to_string(),
2405                    path_suffixes: vec!["rs".to_string()],
2406                    language_server: Some(language_server_config),
2407                    ..Default::default()
2408                },
2409                Some(tree_sitter_rust::language()),
2410            )));
2411
2412        // Connect to a server as 2 clients.
2413        let mut server = TestServer::start(cx_a.foreground()).await;
2414        let client_a = server.create_client(&mut cx_a, "user_a").await;
2415        let client_b = server.create_client(&mut cx_b, "user_b").await;
2416
2417        // Share a project as client A
2418        fs.insert_tree(
2419            "/a",
2420            json!({
2421                ".zed.toml": r#"collaborators = ["user_b"]"#,
2422                "a.rs": "let one = two",
2423            }),
2424        )
2425        .await;
2426        let project_a = cx_a.update(|cx| {
2427            Project::local(
2428                client_a.clone(),
2429                client_a.user_store.clone(),
2430                lang_registry.clone(),
2431                fs.clone(),
2432                cx,
2433            )
2434        });
2435        let (worktree_a, _) = project_a
2436            .update(&mut cx_a, |p, cx| {
2437                p.find_or_create_local_worktree("/a", false, cx)
2438            })
2439            .await
2440            .unwrap();
2441        worktree_a
2442            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2443            .await;
2444        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2445        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2446        project_a
2447            .update(&mut cx_a, |p, cx| p.share(cx))
2448            .await
2449            .unwrap();
2450
2451        // Join the worktree as client B.
2452        let project_b = Project::remote(
2453            project_id,
2454            client_b.clone(),
2455            client_b.user_store.clone(),
2456            lang_registry.clone(),
2457            fs.clone(),
2458            &mut cx_b.to_async(),
2459        )
2460        .await
2461        .unwrap();
2462
2463        let buffer_b = cx_b
2464            .background()
2465            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2466            .await
2467            .unwrap();
2468
2469        let format = project_b.update(&mut cx_b, |project, cx| {
2470            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2471        });
2472
2473        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2474            Some(vec![
2475                lsp::TextEdit {
2476                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2477                    new_text: "h".to_string(),
2478                },
2479                lsp::TextEdit {
2480                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2481                    new_text: "y".to_string(),
2482                },
2483            ])
2484        });
2485
2486        format.await.unwrap();
2487        assert_eq!(
2488            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2489            "let honey = two"
2490        );
2491    }
2492
2493    #[gpui::test(iterations = 10)]
2494    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2495        cx_a.foreground().forbid_parking();
2496        let mut lang_registry = Arc::new(LanguageRegistry::new());
2497        let fs = Arc::new(FakeFs::new(cx_a.background()));
2498        fs.insert_tree(
2499            "/root-1",
2500            json!({
2501                ".zed.toml": r#"collaborators = ["user_b"]"#,
2502                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2503            }),
2504        )
2505        .await;
2506        fs.insert_tree(
2507            "/root-2",
2508            json!({
2509                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2510            }),
2511        )
2512        .await;
2513
2514        // Set up a fake language server.
2515        let (language_server_config, mut fake_language_server) =
2516            LanguageServerConfig::fake(&cx_a).await;
2517        Arc::get_mut(&mut lang_registry)
2518            .unwrap()
2519            .add(Arc::new(Language::new(
2520                LanguageConfig {
2521                    name: "Rust".to_string(),
2522                    path_suffixes: vec!["rs".to_string()],
2523                    language_server: Some(language_server_config),
2524                    ..Default::default()
2525                },
2526                Some(tree_sitter_rust::language()),
2527            )));
2528
2529        // Connect to a server as 2 clients.
2530        let mut server = TestServer::start(cx_a.foreground()).await;
2531        let client_a = server.create_client(&mut cx_a, "user_a").await;
2532        let client_b = server.create_client(&mut cx_b, "user_b").await;
2533
2534        // Share a project as client A
2535        let project_a = cx_a.update(|cx| {
2536            Project::local(
2537                client_a.clone(),
2538                client_a.user_store.clone(),
2539                lang_registry.clone(),
2540                fs.clone(),
2541                cx,
2542            )
2543        });
2544        let (worktree_a, _) = project_a
2545            .update(&mut cx_a, |p, cx| {
2546                p.find_or_create_local_worktree("/root-1", false, cx)
2547            })
2548            .await
2549            .unwrap();
2550        worktree_a
2551            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2552            .await;
2553        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2554        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2555        project_a
2556            .update(&mut cx_a, |p, cx| p.share(cx))
2557            .await
2558            .unwrap();
2559
2560        // Join the worktree as client B.
2561        let project_b = Project::remote(
2562            project_id,
2563            client_b.clone(),
2564            client_b.user_store.clone(),
2565            lang_registry.clone(),
2566            fs.clone(),
2567            &mut cx_b.to_async(),
2568        )
2569        .await
2570        .unwrap();
2571
2572        // Open the file on client B.
2573        let buffer_b = cx_b
2574            .background()
2575            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2576            .await
2577            .unwrap();
2578
2579        // Request the definition of a symbol as the guest.
2580        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2581        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2582            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2583                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2584                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2585            )))
2586        });
2587
2588        let definitions_1 = definitions_1.await.unwrap();
2589        cx_b.read(|cx| {
2590            assert_eq!(definitions_1.len(), 1);
2591            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2592            let target_buffer = definitions_1[0].target_buffer.read(cx);
2593            assert_eq!(
2594                target_buffer.text(),
2595                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2596            );
2597            assert_eq!(
2598                definitions_1[0].target_range.to_point(target_buffer),
2599                Point::new(0, 6)..Point::new(0, 9)
2600            );
2601        });
2602
2603        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2604        // the previous call to `definition`.
2605        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2606        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2607            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2608                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2609                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2610            )))
2611        });
2612
2613        let definitions_2 = definitions_2.await.unwrap();
2614        cx_b.read(|cx| {
2615            assert_eq!(definitions_2.len(), 1);
2616            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2617            let target_buffer = definitions_2[0].target_buffer.read(cx);
2618            assert_eq!(
2619                target_buffer.text(),
2620                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2621            );
2622            assert_eq!(
2623                definitions_2[0].target_range.to_point(target_buffer),
2624                Point::new(1, 6)..Point::new(1, 11)
2625            );
2626        });
2627        assert_eq!(
2628            definitions_1[0].target_buffer,
2629            definitions_2[0].target_buffer
2630        );
2631
2632        cx_b.update(|_| {
2633            drop(definitions_1);
2634            drop(definitions_2);
2635        });
2636        project_b
2637            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2638            .await;
2639    }
2640
2641    #[gpui::test(iterations = 10)]
2642    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2643        mut cx_a: TestAppContext,
2644        mut cx_b: TestAppContext,
2645        mut rng: StdRng,
2646    ) {
2647        cx_a.foreground().forbid_parking();
2648        let mut lang_registry = Arc::new(LanguageRegistry::new());
2649        let fs = Arc::new(FakeFs::new(cx_a.background()));
2650        fs.insert_tree(
2651            "/root",
2652            json!({
2653                ".zed.toml": r#"collaborators = ["user_b"]"#,
2654                "a.rs": "const ONE: usize = b::TWO;",
2655                "b.rs": "const TWO: usize = 2",
2656            }),
2657        )
2658        .await;
2659
2660        // Set up a fake language server.
2661        let (language_server_config, mut fake_language_server) =
2662            LanguageServerConfig::fake(&cx_a).await;
2663        Arc::get_mut(&mut lang_registry)
2664            .unwrap()
2665            .add(Arc::new(Language::new(
2666                LanguageConfig {
2667                    name: "Rust".to_string(),
2668                    path_suffixes: vec!["rs".to_string()],
2669                    language_server: Some(language_server_config),
2670                    ..Default::default()
2671                },
2672                Some(tree_sitter_rust::language()),
2673            )));
2674
2675        // Connect to a server as 2 clients.
2676        let mut server = TestServer::start(cx_a.foreground()).await;
2677        let client_a = server.create_client(&mut cx_a, "user_a").await;
2678        let client_b = server.create_client(&mut cx_b, "user_b").await;
2679
2680        // Share a project as client A
2681        let project_a = cx_a.update(|cx| {
2682            Project::local(
2683                client_a.clone(),
2684                client_a.user_store.clone(),
2685                lang_registry.clone(),
2686                fs.clone(),
2687                cx,
2688            )
2689        });
2690        let (worktree_a, _) = project_a
2691            .update(&mut cx_a, |p, cx| {
2692                p.find_or_create_local_worktree("/root", false, cx)
2693            })
2694            .await
2695            .unwrap();
2696        worktree_a
2697            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2698            .await;
2699        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2700        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2701        project_a
2702            .update(&mut cx_a, |p, cx| p.share(cx))
2703            .await
2704            .unwrap();
2705
2706        // Join the worktree as client B.
2707        let project_b = Project::remote(
2708            project_id,
2709            client_b.clone(),
2710            client_b.user_store.clone(),
2711            lang_registry.clone(),
2712            fs.clone(),
2713            &mut cx_b.to_async(),
2714        )
2715        .await
2716        .unwrap();
2717
2718        let buffer_b1 = cx_b
2719            .background()
2720            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2721            .await
2722            .unwrap();
2723
2724        let definitions;
2725        let buffer_b2;
2726        if rng.gen() {
2727            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2728            buffer_b2 =
2729                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2730        } else {
2731            buffer_b2 =
2732                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2733            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2734        }
2735
2736        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2737            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2738                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2739                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2740            )))
2741        });
2742
2743        let buffer_b2 = buffer_b2.await.unwrap();
2744        let definitions = definitions.await.unwrap();
2745        assert_eq!(definitions.len(), 1);
2746        assert_eq!(definitions[0].target_buffer, buffer_b2);
2747    }
2748
2749    #[gpui::test(iterations = 10)]
2750    async fn test_collaborating_with_code_actions(
2751        mut cx_a: TestAppContext,
2752        mut cx_b: TestAppContext,
2753    ) {
2754        cx_a.foreground().forbid_parking();
2755        let mut lang_registry = Arc::new(LanguageRegistry::new());
2756        let fs = Arc::new(FakeFs::new(cx_a.background()));
2757        let mut path_openers_b = Vec::new();
2758        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2759
2760        // Set up a fake language server.
2761        let (language_server_config, mut fake_language_server) =
2762            LanguageServerConfig::fake_with_capabilities(
2763                lsp::ServerCapabilities {
2764                    ..Default::default()
2765                },
2766                &cx_a,
2767            )
2768            .await;
2769        Arc::get_mut(&mut lang_registry)
2770            .unwrap()
2771            .add(Arc::new(Language::new(
2772                LanguageConfig {
2773                    name: "Rust".to_string(),
2774                    path_suffixes: vec!["rs".to_string()],
2775                    language_server: Some(language_server_config),
2776                    ..Default::default()
2777                },
2778                Some(tree_sitter_rust::language()),
2779            )));
2780
2781        // Connect to a server as 2 clients.
2782        let mut server = TestServer::start(cx_a.foreground()).await;
2783        let client_a = server.create_client(&mut cx_a, "user_a").await;
2784        let client_b = server.create_client(&mut cx_b, "user_b").await;
2785
2786        // Share a project as client A
2787        fs.insert_tree(
2788            "/a",
2789            json!({
2790                ".zed.toml": r#"collaborators = ["user_b"]"#,
2791                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2792                "other.rs": "pub fn foo() -> usize { 4 }",
2793            }),
2794        )
2795        .await;
2796        let project_a = cx_a.update(|cx| {
2797            Project::local(
2798                client_a.clone(),
2799                client_a.user_store.clone(),
2800                lang_registry.clone(),
2801                fs.clone(),
2802                cx,
2803            )
2804        });
2805        let (worktree_a, _) = project_a
2806            .update(&mut cx_a, |p, cx| {
2807                p.find_or_create_local_worktree("/a", false, cx)
2808            })
2809            .await
2810            .unwrap();
2811        worktree_a
2812            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2813            .await;
2814        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2815        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2816        project_a
2817            .update(&mut cx_a, |p, cx| p.share(cx))
2818            .await
2819            .unwrap();
2820
2821        // Join the worktree as client B.
2822        let project_b = Project::remote(
2823            project_id,
2824            client_b.clone(),
2825            client_b.user_store.clone(),
2826            lang_registry.clone(),
2827            fs.clone(),
2828            &mut cx_b.to_async(),
2829        )
2830        .await
2831        .unwrap();
2832        let mut params = cx_b.update(WorkspaceParams::test);
2833        params.languages = lang_registry.clone();
2834        params.client = client_b.client.clone();
2835        params.user_store = client_b.user_store.clone();
2836        params.project = project_b;
2837        params.path_openers = path_openers_b.into();
2838
2839        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
2840        let editor_b = workspace_b
2841            .update(&mut cx_b, |workspace, cx| {
2842                workspace.open_path((worktree_id, "main.rs").into(), cx)
2843            })
2844            .await
2845            .unwrap()
2846            .downcast::<Editor>()
2847            .unwrap();
2848        fake_language_server
2849            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2850                assert_eq!(
2851                    params.text_document.uri,
2852                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2853                );
2854                assert_eq!(params.range.start, lsp::Position::new(0, 0));
2855                assert_eq!(params.range.end, lsp::Position::new(0, 0));
2856                None
2857            })
2858            .next()
2859            .await;
2860
2861        // Move cursor to a location that contains code actions.
2862        editor_b.update(&mut cx_b, |editor, cx| {
2863            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2864            cx.focus(&editor_b);
2865        });
2866        fake_language_server.handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2867            assert_eq!(
2868                params.text_document.uri,
2869                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2870            );
2871            assert_eq!(params.range.start, lsp::Position::new(1, 31));
2872            assert_eq!(params.range.end, lsp::Position::new(1, 31));
2873
2874            Some(vec![lsp::CodeActionOrCommand::CodeAction(
2875                lsp::CodeAction {
2876                    title: "Inline into all callers".to_string(),
2877                    edit: Some(lsp::WorkspaceEdit {
2878                        changes: Some(
2879                            [
2880                                (
2881                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2882                                    vec![lsp::TextEdit::new(
2883                                        lsp::Range::new(
2884                                            lsp::Position::new(1, 22),
2885                                            lsp::Position::new(1, 34),
2886                                        ),
2887                                        "4".to_string(),
2888                                    )],
2889                                ),
2890                                (
2891                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
2892                                    vec![lsp::TextEdit::new(
2893                                        lsp::Range::new(
2894                                            lsp::Position::new(0, 0),
2895                                            lsp::Position::new(0, 27),
2896                                        ),
2897                                        "".to_string(),
2898                                    )],
2899                                ),
2900                            ]
2901                            .into_iter()
2902                            .collect(),
2903                        ),
2904                        ..Default::default()
2905                    }),
2906                    data: Some(json!({
2907                        "codeActionParams": {
2908                            "range": {
2909                                "start": {"line": 1, "column": 31},
2910                                "end": {"line": 1, "column": 31},
2911                            }
2912                        }
2913                    })),
2914                    ..Default::default()
2915                },
2916            )])
2917        });
2918
2919        // Toggle code actions and wait for them to display.
2920        editor_b.update(&mut cx_b, |editor, cx| {
2921            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2922        });
2923        editor_b
2924            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2925            .await;
2926
2927        // Confirming the code action will trigger a resolve request.
2928        let confirm_action = workspace_b
2929            .update(&mut cx_b, |workspace, cx| {
2930                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2931            })
2932            .unwrap();
2933        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2934            lsp::CodeAction {
2935                title: "Inline into all callers".to_string(),
2936                edit: Some(lsp::WorkspaceEdit {
2937                    changes: Some(
2938                        [
2939                            (
2940                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2941                                vec![lsp::TextEdit::new(
2942                                    lsp::Range::new(
2943                                        lsp::Position::new(1, 22),
2944                                        lsp::Position::new(1, 34),
2945                                    ),
2946                                    "4".to_string(),
2947                                )],
2948                            ),
2949                            (
2950                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
2951                                vec![lsp::TextEdit::new(
2952                                    lsp::Range::new(
2953                                        lsp::Position::new(0, 0),
2954                                        lsp::Position::new(0, 27),
2955                                    ),
2956                                    "".to_string(),
2957                                )],
2958                            ),
2959                        ]
2960                        .into_iter()
2961                        .collect(),
2962                    ),
2963                    ..Default::default()
2964                }),
2965                ..Default::default()
2966            }
2967        });
2968
2969        // After the action is confirmed, an editor containing both modified files is opened.
2970        confirm_action.await.unwrap();
2971        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
2972            workspace
2973                .active_item(cx)
2974                .unwrap()
2975                .downcast::<Editor>()
2976                .unwrap()
2977        });
2978        code_action_editor.update(&mut cx_b, |editor, cx| {
2979            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2980            editor.undo(&Undo, cx);
2981            assert_eq!(
2982                editor.text(cx),
2983                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
2984            );
2985            editor.redo(&Redo, cx);
2986            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2987        });
2988    }
2989
2990    #[gpui::test(iterations = 10)]
2991    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2992        cx_a.foreground().forbid_parking();
2993
2994        // Connect to a server as 2 clients.
2995        let mut server = TestServer::start(cx_a.foreground()).await;
2996        let client_a = server.create_client(&mut cx_a, "user_a").await;
2997        let client_b = server.create_client(&mut cx_b, "user_b").await;
2998
2999        // Create an org that includes these 2 users.
3000        let db = &server.app_state.db;
3001        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3002        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3003            .await
3004            .unwrap();
3005        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3006            .await
3007            .unwrap();
3008
3009        // Create a channel that includes all the users.
3010        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3011        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3012            .await
3013            .unwrap();
3014        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3015            .await
3016            .unwrap();
3017        db.create_channel_message(
3018            channel_id,
3019            client_b.current_user_id(&cx_b),
3020            "hello A, it's B.",
3021            OffsetDateTime::now_utc(),
3022            1,
3023        )
3024        .await
3025        .unwrap();
3026
3027        let channels_a = cx_a
3028            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3029        channels_a
3030            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3031            .await;
3032        channels_a.read_with(&cx_a, |list, _| {
3033            assert_eq!(
3034                list.available_channels().unwrap(),
3035                &[ChannelDetails {
3036                    id: channel_id.to_proto(),
3037                    name: "test-channel".to_string()
3038                }]
3039            )
3040        });
3041        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3042            this.get_channel(channel_id.to_proto(), cx).unwrap()
3043        });
3044        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3045        channel_a
3046            .condition(&cx_a, |channel, _| {
3047                channel_messages(channel)
3048                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3049            })
3050            .await;
3051
3052        let channels_b = cx_b
3053            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3054        channels_b
3055            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3056            .await;
3057        channels_b.read_with(&cx_b, |list, _| {
3058            assert_eq!(
3059                list.available_channels().unwrap(),
3060                &[ChannelDetails {
3061                    id: channel_id.to_proto(),
3062                    name: "test-channel".to_string()
3063                }]
3064            )
3065        });
3066
3067        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3068            this.get_channel(channel_id.to_proto(), cx).unwrap()
3069        });
3070        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3071        channel_b
3072            .condition(&cx_b, |channel, _| {
3073                channel_messages(channel)
3074                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3075            })
3076            .await;
3077
3078        channel_a
3079            .update(&mut cx_a, |channel, cx| {
3080                channel
3081                    .send_message("oh, hi B.".to_string(), cx)
3082                    .unwrap()
3083                    .detach();
3084                let task = channel.send_message("sup".to_string(), cx).unwrap();
3085                assert_eq!(
3086                    channel_messages(channel),
3087                    &[
3088                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3089                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3090                        ("user_a".to_string(), "sup".to_string(), true)
3091                    ]
3092                );
3093                task
3094            })
3095            .await
3096            .unwrap();
3097
3098        channel_b
3099            .condition(&cx_b, |channel, _| {
3100                channel_messages(channel)
3101                    == [
3102                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3103                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3104                        ("user_a".to_string(), "sup".to_string(), false),
3105                    ]
3106            })
3107            .await;
3108
3109        assert_eq!(
3110            server
3111                .state()
3112                .await
3113                .channel(channel_id)
3114                .unwrap()
3115                .connection_ids
3116                .len(),
3117            2
3118        );
3119        cx_b.update(|_| drop(channel_b));
3120        server
3121            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3122            .await;
3123
3124        cx_a.update(|_| drop(channel_a));
3125        server
3126            .condition(|state| state.channel(channel_id).is_none())
3127            .await;
3128    }
3129
3130    #[gpui::test(iterations = 10)]
3131    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3132        cx_a.foreground().forbid_parking();
3133
3134        let mut server = TestServer::start(cx_a.foreground()).await;
3135        let client_a = server.create_client(&mut cx_a, "user_a").await;
3136
3137        let db = &server.app_state.db;
3138        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3139        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3140        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3141            .await
3142            .unwrap();
3143        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3144            .await
3145            .unwrap();
3146
3147        let channels_a = cx_a
3148            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3149        channels_a
3150            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3151            .await;
3152        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3153            this.get_channel(channel_id.to_proto(), cx).unwrap()
3154        });
3155
3156        // Messages aren't allowed to be too long.
3157        channel_a
3158            .update(&mut cx_a, |channel, cx| {
3159                let long_body = "this is long.\n".repeat(1024);
3160                channel.send_message(long_body, cx).unwrap()
3161            })
3162            .await
3163            .unwrap_err();
3164
3165        // Messages aren't allowed to be blank.
3166        channel_a.update(&mut cx_a, |channel, cx| {
3167            channel.send_message(String::new(), cx).unwrap_err()
3168        });
3169
3170        // Leading and trailing whitespace are trimmed.
3171        channel_a
3172            .update(&mut cx_a, |channel, cx| {
3173                channel
3174                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3175                    .unwrap()
3176            })
3177            .await
3178            .unwrap();
3179        assert_eq!(
3180            db.get_channel_messages(channel_id, 10, None)
3181                .await
3182                .unwrap()
3183                .iter()
3184                .map(|m| &m.body)
3185                .collect::<Vec<_>>(),
3186            &["surrounded by whitespace"]
3187        );
3188    }
3189
3190    #[gpui::test(iterations = 10)]
3191    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3192        cx_a.foreground().forbid_parking();
3193
3194        // Connect to a server as 2 clients.
3195        let mut server = TestServer::start(cx_a.foreground()).await;
3196        let client_a = server.create_client(&mut cx_a, "user_a").await;
3197        let client_b = server.create_client(&mut cx_b, "user_b").await;
3198        let mut status_b = client_b.status();
3199
3200        // Create an org that includes these 2 users.
3201        let db = &server.app_state.db;
3202        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3203        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3204            .await
3205            .unwrap();
3206        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3207            .await
3208            .unwrap();
3209
3210        // Create a channel that includes all the users.
3211        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3212        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3213            .await
3214            .unwrap();
3215        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3216            .await
3217            .unwrap();
3218        db.create_channel_message(
3219            channel_id,
3220            client_b.current_user_id(&cx_b),
3221            "hello A, it's B.",
3222            OffsetDateTime::now_utc(),
3223            2,
3224        )
3225        .await
3226        .unwrap();
3227
3228        let channels_a = cx_a
3229            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3230        channels_a
3231            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3232            .await;
3233
3234        channels_a.read_with(&cx_a, |list, _| {
3235            assert_eq!(
3236                list.available_channels().unwrap(),
3237                &[ChannelDetails {
3238                    id: channel_id.to_proto(),
3239                    name: "test-channel".to_string()
3240                }]
3241            )
3242        });
3243        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3244            this.get_channel(channel_id.to_proto(), cx).unwrap()
3245        });
3246        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3247        channel_a
3248            .condition(&cx_a, |channel, _| {
3249                channel_messages(channel)
3250                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3251            })
3252            .await;
3253
3254        let channels_b = cx_b
3255            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3256        channels_b
3257            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3258            .await;
3259        channels_b.read_with(&cx_b, |list, _| {
3260            assert_eq!(
3261                list.available_channels().unwrap(),
3262                &[ChannelDetails {
3263                    id: channel_id.to_proto(),
3264                    name: "test-channel".to_string()
3265                }]
3266            )
3267        });
3268
3269        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3270            this.get_channel(channel_id.to_proto(), cx).unwrap()
3271        });
3272        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3273        channel_b
3274            .condition(&cx_b, |channel, _| {
3275                channel_messages(channel)
3276                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3277            })
3278            .await;
3279
3280        // Disconnect client B, ensuring we can still access its cached channel data.
3281        server.forbid_connections();
3282        server.disconnect_client(client_b.current_user_id(&cx_b));
3283        while !matches!(
3284            status_b.next().await,
3285            Some(client::Status::ReconnectionError { .. })
3286        ) {}
3287
3288        channels_b.read_with(&cx_b, |channels, _| {
3289            assert_eq!(
3290                channels.available_channels().unwrap(),
3291                [ChannelDetails {
3292                    id: channel_id.to_proto(),
3293                    name: "test-channel".to_string()
3294                }]
3295            )
3296        });
3297        channel_b.read_with(&cx_b, |channel, _| {
3298            assert_eq!(
3299                channel_messages(channel),
3300                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3301            )
3302        });
3303
3304        // Send a message from client B while it is disconnected.
3305        channel_b
3306            .update(&mut cx_b, |channel, cx| {
3307                let task = channel
3308                    .send_message("can you see this?".to_string(), cx)
3309                    .unwrap();
3310                assert_eq!(
3311                    channel_messages(channel),
3312                    &[
3313                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3314                        ("user_b".to_string(), "can you see this?".to_string(), true)
3315                    ]
3316                );
3317                task
3318            })
3319            .await
3320            .unwrap_err();
3321
3322        // Send a message from client A while B is disconnected.
3323        channel_a
3324            .update(&mut cx_a, |channel, cx| {
3325                channel
3326                    .send_message("oh, hi B.".to_string(), cx)
3327                    .unwrap()
3328                    .detach();
3329                let task = channel.send_message("sup".to_string(), cx).unwrap();
3330                assert_eq!(
3331                    channel_messages(channel),
3332                    &[
3333                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3334                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3335                        ("user_a".to_string(), "sup".to_string(), true)
3336                    ]
3337                );
3338                task
3339            })
3340            .await
3341            .unwrap();
3342
3343        // Give client B a chance to reconnect.
3344        server.allow_connections();
3345        cx_b.foreground().advance_clock(Duration::from_secs(10));
3346
3347        // Verify that B sees the new messages upon reconnection, as well as the message client B
3348        // sent while offline.
3349        channel_b
3350            .condition(&cx_b, |channel, _| {
3351                channel_messages(channel)
3352                    == [
3353                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3354                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3355                        ("user_a".to_string(), "sup".to_string(), false),
3356                        ("user_b".to_string(), "can you see this?".to_string(), false),
3357                    ]
3358            })
3359            .await;
3360
3361        // Ensure client A and B can communicate normally after reconnection.
3362        channel_a
3363            .update(&mut cx_a, |channel, cx| {
3364                channel.send_message("you online?".to_string(), cx).unwrap()
3365            })
3366            .await
3367            .unwrap();
3368        channel_b
3369            .condition(&cx_b, |channel, _| {
3370                channel_messages(channel)
3371                    == [
3372                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3373                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3374                        ("user_a".to_string(), "sup".to_string(), false),
3375                        ("user_b".to_string(), "can you see this?".to_string(), false),
3376                        ("user_a".to_string(), "you online?".to_string(), false),
3377                    ]
3378            })
3379            .await;
3380
3381        channel_b
3382            .update(&mut cx_b, |channel, cx| {
3383                channel.send_message("yep".to_string(), cx).unwrap()
3384            })
3385            .await
3386            .unwrap();
3387        channel_a
3388            .condition(&cx_a, |channel, _| {
3389                channel_messages(channel)
3390                    == [
3391                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3392                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3393                        ("user_a".to_string(), "sup".to_string(), false),
3394                        ("user_b".to_string(), "can you see this?".to_string(), false),
3395                        ("user_a".to_string(), "you online?".to_string(), false),
3396                        ("user_b".to_string(), "yep".to_string(), false),
3397                    ]
3398            })
3399            .await;
3400    }
3401
3402    #[gpui::test(iterations = 10)]
3403    async fn test_contacts(
3404        mut cx_a: TestAppContext,
3405        mut cx_b: TestAppContext,
3406        mut cx_c: TestAppContext,
3407    ) {
3408        cx_a.foreground().forbid_parking();
3409        let lang_registry = Arc::new(LanguageRegistry::new());
3410        let fs = Arc::new(FakeFs::new(cx_a.background()));
3411
3412        // Connect to a server as 3 clients.
3413        let mut server = TestServer::start(cx_a.foreground()).await;
3414        let client_a = server.create_client(&mut cx_a, "user_a").await;
3415        let client_b = server.create_client(&mut cx_b, "user_b").await;
3416        let client_c = server.create_client(&mut cx_c, "user_c").await;
3417
3418        // Share a worktree as client A.
3419        fs.insert_tree(
3420            "/a",
3421            json!({
3422                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3423            }),
3424        )
3425        .await;
3426
3427        let project_a = cx_a.update(|cx| {
3428            Project::local(
3429                client_a.clone(),
3430                client_a.user_store.clone(),
3431                lang_registry.clone(),
3432                fs.clone(),
3433                cx,
3434            )
3435        });
3436        let (worktree_a, _) = project_a
3437            .update(&mut cx_a, |p, cx| {
3438                p.find_or_create_local_worktree("/a", false, cx)
3439            })
3440            .await
3441            .unwrap();
3442        worktree_a
3443            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3444            .await;
3445
3446        client_a
3447            .user_store
3448            .condition(&cx_a, |user_store, _| {
3449                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3450            })
3451            .await;
3452        client_b
3453            .user_store
3454            .condition(&cx_b, |user_store, _| {
3455                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3456            })
3457            .await;
3458        client_c
3459            .user_store
3460            .condition(&cx_c, |user_store, _| {
3461                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3462            })
3463            .await;
3464
3465        let project_id = project_a
3466            .update(&mut cx_a, |project, _| project.next_remote_id())
3467            .await;
3468        project_a
3469            .update(&mut cx_a, |project, cx| project.share(cx))
3470            .await
3471            .unwrap();
3472
3473        let _project_b = Project::remote(
3474            project_id,
3475            client_b.clone(),
3476            client_b.user_store.clone(),
3477            lang_registry.clone(),
3478            fs.clone(),
3479            &mut cx_b.to_async(),
3480        )
3481        .await
3482        .unwrap();
3483
3484        client_a
3485            .user_store
3486            .condition(&cx_a, |user_store, _| {
3487                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3488            })
3489            .await;
3490        client_b
3491            .user_store
3492            .condition(&cx_b, |user_store, _| {
3493                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3494            })
3495            .await;
3496        client_c
3497            .user_store
3498            .condition(&cx_c, |user_store, _| {
3499                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3500            })
3501            .await;
3502
3503        project_a
3504            .condition(&cx_a, |project, _| {
3505                project.collaborators().contains_key(&client_b.peer_id)
3506            })
3507            .await;
3508
3509        cx_a.update(move |_| drop(project_a));
3510        client_a
3511            .user_store
3512            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3513            .await;
3514        client_b
3515            .user_store
3516            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3517            .await;
3518        client_c
3519            .user_store
3520            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3521            .await;
3522
3523        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3524            user_store
3525                .contacts()
3526                .iter()
3527                .map(|contact| {
3528                    let worktrees = contact
3529                        .projects
3530                        .iter()
3531                        .map(|p| {
3532                            (
3533                                p.worktree_root_names[0].as_str(),
3534                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3535                            )
3536                        })
3537                        .collect();
3538                    (contact.user.github_login.as_str(), worktrees)
3539                })
3540                .collect()
3541        }
3542    }
3543
3544    struct TestServer {
3545        peer: Arc<Peer>,
3546        app_state: Arc<AppState>,
3547        server: Arc<Server>,
3548        foreground: Rc<executor::Foreground>,
3549        notifications: mpsc::Receiver<()>,
3550        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3551        forbid_connections: Arc<AtomicBool>,
3552        _test_db: TestDb,
3553    }
3554
3555    impl TestServer {
3556        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3557            let test_db = TestDb::new();
3558            let app_state = Self::build_app_state(&test_db).await;
3559            let peer = Peer::new();
3560            let notifications = mpsc::channel(128);
3561            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3562            Self {
3563                peer,
3564                app_state,
3565                server,
3566                foreground,
3567                notifications: notifications.1,
3568                connection_killers: Default::default(),
3569                forbid_connections: Default::default(),
3570                _test_db: test_db,
3571            }
3572        }
3573
3574        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3575            let http = FakeHttpClient::with_404_response();
3576            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3577            let client_name = name.to_string();
3578            let mut client = Client::new(http.clone());
3579            let server = self.server.clone();
3580            let connection_killers = self.connection_killers.clone();
3581            let forbid_connections = self.forbid_connections.clone();
3582            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3583
3584            Arc::get_mut(&mut client)
3585                .unwrap()
3586                .override_authenticate(move |cx| {
3587                    cx.spawn(|_| async move {
3588                        let access_token = "the-token".to_string();
3589                        Ok(Credentials {
3590                            user_id: user_id.0 as u64,
3591                            access_token,
3592                        })
3593                    })
3594                })
3595                .override_establish_connection(move |credentials, cx| {
3596                    assert_eq!(credentials.user_id, user_id.0 as u64);
3597                    assert_eq!(credentials.access_token, "the-token");
3598
3599                    let server = server.clone();
3600                    let connection_killers = connection_killers.clone();
3601                    let forbid_connections = forbid_connections.clone();
3602                    let client_name = client_name.clone();
3603                    let connection_id_tx = connection_id_tx.clone();
3604                    cx.spawn(move |cx| async move {
3605                        if forbid_connections.load(SeqCst) {
3606                            Err(EstablishConnectionError::other(anyhow!(
3607                                "server is forbidding connections"
3608                            )))
3609                        } else {
3610                            let (client_conn, server_conn, kill_conn) =
3611                                Connection::in_memory(cx.background());
3612                            connection_killers.lock().insert(user_id, kill_conn);
3613                            cx.background()
3614                                .spawn(server.handle_connection(
3615                                    server_conn,
3616                                    client_name,
3617                                    user_id,
3618                                    Some(connection_id_tx),
3619                                    cx.background(),
3620                                ))
3621                                .detach();
3622                            Ok(client_conn)
3623                        }
3624                    })
3625                });
3626
3627            client
3628                .authenticate_and_connect(&cx.to_async())
3629                .await
3630                .unwrap();
3631
3632            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3633            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3634            let mut authed_user =
3635                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3636            while authed_user.next().await.unwrap().is_none() {}
3637
3638            TestClient {
3639                client,
3640                peer_id,
3641                user_store,
3642            }
3643        }
3644
3645        fn disconnect_client(&self, user_id: UserId) {
3646            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3647                let _ = kill_conn.try_send(Some(()));
3648            }
3649        }
3650
3651        fn forbid_connections(&self) {
3652            self.forbid_connections.store(true, SeqCst);
3653        }
3654
3655        fn allow_connections(&self) {
3656            self.forbid_connections.store(false, SeqCst);
3657        }
3658
3659        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3660            let mut config = Config::default();
3661            config.session_secret = "a".repeat(32);
3662            config.database_url = test_db.url.clone();
3663            let github_client = github::AppClient::test();
3664            Arc::new(AppState {
3665                db: test_db.db().clone(),
3666                handlebars: Default::default(),
3667                auth_client: auth::build_client("", ""),
3668                repo_client: github::RepoClient::test(&github_client),
3669                github_client,
3670                config,
3671            })
3672        }
3673
3674        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3675            self.server.store.read()
3676        }
3677
3678        async fn condition<F>(&mut self, mut predicate: F)
3679        where
3680            F: FnMut(&Store) -> bool,
3681        {
3682            async_std::future::timeout(Duration::from_millis(500), async {
3683                while !(predicate)(&*self.server.store.read()) {
3684                    self.foreground.start_waiting();
3685                    self.notifications.next().await;
3686                    self.foreground.finish_waiting();
3687                }
3688            })
3689            .await
3690            .expect("condition timed out");
3691        }
3692    }
3693
3694    impl Drop for TestServer {
3695        fn drop(&mut self) {
3696            self.peer.reset();
3697        }
3698    }
3699
3700    struct TestClient {
3701        client: Arc<Client>,
3702        pub peer_id: PeerId,
3703        pub user_store: ModelHandle<UserStore>,
3704    }
3705
3706    impl Deref for TestClient {
3707        type Target = Arc<Client>;
3708
3709        fn deref(&self) -> &Self::Target {
3710            &self.client
3711        }
3712    }
3713
3714    impl TestClient {
3715        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3716            UserId::from_proto(
3717                self.user_store
3718                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3719            )
3720        }
3721    }
3722
3723    impl Executor for Arc<gpui::executor::Background> {
3724        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
3725            self.spawn(future).detach();
3726        }
3727    }
3728
3729    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3730        channel
3731            .messages()
3732            .cursor::<()>()
3733            .map(|m| {
3734                (
3735                    m.sender.github_login.clone(),
3736                    m.body.clone(),
3737                    m.is_pending(),
3738                )
3739            })
3740            .collect()
3741    }
3742
3743    struct EmptyView;
3744
3745    impl gpui::Entity for EmptyView {
3746        type Event = ();
3747    }
3748
3749    impl gpui::View for EmptyView {
3750        fn ui_name() -> &'static str {
3751            "empty view"
3752        }
3753
3754        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3755            gpui::Element::boxed(gpui::elements::Empty)
3756        }
3757    }
3758}