rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  /// State shared by every connection the RPC server handles.
  36pub struct Server {
        /// Peer used to register connections and to send, forward, and answer messages.
  37    peer: Arc<Peer>,
        /// Lock-protected `Store`; presumably what `state()` / `state_mut()` lock —
        /// their definitions are not in this part of the file.
  38    store: RwLock<Store>,
        /// Application state; handlers use its `db` for user and channel queries.
  39    app_state: Arc<AppState>,
        /// Handlers keyed by the `TypeId` of the message type they accept.
  40    handlers: HashMap<TypeId, MessageHandler>,
        /// Optional channel that `handle_connection` signals after dispatching a message.
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44pub trait Executor {
  45    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  46}
  47
  /// Unit struct; presumably the production `Executor` implementation — its
  /// `impl` is not in this part of the file (TODO confirm).
  48pub struct RealExecutor;
  49
  /// Number of messages requested from the database per `get_channel_messages` call.
  50const MESSAGE_COUNT_PER_PAGE: usize = 100;
  /// NOTE(review): presumably the maximum accepted length of a channel message;
  /// no use of it is visible in this part of the file — confirm.
  51const MAX_MESSAGE_LEN: usize = 1024;
  52
  53impl Server {
  54    pub fn new(
  55        app_state: Arc<AppState>,
  56        peer: Arc<Peer>,
  57        notifications: Option<mpsc::Sender<()>>,
  58    ) -> Arc<Self> {
  59        let mut server = Self {
  60            peer,
  61            app_state,
  62            store: Default::default(),
  63            handlers: Default::default(),
  64            notifications,
  65        };
  66
  67        server
  68            .add_request_handler(Server::ping)
  69            .add_request_handler(Server::register_project)
  70            .add_message_handler(Server::unregister_project)
  71            .add_request_handler(Server::share_project)
  72            .add_message_handler(Server::unshare_project)
  73            .add_request_handler(Server::join_project)
  74            .add_message_handler(Server::leave_project)
  75            .add_request_handler(Server::register_worktree)
  76            .add_message_handler(Server::unregister_worktree)
  77            .add_request_handler(Server::share_worktree)
  78            .add_message_handler(Server::update_worktree)
  79            .add_message_handler(Server::update_diagnostic_summary)
  80            .add_message_handler(Server::disk_based_diagnostics_updating)
  81            .add_message_handler(Server::disk_based_diagnostics_updated)
  82            .add_request_handler(Server::get_definition)
  83            .add_request_handler(Server::open_buffer)
  84            .add_message_handler(Server::close_buffer)
  85            .add_request_handler(Server::update_buffer)
  86            .add_message_handler(Server::update_buffer_file)
  87            .add_message_handler(Server::buffer_reloaded)
  88            .add_message_handler(Server::buffer_saved)
  89            .add_request_handler(Server::save_buffer)
  90            .add_request_handler(Server::format_buffers)
  91            .add_request_handler(Server::get_completions)
  92            .add_request_handler(Server::apply_additional_edits_for_completion)
  93            .add_request_handler(Server::get_code_actions)
  94            .add_request_handler(Server::apply_code_action)
  95            .add_request_handler(Server::get_channels)
  96            .add_request_handler(Server::get_users)
  97            .add_request_handler(Server::join_channel)
  98            .add_message_handler(Server::leave_channel)
  99            .add_request_handler(Server::send_channel_message)
 100            .add_request_handler(Server::get_channel_messages);
 101
 102        Arc::new(server)
 103    }
 104
 105    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 106    where
 107        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 108        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 109        M: EnvelopedMessage,
 110    {
 111        let prev_handler = self.handlers.insert(
 112            TypeId::of::<M>(),
 113            Box::new(move |server, envelope| {
 114                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 115                (handler)(server, *envelope).boxed()
 116            }),
 117        );
 118        if prev_handler.is_some() {
 119            panic!("registered a handler for the same message twice");
 120        }
 121        self
 122    }
 123
 124    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 125    where
 126        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 127        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 128        M: RequestMessage,
 129    {
 130        self.add_message_handler(move |server, envelope| {
 131            let receipt = envelope.receipt();
 132            let response = (handler)(server.clone(), envelope);
 133            async move {
 134                match response.await {
 135                    Ok(response) => {
 136                        server.peer.respond(receipt, response)?;
 137                        Ok(())
 138                    }
 139                    Err(error) => {
 140                        server.peer.respond_with_error(
 141                            receipt,
 142                            proto::Error {
 143                                message: error.to_string(),
 144                            },
 145                        )?;
 146                        Err(error)
 147                    }
 148                }
 149            }
 150        })
 151    }
 152
 153    pub fn handle_connection<E: Executor>(
 154        self: &Arc<Self>,
 155        connection: Connection,
 156        addr: String,
 157        user_id: UserId,
 158        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 159        executor: E,
 160    ) -> impl Future<Output = ()> {
 161        let mut this = self.clone();
 162        async move {
 163            let (connection_id, handle_io, mut incoming_rx) =
 164                this.peer.add_connection(connection).await;
 165
 166            if let Some(send_connection_id) = send_connection_id.as_mut() {
 167                let _ = send_connection_id.send(connection_id).await;
 168            }
 169
 170            this.state_mut().add_connection(connection_id, user_id);
 171            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 172                log::error!("error updating contacts for {:?}: {}", user_id, err);
 173            }
 174
 175            let handle_io = handle_io.fuse();
 176            futures::pin_mut!(handle_io);
 177            loop {
 178                let next_message = incoming_rx.next().fuse();
 179                futures::pin_mut!(next_message);
 180                futures::select_biased! {
 181                    result = handle_io => {
 182                        if let Err(err) = result {
 183                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 184                        }
 185                        break;
 186                    }
 187                    message = next_message => {
 188                        if let Some(message) = message {
 189                            let start_time = Instant::now();
 190                            let type_name = message.payload_type_name();
 191                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 192                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 193                                let handle_message = (handler)(this.clone(), message);
 194                                executor.spawn_detached(async move {
 195                                    if let Err(err) = handle_message.await {
 196                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 197                                    } else {
 198                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 199                                    }
 200                                });
 201                                if let Some(mut notifications) = this.notifications.clone() {
 202                                    let _ = notifications.send(()).await;
 203                                }
 204                            } else {
 205                                log::warn!("unhandled message: {}", type_name);
 206                            }
 207                        } else {
 208                            log::info!("rpc connection closed {:?}", addr);
 209                            break;
 210                        }
 211                    }
 212                }
 213            }
 214
 215            if let Err(err) = this.sign_out(connection_id).await {
 216                log::error!("error signing out connection {:?} - {:?}", addr, err);
 217            }
 218        }
 219    }
 220
 221    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 222        self.peer.disconnect(connection_id);
 223        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 224
 225        for (project_id, project) in removed_connection.hosted_projects {
 226            if let Some(share) = project.share {
 227                broadcast(
 228                    connection_id,
 229                    share.guests.keys().copied().collect(),
 230                    |conn_id| {
 231                        self.peer
 232                            .send(conn_id, proto::UnshareProject { project_id })
 233                    },
 234                )?;
 235            }
 236        }
 237
 238        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 239            broadcast(connection_id, peer_ids, |conn_id| {
 240                self.peer.send(
 241                    conn_id,
 242                    proto::RemoveProjectCollaborator {
 243                        project_id,
 244                        peer_id: connection_id.0,
 245                    },
 246                )
 247            })?;
 248        }
 249
 250        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 251        Ok(())
 252    }
 253
 254    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 255        Ok(proto::Ack {})
 256    }
 257
 258    async fn register_project(
 259        mut self: Arc<Server>,
 260        request: TypedEnvelope<proto::RegisterProject>,
 261    ) -> tide::Result<proto::RegisterProjectResponse> {
 262        let project_id = {
 263            let mut state = self.state_mut();
 264            let user_id = state.user_id_for_connection(request.sender_id)?;
 265            state.register_project(request.sender_id, user_id)
 266        };
 267        Ok(proto::RegisterProjectResponse { project_id })
 268    }
 269
 270    async fn unregister_project(
 271        mut self: Arc<Server>,
 272        request: TypedEnvelope<proto::UnregisterProject>,
 273    ) -> tide::Result<()> {
 274        let project = self
 275            .state_mut()
 276            .unregister_project(request.payload.project_id, request.sender_id)?;
 277        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 278        Ok(())
 279    }
 280
 281    async fn share_project(
 282        mut self: Arc<Server>,
 283        request: TypedEnvelope<proto::ShareProject>,
 284    ) -> tide::Result<proto::Ack> {
 285        self.state_mut()
 286            .share_project(request.payload.project_id, request.sender_id);
 287        Ok(proto::Ack {})
 288    }
 289
 290    async fn unshare_project(
 291        mut self: Arc<Server>,
 292        request: TypedEnvelope<proto::UnshareProject>,
 293    ) -> tide::Result<()> {
 294        let project_id = request.payload.project_id;
 295        let project = self
 296            .state_mut()
 297            .unshare_project(project_id, request.sender_id)?;
 298
 299        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 300            self.peer
 301                .send(conn_id, proto::UnshareProject { project_id })
 302        })?;
 303        self.update_contacts_for_users(&project.authorized_user_ids)?;
 304        Ok(())
 305    }
 306
 307    async fn join_project(
 308        mut self: Arc<Server>,
 309        request: TypedEnvelope<proto::JoinProject>,
 310    ) -> tide::Result<proto::JoinProjectResponse> {
 311        let project_id = request.payload.project_id;
 312
 313        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 314        let (response, connection_ids, contact_user_ids) = self
 315            .state_mut()
 316            .join_project(request.sender_id, user_id, project_id)
 317            .and_then(|joined| {
 318                let share = joined.project.share()?;
 319                let peer_count = share.guests.len();
 320                let mut collaborators = Vec::with_capacity(peer_count);
 321                collaborators.push(proto::Collaborator {
 322                    peer_id: joined.project.host_connection_id.0,
 323                    replica_id: 0,
 324                    user_id: joined.project.host_user_id.to_proto(),
 325                });
 326                let worktrees = joined
 327                    .project
 328                    .worktrees
 329                    .iter()
 330                    .filter_map(|(id, worktree)| {
 331                        worktree.share.as_ref().map(|share| proto::Worktree {
 332                            id: *id,
 333                            root_name: worktree.root_name.clone(),
 334                            entries: share.entries.values().cloned().collect(),
 335                            diagnostic_summaries: share
 336                                .diagnostic_summaries
 337                                .values()
 338                                .cloned()
 339                                .collect(),
 340                            weak: worktree.weak,
 341                        })
 342                    })
 343                    .collect();
 344                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 345                    if *peer_conn_id != request.sender_id {
 346                        collaborators.push(proto::Collaborator {
 347                            peer_id: peer_conn_id.0,
 348                            replica_id: *peer_replica_id as u32,
 349                            user_id: peer_user_id.to_proto(),
 350                        });
 351                    }
 352                }
 353                let response = proto::JoinProjectResponse {
 354                    worktrees,
 355                    replica_id: joined.replica_id as u32,
 356                    collaborators,
 357                };
 358                let connection_ids = joined.project.connection_ids();
 359                let contact_user_ids = joined.project.authorized_user_ids();
 360                Ok((response, connection_ids, contact_user_ids))
 361            })?;
 362
 363        broadcast(request.sender_id, connection_ids, |conn_id| {
 364            self.peer.send(
 365                conn_id,
 366                proto::AddProjectCollaborator {
 367                    project_id,
 368                    collaborator: Some(proto::Collaborator {
 369                        peer_id: request.sender_id.0,
 370                        replica_id: response.replica_id,
 371                        user_id: user_id.to_proto(),
 372                    }),
 373                },
 374            )
 375        })?;
 376        self.update_contacts_for_users(&contact_user_ids)?;
 377        Ok(response)
 378    }
 379
 380    async fn leave_project(
 381        mut self: Arc<Server>,
 382        request: TypedEnvelope<proto::LeaveProject>,
 383    ) -> tide::Result<()> {
 384        let sender_id = request.sender_id;
 385        let project_id = request.payload.project_id;
 386        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 387
 388        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 389            self.peer.send(
 390                conn_id,
 391                proto::RemoveProjectCollaborator {
 392                    project_id,
 393                    peer_id: sender_id.0,
 394                },
 395            )
 396        })?;
 397        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 398
 399        Ok(())
 400    }
 401
 402    async fn register_worktree(
 403        mut self: Arc<Server>,
 404        request: TypedEnvelope<proto::RegisterWorktree>,
 405    ) -> tide::Result<proto::Ack> {
 406        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 407
 408        let mut contact_user_ids = HashSet::default();
 409        contact_user_ids.insert(host_user_id);
 410        for github_login in request.payload.authorized_logins {
 411            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 412            contact_user_ids.insert(contact_user_id);
 413        }
 414
 415        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 416        self.state_mut().register_worktree(
 417            request.payload.project_id,
 418            request.payload.worktree_id,
 419            request.sender_id,
 420            Worktree {
 421                authorized_user_ids: contact_user_ids.clone(),
 422                root_name: request.payload.root_name,
 423                share: None,
 424                weak: false,
 425            },
 426        )?;
 427        self.update_contacts_for_users(&contact_user_ids)?;
 428        Ok(proto::Ack {})
 429    }
 430
 431    async fn unregister_worktree(
 432        mut self: Arc<Server>,
 433        request: TypedEnvelope<proto::UnregisterWorktree>,
 434    ) -> tide::Result<()> {
 435        let project_id = request.payload.project_id;
 436        let worktree_id = request.payload.worktree_id;
 437        let (worktree, guest_connection_ids) =
 438            self.state_mut()
 439                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 440        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 441            self.peer.send(
 442                conn_id,
 443                proto::UnregisterWorktree {
 444                    project_id,
 445                    worktree_id,
 446                },
 447            )
 448        })?;
 449        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 450        Ok(())
 451    }
 452
 453    async fn share_worktree(
 454        mut self: Arc<Server>,
 455        mut request: TypedEnvelope<proto::ShareWorktree>,
 456    ) -> tide::Result<proto::Ack> {
 457        let worktree = request
 458            .payload
 459            .worktree
 460            .as_mut()
 461            .ok_or_else(|| anyhow!("missing worktree"))?;
 462        let entries = worktree
 463            .entries
 464            .iter()
 465            .map(|entry| (entry.id, entry.clone()))
 466            .collect();
 467        let diagnostic_summaries = worktree
 468            .diagnostic_summaries
 469            .iter()
 470            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 471            .collect();
 472
 473        let shared_worktree = self.state_mut().share_worktree(
 474            request.payload.project_id,
 475            worktree.id,
 476            request.sender_id,
 477            entries,
 478            diagnostic_summaries,
 479        )?;
 480
 481        broadcast(
 482            request.sender_id,
 483            shared_worktree.connection_ids,
 484            |connection_id| {
 485                self.peer
 486                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 487            },
 488        )?;
 489        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 490
 491        Ok(proto::Ack {})
 492    }
 493
 494    async fn update_worktree(
 495        mut self: Arc<Server>,
 496        request: TypedEnvelope<proto::UpdateWorktree>,
 497    ) -> tide::Result<()> {
 498        let connection_ids = self.state_mut().update_worktree(
 499            request.sender_id,
 500            request.payload.project_id,
 501            request.payload.worktree_id,
 502            &request.payload.removed_entries,
 503            &request.payload.updated_entries,
 504        )?;
 505
 506        broadcast(request.sender_id, connection_ids, |connection_id| {
 507            self.peer
 508                .forward_send(request.sender_id, connection_id, request.payload.clone())
 509        })?;
 510
 511        Ok(())
 512    }
 513
 514    async fn update_diagnostic_summary(
 515        mut self: Arc<Server>,
 516        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 517    ) -> tide::Result<()> {
 518        let summary = request
 519            .payload
 520            .summary
 521            .clone()
 522            .ok_or_else(|| anyhow!("invalid summary"))?;
 523        let receiver_ids = self.state_mut().update_diagnostic_summary(
 524            request.payload.project_id,
 525            request.payload.worktree_id,
 526            request.sender_id,
 527            summary,
 528        )?;
 529
 530        broadcast(request.sender_id, receiver_ids, |connection_id| {
 531            self.peer
 532                .forward_send(request.sender_id, connection_id, request.payload.clone())
 533        })?;
 534        Ok(())
 535    }
 536
 537    async fn disk_based_diagnostics_updating(
 538        self: Arc<Server>,
 539        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 540    ) -> tide::Result<()> {
 541        let receiver_ids = self
 542            .state()
 543            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 544        broadcast(request.sender_id, receiver_ids, |connection_id| {
 545            self.peer
 546                .forward_send(request.sender_id, connection_id, request.payload.clone())
 547        })?;
 548        Ok(())
 549    }
 550
 551    async fn disk_based_diagnostics_updated(
 552        self: Arc<Server>,
 553        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 554    ) -> tide::Result<()> {
 555        let receiver_ids = self
 556            .state()
 557            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 558        broadcast(request.sender_id, receiver_ids, |connection_id| {
 559            self.peer
 560                .forward_send(request.sender_id, connection_id, request.payload.clone())
 561        })?;
 562        Ok(())
 563    }
 564
 565    async fn get_definition(
 566        self: Arc<Server>,
 567        request: TypedEnvelope<proto::GetDefinition>,
 568    ) -> tide::Result<proto::GetDefinitionResponse> {
 569        let host_connection_id = self
 570            .state()
 571            .read_project(request.payload.project_id, request.sender_id)?
 572            .host_connection_id;
 573        Ok(self
 574            .peer
 575            .forward_request(request.sender_id, host_connection_id, request.payload)
 576            .await?)
 577    }
 578
 579    async fn open_buffer(
 580        self: Arc<Server>,
 581        request: TypedEnvelope<proto::OpenBuffer>,
 582    ) -> tide::Result<proto::OpenBufferResponse> {
 583        let host_connection_id = self
 584            .state()
 585            .read_project(request.payload.project_id, request.sender_id)?
 586            .host_connection_id;
 587        Ok(self
 588            .peer
 589            .forward_request(request.sender_id, host_connection_id, request.payload)
 590            .await?)
 591    }
 592
 593    async fn close_buffer(
 594        self: Arc<Server>,
 595        request: TypedEnvelope<proto::CloseBuffer>,
 596    ) -> tide::Result<()> {
 597        let host_connection_id = self
 598            .state()
 599            .read_project(request.payload.project_id, request.sender_id)?
 600            .host_connection_id;
 601        self.peer
 602            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 603        Ok(())
 604    }
 605
 606    async fn save_buffer(
 607        self: Arc<Server>,
 608        request: TypedEnvelope<proto::SaveBuffer>,
 609    ) -> tide::Result<proto::BufferSaved> {
 610        let host;
 611        let mut guests;
 612        {
 613            let state = self.state();
 614            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 615            host = project.host_connection_id;
 616            guests = project.guest_connection_ids()
 617        }
 618
 619        let response = self
 620            .peer
 621            .forward_request(request.sender_id, host, request.payload.clone())
 622            .await?;
 623
 624        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 625        broadcast(host, guests, |conn_id| {
 626            self.peer.forward_send(host, conn_id, response.clone())
 627        })?;
 628
 629        Ok(response)
 630    }
 631
 632    async fn format_buffers(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::FormatBuffers>,
 635    ) -> tide::Result<proto::FormatBuffersResponse> {
 636        let host = self
 637            .state()
 638            .read_project(request.payload.project_id, request.sender_id)?
 639            .host_connection_id;
 640        Ok(self
 641            .peer
 642            .forward_request(request.sender_id, host, request.payload.clone())
 643            .await?)
 644    }
 645
 646    async fn get_completions(
 647        self: Arc<Server>,
 648        request: TypedEnvelope<proto::GetCompletions>,
 649    ) -> tide::Result<proto::GetCompletionsResponse> {
 650        let host = self
 651            .state()
 652            .read_project(request.payload.project_id, request.sender_id)?
 653            .host_connection_id;
 654        Ok(self
 655            .peer
 656            .forward_request(request.sender_id, host, request.payload.clone())
 657            .await?)
 658    }
 659
 660    async fn apply_additional_edits_for_completion(
 661        self: Arc<Server>,
 662        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 663    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 664        let host = self
 665            .state()
 666            .read_project(request.payload.project_id, request.sender_id)?
 667            .host_connection_id;
 668        Ok(self
 669            .peer
 670            .forward_request(request.sender_id, host, request.payload.clone())
 671            .await?)
 672    }
 673
 674    async fn get_code_actions(
 675        self: Arc<Server>,
 676        request: TypedEnvelope<proto::GetCodeActions>,
 677    ) -> tide::Result<proto::GetCodeActionsResponse> {
 678        let host = self
 679            .state()
 680            .read_project(request.payload.project_id, request.sender_id)?
 681            .host_connection_id;
 682        Ok(self
 683            .peer
 684            .forward_request(request.sender_id, host, request.payload.clone())
 685            .await?)
 686    }
 687
 688    async fn apply_code_action(
 689        self: Arc<Server>,
 690        request: TypedEnvelope<proto::ApplyCodeAction>,
 691    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 692        let host = self
 693            .state()
 694            .read_project(request.payload.project_id, request.sender_id)?
 695            .host_connection_id;
 696        Ok(self
 697            .peer
 698            .forward_request(request.sender_id, host, request.payload.clone())
 699            .await?)
 700    }
 701
 702    async fn update_buffer(
 703        self: Arc<Server>,
 704        request: TypedEnvelope<proto::UpdateBuffer>,
 705    ) -> tide::Result<proto::Ack> {
 706        let receiver_ids = self
 707            .state()
 708            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 709        broadcast(request.sender_id, receiver_ids, |connection_id| {
 710            self.peer
 711                .forward_send(request.sender_id, connection_id, request.payload.clone())
 712        })?;
 713        Ok(proto::Ack {})
 714    }
 715
 716    async fn update_buffer_file(
 717        self: Arc<Server>,
 718        request: TypedEnvelope<proto::UpdateBufferFile>,
 719    ) -> tide::Result<()> {
 720        let receiver_ids = self
 721            .state()
 722            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 723        broadcast(request.sender_id, receiver_ids, |connection_id| {
 724            self.peer
 725                .forward_send(request.sender_id, connection_id, request.payload.clone())
 726        })?;
 727        Ok(())
 728    }
 729
 730    async fn buffer_reloaded(
 731        self: Arc<Server>,
 732        request: TypedEnvelope<proto::BufferReloaded>,
 733    ) -> tide::Result<()> {
 734        let receiver_ids = self
 735            .state()
 736            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 737        broadcast(request.sender_id, receiver_ids, |connection_id| {
 738            self.peer
 739                .forward_send(request.sender_id, connection_id, request.payload.clone())
 740        })?;
 741        Ok(())
 742    }
 743
 744    async fn buffer_saved(
 745        self: Arc<Server>,
 746        request: TypedEnvelope<proto::BufferSaved>,
 747    ) -> tide::Result<()> {
 748        let receiver_ids = self
 749            .state()
 750            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 751        broadcast(request.sender_id, receiver_ids, |connection_id| {
 752            self.peer
 753                .forward_send(request.sender_id, connection_id, request.payload.clone())
 754        })?;
 755        Ok(())
 756    }
 757
 758    async fn get_channels(
 759        self: Arc<Server>,
 760        request: TypedEnvelope<proto::GetChannels>,
 761    ) -> tide::Result<proto::GetChannelsResponse> {
 762        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 763        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 764        Ok(proto::GetChannelsResponse {
 765            channels: channels
 766                .into_iter()
 767                .map(|chan| proto::Channel {
 768                    id: chan.id.to_proto(),
 769                    name: chan.name,
 770                })
 771                .collect(),
 772        })
 773    }
 774
 775    async fn get_users(
 776        self: Arc<Server>,
 777        request: TypedEnvelope<proto::GetUsers>,
 778    ) -> tide::Result<proto::GetUsersResponse> {
 779        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 780        let users = self
 781            .app_state
 782            .db
 783            .get_users_by_ids(user_ids)
 784            .await?
 785            .into_iter()
 786            .map(|user| proto::User {
 787                id: user.id.to_proto(),
 788                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 789                github_login: user.github_login,
 790            })
 791            .collect();
 792        Ok(proto::GetUsersResponse { users })
 793    }
 794
 795    fn update_contacts_for_users<'a>(
 796        self: &Arc<Server>,
 797        user_ids: impl IntoIterator<Item = &'a UserId>,
 798    ) -> anyhow::Result<()> {
 799        let mut result = Ok(());
 800        let state = self.state();
 801        for user_id in user_ids {
 802            let contacts = state.contacts_for_user(*user_id);
 803            for connection_id in state.connection_ids_for_user(*user_id) {
 804                if let Err(error) = self.peer.send(
 805                    connection_id,
 806                    proto::UpdateContacts {
 807                        contacts: contacts.clone(),
 808                    },
 809                ) {
 810                    result = Err(error);
 811                }
 812            }
 813        }
 814        result
 815    }
 816
 817    async fn join_channel(
 818        mut self: Arc<Self>,
 819        request: TypedEnvelope<proto::JoinChannel>,
 820    ) -> tide::Result<proto::JoinChannelResponse> {
 821        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 822        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 823        if !self
 824            .app_state
 825            .db
 826            .can_user_access_channel(user_id, channel_id)
 827            .await?
 828        {
 829            Err(anyhow!("access denied"))?;
 830        }
 831
 832        self.state_mut().join_channel(request.sender_id, channel_id);
 833        let messages = self
 834            .app_state
 835            .db
 836            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 837            .await?
 838            .into_iter()
 839            .map(|msg| proto::ChannelMessage {
 840                id: msg.id.to_proto(),
 841                body: msg.body,
 842                timestamp: msg.sent_at.unix_timestamp() as u64,
 843                sender_id: msg.sender_id.to_proto(),
 844                nonce: Some(msg.nonce.as_u128().into()),
 845            })
 846            .collect::<Vec<_>>();
 847        Ok(proto::JoinChannelResponse {
 848            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 849            messages,
 850        })
 851    }
 852
 853    async fn leave_channel(
 854        mut self: Arc<Self>,
 855        request: TypedEnvelope<proto::LeaveChannel>,
 856    ) -> tide::Result<()> {
 857        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 858        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 859        if !self
 860            .app_state
 861            .db
 862            .can_user_access_channel(user_id, channel_id)
 863            .await?
 864        {
 865            Err(anyhow!("access denied"))?;
 866        }
 867
 868        self.state_mut()
 869            .leave_channel(request.sender_id, channel_id);
 870
 871        Ok(())
 872    }
 873
 874    async fn send_channel_message(
 875        self: Arc<Self>,
 876        request: TypedEnvelope<proto::SendChannelMessage>,
 877    ) -> tide::Result<proto::SendChannelMessageResponse> {
 878        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 879        let user_id;
 880        let connection_ids;
 881        {
 882            let state = self.state();
 883            user_id = state.user_id_for_connection(request.sender_id)?;
 884            connection_ids = state.channel_connection_ids(channel_id)?;
 885        }
 886
 887        // Validate the message body.
 888        let body = request.payload.body.trim().to_string();
 889        if body.len() > MAX_MESSAGE_LEN {
 890            return Err(anyhow!("message is too long"))?;
 891        }
 892        if body.is_empty() {
 893            return Err(anyhow!("message can't be blank"))?;
 894        }
 895
 896        let timestamp = OffsetDateTime::now_utc();
 897        let nonce = request
 898            .payload
 899            .nonce
 900            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 901
 902        let message_id = self
 903            .app_state
 904            .db
 905            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 906            .await?
 907            .to_proto();
 908        let message = proto::ChannelMessage {
 909            sender_id: user_id.to_proto(),
 910            id: message_id,
 911            body,
 912            timestamp: timestamp.unix_timestamp() as u64,
 913            nonce: Some(nonce),
 914        };
 915        broadcast(request.sender_id, connection_ids, |conn_id| {
 916            self.peer.send(
 917                conn_id,
 918                proto::ChannelMessageSent {
 919                    channel_id: channel_id.to_proto(),
 920                    message: Some(message.clone()),
 921                },
 922            )
 923        })?;
 924        Ok(proto::SendChannelMessageResponse {
 925            message: Some(message),
 926        })
 927    }
 928
 929    async fn get_channel_messages(
 930        self: Arc<Self>,
 931        request: TypedEnvelope<proto::GetChannelMessages>,
 932    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 933        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 934        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 935        if !self
 936            .app_state
 937            .db
 938            .can_user_access_channel(user_id, channel_id)
 939            .await?
 940        {
 941            Err(anyhow!("access denied"))?;
 942        }
 943
 944        let messages = self
 945            .app_state
 946            .db
 947            .get_channel_messages(
 948                channel_id,
 949                MESSAGE_COUNT_PER_PAGE,
 950                Some(MessageId::from_proto(request.payload.before_message_id)),
 951            )
 952            .await?
 953            .into_iter()
 954            .map(|msg| proto::ChannelMessage {
 955                id: msg.id.to_proto(),
 956                body: msg.body,
 957                timestamp: msg.sent_at.unix_timestamp() as u64,
 958                sender_id: msg.sender_id.to_proto(),
 959                nonce: Some(msg.nonce.as_u128().into()),
 960            })
 961            .collect::<Vec<_>>();
 962
 963        Ok(proto::GetChannelMessagesResponse {
 964            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 965            messages,
 966        })
 967    }
 968
 969    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 970        self.store.read()
 971    }
 972
 973    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 974        self.store.write()
 975    }
 976}
 977
 978impl Executor for RealExecutor {
 979    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 980        task::spawn(future);
 981    }
 982}
 983
 984fn broadcast<F>(
 985    sender_id: ConnectionId,
 986    receiver_ids: Vec<ConnectionId>,
 987    mut f: F,
 988) -> anyhow::Result<()>
 989where
 990    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 991{
 992    let mut result = Ok(());
 993    for receiver_id in receiver_ids {
 994        if receiver_id != sender_id {
 995            if let Err(error) = f(receiver_id) {
 996                if result.is_ok() {
 997                    result = Err(error);
 998                }
 999            }
1000        }
1001    }
1002    result
1003}
1004
1005pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1006    let server = Server::new(app.state().clone(), rpc.clone(), None);
1007    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1008        let server = server.clone();
1009        async move {
1010            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1011
1012            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1013            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1014            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1015            let client_protocol_version: Option<u32> = request
1016                .header("X-Zed-Protocol-Version")
1017                .and_then(|v| v.as_str().parse().ok());
1018
1019            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1020                return Ok(Response::new(StatusCode::UpgradeRequired));
1021            }
1022
1023            let header = match request.header("Sec-Websocket-Key") {
1024                Some(h) => h.as_str(),
1025                None => return Err(anyhow!("expected sec-websocket-key"))?,
1026            };
1027
1028            let user_id = process_auth_header(&request).await?;
1029
1030            let mut response = Response::new(StatusCode::SwitchingProtocols);
1031            response.insert_header(UPGRADE, "websocket");
1032            response.insert_header(CONNECTION, "Upgrade");
1033            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1034            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1035            response.insert_header("Sec-Websocket-Version", "13");
1036
1037            let http_res: &mut tide::http::Response = response.as_mut();
1038            let upgrade_receiver = http_res.recv_upgrade().await;
1039            let addr = request.remote().unwrap_or("unknown").to_string();
1040            task::spawn(async move {
1041                if let Some(stream) = upgrade_receiver.await {
1042                    server
1043                        .handle_connection(
1044                            Connection::new(
1045                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1046                            ),
1047                            addr,
1048                            user_id,
1049                            None,
1050                            RealExecutor,
1051                        )
1052                        .await;
1053                }
1054            });
1055
1056            Ok(response)
1057        }
1058    });
1059}
1060
1061fn header_contains_ignore_case<T>(
1062    request: &tide::Request<T>,
1063    header_name: HeaderName,
1064    value: &str,
1065) -> bool {
1066    request
1067        .header(header_name)
1068        .map(|h| {
1069            h.as_str()
1070                .split(',')
1071                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1072        })
1073        .unwrap_or(false)
1074}
1075
1076#[cfg(test)]
1077mod tests {
1078    use super::*;
1079    use crate::{
1080        auth,
1081        db::{tests::TestDb, UserId},
1082        github, AppState, Config,
1083    };
1084    use ::rpc::Peer;
1085    use gpui::{executor, ModelHandle, TestAppContext};
1086    use parking_lot::Mutex;
1087    use postage::{mpsc, watch};
1088    use rand::prelude::*;
1089    use rpc::PeerId;
1090    use serde_json::json;
1091    use sqlx::types::time::OffsetDateTime;
1092    use std::{
1093        ops::Deref,
1094        path::Path,
1095        rc::Rc,
1096        sync::{
1097            atomic::{AtomicBool, Ordering::SeqCst},
1098            Arc,
1099        },
1100        time::Duration,
1101    };
1102    use zed::{
1103        client::{
1104            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1105            EstablishConnectionError, UserStore,
1106        },
1107        editor::{
1108            self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1109            Redo, ToggleCodeActions, Undo,
1110        },
1111        fs::{FakeFs, Fs as _},
1112        language::{
1113            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1114            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1115        },
1116        lsp,
1117        project::{worktree::WorktreeHandle, DiagnosticSummary, Project, ProjectPath},
1118        workspace::{Workspace, WorkspaceParams},
1119    };
1120
1121    #[cfg(test)]
1122    #[ctor::ctor]
1123    fn init_logger() {
1124        if std::env::var("RUST_LOG").is_ok() {
1125            env_logger::init();
1126        }
1127    }
1128
    /// End-to-end test of project sharing between two clients.
    ///
    /// Client A shares a local project backed by a fake file system and client
    /// B joins it remotely. The test then checks that:
    /// - each side sees the other as a collaborator,
    /// - a buffer opened by B has the host's contents and is open on A's side,
    /// - an edit typed by B reaches A's buffer,
    /// - dropping A's buffer handle closes the buffer in A's project,
    /// - dropping B's project empties A's collaborator list.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        // Wait for the worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must see A as a collaborator; remember B's replica id for the check on A's side.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A lists B as a collaborator with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // After B opens the buffer, A's project reports it as open too.
        project_a.read_with(&cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        project_a
            .condition(&cx_a, |project, cx| {
                !project.has_open_buffer((worktree_id, "b.txt"), cx)
            })
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1267
    /// End-to-end test of unsharing and re-sharing a project.
    ///
    /// Client A shares a project and client B joins it and opens a buffer.
    /// After A unshares, B's project must become read-only and A's worktree
    /// must no longer report itself as shared. A then shares again, and B must
    /// be able to join under the same project id and open a buffer.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest's project must turn read-only once the host has unshared.
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        drop(project_b);

        // Share the project again and ensure guests can still join.
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Client B joins again, using the same project id as before.
        let project_c = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_c
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1364
    /// End-to-end test with one host (A) and two guests (B and C).
    ///
    /// It checks that:
    /// - concurrent edits from all three clients converge to the same text,
    /// - a save issued by guest B, concurrent with a host edit, writes the
    ///   merged text to the host's file system and leaves every replica clean,
    /// - renames and new files on the host's file system show up in both guest
    ///   worktrees and in the paths of the already-open buffers.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Wait until both guests' insertions have reached the host, then append the host's edit.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The file on the host's file system must include the host's concurrent edit.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Ensure worktree observes a/file1's change event *before* the rename occurs, otherwise
        // when interpreting the change event it will mistakenly think that the file has been
        // deleted (because its path has changed) and will subsequently fail to detect the rename.
        worktree_a.flush_fs_events(&cx_a).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // Host and both guests must all end up with the same list of worktree paths.
        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1550
    /// End-to-end test that a guest's buffer does not report a conflict after
    /// the guest itself has saved it.
    ///
    /// Guest B edits and saves a buffer of the project shared by host A, then
    /// waits until the buffer's file shows a new mtime. The buffer must be
    /// neither dirty nor in conflict afterwards. A further edit must make it
    /// dirty again, still without a conflict.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        // Remember the file's mtime so the save can be detected below.
        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());

        // An unsaved edit makes the buffer dirty but must not produce a conflict.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Save, then wait until the guest's view of the file has a different mtime.
        buffer_b
            .update(&mut cx_b, |buf, cx| buf.save(cx))
            .await
            .unwrap();
        worktree_b
            .condition(&cx_b, |_, cx| {
                buffer_b.read(cx).file().unwrap().mtime() != mtime
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Editing again after the save makes the buffer dirty, still without a conflict.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1643
1644    #[gpui::test(iterations = 10)]
1645    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1646        cx_a.foreground().forbid_parking();
1647        let lang_registry = Arc::new(LanguageRegistry::new());
1648        let fs = Arc::new(FakeFs::new(cx_a.background()));
1649
1650        // Connect to a server as 2 clients.
1651        let mut server = TestServer::start(cx_a.foreground()).await;
1652        let client_a = server.create_client(&mut cx_a, "user_a").await;
1653        let client_b = server.create_client(&mut cx_b, "user_b").await;
1654
1655        // Share a project as client A
1656        fs.insert_tree(
1657            "/dir",
1658            json!({
1659                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1660                "a.txt": "a-contents",
1661            }),
1662        )
1663        .await;
1664
1665        let project_a = cx_a.update(|cx| {
1666            Project::local(
1667                client_a.clone(),
1668                client_a.user_store.clone(),
1669                lang_registry.clone(),
1670                fs.clone(),
1671                cx,
1672            )
1673        });
1674        let (worktree_a, _) = project_a
1675            .update(&mut cx_a, |p, cx| {
1676                p.find_or_create_local_worktree("/dir", false, cx)
1677            })
1678            .await
1679            .unwrap();
1680        worktree_a
1681            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1682            .await;
1683        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1684        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1685        project_a
1686            .update(&mut cx_a, |p, cx| p.share(cx))
1687            .await
1688            .unwrap();
1689
1690        // Join that project as client B
1691        let project_b = Project::remote(
1692            project_id,
1693            client_b.clone(),
1694            client_b.user_store.clone(),
1695            lang_registry.clone(),
1696            fs.clone(),
1697            &mut cx_b.to_async(),
1698        )
1699        .await
1700        .unwrap();
1701        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1702
1703        // Open a buffer as client B
1704        let buffer_b = project_b
1705            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1706            .await
1707            .unwrap();
1708        buffer_b.read_with(&cx_b, |buf, _| {
1709            assert!(!buf.is_dirty());
1710            assert!(!buf.has_conflict());
1711        });
1712
1713        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1714            .await
1715            .unwrap();
1716        buffer_b
1717            .condition(&cx_b, |buf, _| {
1718                buf.text() == "new contents" && !buf.is_dirty()
1719            })
1720            .await;
1721        buffer_b.read_with(&cx_b, |buf, _| {
1722            assert!(!buf.has_conflict());
1723        });
1724    }
1725
1726    #[gpui::test(iterations = 10)]
1727    async fn test_editing_while_guest_opens_buffer(
1728        mut cx_a: TestAppContext,
1729        mut cx_b: TestAppContext,
1730    ) {
1731        cx_a.foreground().forbid_parking();
1732        let lang_registry = Arc::new(LanguageRegistry::new());
1733        let fs = Arc::new(FakeFs::new(cx_a.background()));
1734
1735        // Connect to a server as 2 clients.
1736        let mut server = TestServer::start(cx_a.foreground()).await;
1737        let client_a = server.create_client(&mut cx_a, "user_a").await;
1738        let client_b = server.create_client(&mut cx_b, "user_b").await;
1739
1740        // Share a project as client A
1741        fs.insert_tree(
1742            "/dir",
1743            json!({
1744                ".zed.toml": r#"collaborators = ["user_b"]"#,
1745                "a.txt": "a-contents",
1746            }),
1747        )
1748        .await;
1749        let project_a = cx_a.update(|cx| {
1750            Project::local(
1751                client_a.clone(),
1752                client_a.user_store.clone(),
1753                lang_registry.clone(),
1754                fs.clone(),
1755                cx,
1756            )
1757        });
1758        let (worktree_a, _) = project_a
1759            .update(&mut cx_a, |p, cx| {
1760                p.find_or_create_local_worktree("/dir", false, cx)
1761            })
1762            .await
1763            .unwrap();
1764        worktree_a
1765            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1766            .await;
1767        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1768        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1769        project_a
1770            .update(&mut cx_a, |p, cx| p.share(cx))
1771            .await
1772            .unwrap();
1773
1774        // Join that project as client B
1775        let project_b = Project::remote(
1776            project_id,
1777            client_b.clone(),
1778            client_b.user_store.clone(),
1779            lang_registry.clone(),
1780            fs.clone(),
1781            &mut cx_b.to_async(),
1782        )
1783        .await
1784        .unwrap();
1785
1786        // Open a buffer as client A
1787        let buffer_a = project_a
1788            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1789            .await
1790            .unwrap();
1791
1792        // Start opening the same buffer as client B
1793        let buffer_b = cx_b
1794            .background()
1795            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1796
1797        // Edit the buffer as client A while client B is still opening it.
1798        cx_b.background().simulate_random_delay().await;
1799        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1800        cx_b.background().simulate_random_delay().await;
1801        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1802
1803        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1804        let buffer_b = buffer_b.await.unwrap();
1805        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1806    }
1807
1808    #[gpui::test(iterations = 10)]
1809    async fn test_leaving_worktree_while_opening_buffer(
1810        mut cx_a: TestAppContext,
1811        mut cx_b: TestAppContext,
1812    ) {
1813        cx_a.foreground().forbid_parking();
1814        let lang_registry = Arc::new(LanguageRegistry::new());
1815        let fs = Arc::new(FakeFs::new(cx_a.background()));
1816
1817        // Connect to a server as 2 clients.
1818        let mut server = TestServer::start(cx_a.foreground()).await;
1819        let client_a = server.create_client(&mut cx_a, "user_a").await;
1820        let client_b = server.create_client(&mut cx_b, "user_b").await;
1821
1822        // Share a project as client A
1823        fs.insert_tree(
1824            "/dir",
1825            json!({
1826                ".zed.toml": r#"collaborators = ["user_b"]"#,
1827                "a.txt": "a-contents",
1828            }),
1829        )
1830        .await;
1831        let project_a = cx_a.update(|cx| {
1832            Project::local(
1833                client_a.clone(),
1834                client_a.user_store.clone(),
1835                lang_registry.clone(),
1836                fs.clone(),
1837                cx,
1838            )
1839        });
1840        let (worktree_a, _) = project_a
1841            .update(&mut cx_a, |p, cx| {
1842                p.find_or_create_local_worktree("/dir", false, cx)
1843            })
1844            .await
1845            .unwrap();
1846        worktree_a
1847            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1848            .await;
1849        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1850        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1851        project_a
1852            .update(&mut cx_a, |p, cx| p.share(cx))
1853            .await
1854            .unwrap();
1855
1856        // Join that project as client B
1857        let project_b = Project::remote(
1858            project_id,
1859            client_b.clone(),
1860            client_b.user_store.clone(),
1861            lang_registry.clone(),
1862            fs.clone(),
1863            &mut cx_b.to_async(),
1864        )
1865        .await
1866        .unwrap();
1867
1868        // See that a guest has joined as client A.
1869        project_a
1870            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1871            .await;
1872
1873        // Begin opening a buffer as client B, but leave the project before the open completes.
1874        let buffer_b = cx_b
1875            .background()
1876            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1877        cx_b.update(|_| drop(project_b));
1878        drop(buffer_b);
1879
1880        // See that the guest has left.
1881        project_a
1882            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1883            .await;
1884    }
1885
1886    #[gpui::test(iterations = 10)]
1887    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1888        cx_a.foreground().forbid_parking();
1889        let lang_registry = Arc::new(LanguageRegistry::new());
1890        let fs = Arc::new(FakeFs::new(cx_a.background()));
1891
1892        // Connect to a server as 2 clients.
1893        let mut server = TestServer::start(cx_a.foreground()).await;
1894        let client_a = server.create_client(&mut cx_a, "user_a").await;
1895        let client_b = server.create_client(&mut cx_b, "user_b").await;
1896
1897        // Share a project as client A
1898        fs.insert_tree(
1899            "/a",
1900            json!({
1901                ".zed.toml": r#"collaborators = ["user_b"]"#,
1902                "a.txt": "a-contents",
1903                "b.txt": "b-contents",
1904            }),
1905        )
1906        .await;
1907        let project_a = cx_a.update(|cx| {
1908            Project::local(
1909                client_a.clone(),
1910                client_a.user_store.clone(),
1911                lang_registry.clone(),
1912                fs.clone(),
1913                cx,
1914            )
1915        });
1916        let (worktree_a, _) = project_a
1917            .update(&mut cx_a, |p, cx| {
1918                p.find_or_create_local_worktree("/a", false, cx)
1919            })
1920            .await
1921            .unwrap();
1922        worktree_a
1923            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1924            .await;
1925        let project_id = project_a
1926            .update(&mut cx_a, |project, _| project.next_remote_id())
1927            .await;
1928        project_a
1929            .update(&mut cx_a, |project, cx| project.share(cx))
1930            .await
1931            .unwrap();
1932
1933        // Join that project as client B
1934        let _project_b = Project::remote(
1935            project_id,
1936            client_b.clone(),
1937            client_b.user_store.clone(),
1938            lang_registry.clone(),
1939            fs.clone(),
1940            &mut cx_b.to_async(),
1941        )
1942        .await
1943        .unwrap();
1944
1945        // See that a guest has joined as client A.
1946        project_a
1947            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1948            .await;
1949
1950        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1951        client_b.disconnect(&cx_b.to_async()).unwrap();
1952        project_a
1953            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1954            .await;
1955    }
1956
1957    #[gpui::test(iterations = 10)]
1958    async fn test_collaborating_with_diagnostics(
1959        mut cx_a: TestAppContext,
1960        mut cx_b: TestAppContext,
1961    ) {
1962        cx_a.foreground().forbid_parking();
1963        let mut lang_registry = Arc::new(LanguageRegistry::new());
1964        let fs = Arc::new(FakeFs::new(cx_a.background()));
1965
1966        // Set up a fake language server.
1967        let (language_server_config, mut fake_language_server) =
1968            LanguageServerConfig::fake(&cx_a).await;
1969        Arc::get_mut(&mut lang_registry)
1970            .unwrap()
1971            .add(Arc::new(Language::new(
1972                LanguageConfig {
1973                    name: "Rust".to_string(),
1974                    path_suffixes: vec!["rs".to_string()],
1975                    language_server: Some(language_server_config),
1976                    ..Default::default()
1977                },
1978                Some(tree_sitter_rust::language()),
1979            )));
1980
1981        // Connect to a server as 2 clients.
1982        let mut server = TestServer::start(cx_a.foreground()).await;
1983        let client_a = server.create_client(&mut cx_a, "user_a").await;
1984        let client_b = server.create_client(&mut cx_b, "user_b").await;
1985
1986        // Share a project as client A
1987        fs.insert_tree(
1988            "/a",
1989            json!({
1990                ".zed.toml": r#"collaborators = ["user_b"]"#,
1991                "a.rs": "let one = two",
1992                "other.rs": "",
1993            }),
1994        )
1995        .await;
1996        let project_a = cx_a.update(|cx| {
1997            Project::local(
1998                client_a.clone(),
1999                client_a.user_store.clone(),
2000                lang_registry.clone(),
2001                fs.clone(),
2002                cx,
2003            )
2004        });
2005        let (worktree_a, _) = project_a
2006            .update(&mut cx_a, |p, cx| {
2007                p.find_or_create_local_worktree("/a", false, cx)
2008            })
2009            .await
2010            .unwrap();
2011        worktree_a
2012            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2013            .await;
2014        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2015        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2016        project_a
2017            .update(&mut cx_a, |p, cx| p.share(cx))
2018            .await
2019            .unwrap();
2020
2021        // Cause the language server to start.
2022        let _ = cx_a
2023            .background()
2024            .spawn(project_a.update(&mut cx_a, |project, cx| {
2025                project.open_buffer(
2026                    ProjectPath {
2027                        worktree_id,
2028                        path: Path::new("other.rs").into(),
2029                    },
2030                    cx,
2031                )
2032            }))
2033            .await
2034            .unwrap();
2035
2036        // Simulate a language server reporting errors for a file.
2037        fake_language_server
2038            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2039                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2040                version: None,
2041                diagnostics: vec![lsp::Diagnostic {
2042                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2043                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2044                    message: "message 1".to_string(),
2045                    ..Default::default()
2046                }],
2047            })
2048            .await;
2049
2050        // Wait for server to see the diagnostics update.
2051        server
2052            .condition(|store| {
2053                let worktree = store
2054                    .project(project_id)
2055                    .unwrap()
2056                    .worktrees
2057                    .get(&worktree_id.to_proto())
2058                    .unwrap();
2059
2060                !worktree
2061                    .share
2062                    .as_ref()
2063                    .unwrap()
2064                    .diagnostic_summaries
2065                    .is_empty()
2066            })
2067            .await;
2068
2069        // Join the worktree as client B.
2070        let project_b = Project::remote(
2071            project_id,
2072            client_b.clone(),
2073            client_b.user_store.clone(),
2074            lang_registry.clone(),
2075            fs.clone(),
2076            &mut cx_b.to_async(),
2077        )
2078        .await
2079        .unwrap();
2080
2081        project_b.read_with(&cx_b, |project, cx| {
2082            assert_eq!(
2083                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2084                &[(
2085                    ProjectPath {
2086                        worktree_id,
2087                        path: Arc::from(Path::new("a.rs")),
2088                    },
2089                    DiagnosticSummary {
2090                        error_count: 1,
2091                        warning_count: 0,
2092                        ..Default::default()
2093                    },
2094                )]
2095            )
2096        });
2097
2098        // Simulate a language server reporting more errors for a file.
2099        fake_language_server
2100            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2101                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2102                version: None,
2103                diagnostics: vec![
2104                    lsp::Diagnostic {
2105                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2106                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2107                        message: "message 1".to_string(),
2108                        ..Default::default()
2109                    },
2110                    lsp::Diagnostic {
2111                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2112                        range: lsp::Range::new(
2113                            lsp::Position::new(0, 10),
2114                            lsp::Position::new(0, 13),
2115                        ),
2116                        message: "message 2".to_string(),
2117                        ..Default::default()
2118                    },
2119                ],
2120            })
2121            .await;
2122
2123        // Client b gets the updated summaries
2124        project_b
2125            .condition(&cx_b, |project, cx| {
2126                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2127                    == &[(
2128                        ProjectPath {
2129                            worktree_id,
2130                            path: Arc::from(Path::new("a.rs")),
2131                        },
2132                        DiagnosticSummary {
2133                            error_count: 1,
2134                            warning_count: 1,
2135                            ..Default::default()
2136                        },
2137                    )]
2138            })
2139            .await;
2140
2141        // Open the file with the errors on client B. They should be present.
2142        let buffer_b = cx_b
2143            .background()
2144            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2145            .await
2146            .unwrap();
2147
2148        buffer_b.read_with(&cx_b, |buffer, _| {
2149            assert_eq!(
2150                buffer
2151                    .snapshot()
2152                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2153                    .map(|entry| entry)
2154                    .collect::<Vec<_>>(),
2155                &[
2156                    DiagnosticEntry {
2157                        range: Point::new(0, 4)..Point::new(0, 7),
2158                        diagnostic: Diagnostic {
2159                            group_id: 0,
2160                            message: "message 1".to_string(),
2161                            severity: lsp::DiagnosticSeverity::ERROR,
2162                            is_primary: true,
2163                            ..Default::default()
2164                        }
2165                    },
2166                    DiagnosticEntry {
2167                        range: Point::new(0, 10)..Point::new(0, 13),
2168                        diagnostic: Diagnostic {
2169                            group_id: 1,
2170                            severity: lsp::DiagnosticSeverity::WARNING,
2171                            message: "message 2".to_string(),
2172                            is_primary: true,
2173                            ..Default::default()
2174                        }
2175                    }
2176                ]
2177            );
2178        });
2179    }
2180
2181    #[gpui::test(iterations = 10)]
2182    async fn test_collaborating_with_completion(
2183        mut cx_a: TestAppContext,
2184        mut cx_b: TestAppContext,
2185    ) {
2186        cx_a.foreground().forbid_parking();
2187        let mut lang_registry = Arc::new(LanguageRegistry::new());
2188        let fs = Arc::new(FakeFs::new(cx_a.background()));
2189
2190        // Set up a fake language server.
2191        let (language_server_config, mut fake_language_server) =
2192            LanguageServerConfig::fake_with_capabilities(
2193                lsp::ServerCapabilities {
2194                    completion_provider: Some(lsp::CompletionOptions {
2195                        trigger_characters: Some(vec![".".to_string()]),
2196                        ..Default::default()
2197                    }),
2198                    ..Default::default()
2199                },
2200                &cx_a,
2201            )
2202            .await;
2203        Arc::get_mut(&mut lang_registry)
2204            .unwrap()
2205            .add(Arc::new(Language::new(
2206                LanguageConfig {
2207                    name: "Rust".to_string(),
2208                    path_suffixes: vec!["rs".to_string()],
2209                    language_server: Some(language_server_config),
2210                    ..Default::default()
2211                },
2212                Some(tree_sitter_rust::language()),
2213            )));
2214
2215        // Connect to a server as 2 clients.
2216        let mut server = TestServer::start(cx_a.foreground()).await;
2217        let client_a = server.create_client(&mut cx_a, "user_a").await;
2218        let client_b = server.create_client(&mut cx_b, "user_b").await;
2219
2220        // Share a project as client A
2221        fs.insert_tree(
2222            "/a",
2223            json!({
2224                ".zed.toml": r#"collaborators = ["user_b"]"#,
2225                "main.rs": "fn main() { a }",
2226                "other.rs": "",
2227            }),
2228        )
2229        .await;
2230        let project_a = cx_a.update(|cx| {
2231            Project::local(
2232                client_a.clone(),
2233                client_a.user_store.clone(),
2234                lang_registry.clone(),
2235                fs.clone(),
2236                cx,
2237            )
2238        });
2239        let (worktree_a, _) = project_a
2240            .update(&mut cx_a, |p, cx| {
2241                p.find_or_create_local_worktree("/a", false, cx)
2242            })
2243            .await
2244            .unwrap();
2245        worktree_a
2246            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2247            .await;
2248        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2249        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2250        project_a
2251            .update(&mut cx_a, |p, cx| p.share(cx))
2252            .await
2253            .unwrap();
2254
2255        // Join the worktree as client B.
2256        let project_b = Project::remote(
2257            project_id,
2258            client_b.clone(),
2259            client_b.user_store.clone(),
2260            lang_registry.clone(),
2261            fs.clone(),
2262            &mut cx_b.to_async(),
2263        )
2264        .await
2265        .unwrap();
2266
2267        // Open a file in an editor as the guest.
2268        let buffer_b = project_b
2269            .update(&mut cx_b, |p, cx| {
2270                p.open_buffer((worktree_id, "main.rs"), cx)
2271            })
2272            .await
2273            .unwrap();
2274        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2275        let editor_b = cx_b.add_view(window_b, |cx| {
2276            Editor::for_buffer(
2277                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2278                Arc::new(|cx| EditorSettings::test(cx)),
2279                Some(project_b.clone()),
2280                cx,
2281            )
2282        });
2283
2284        // Type a completion trigger character as the guest.
2285        editor_b.update(&mut cx_b, |editor, cx| {
2286            editor.select_ranges([13..13], None, cx);
2287            editor.handle_input(&Input(".".into()), cx);
2288            cx.focus(&editor_b);
2289        });
2290
2291        // Receive a completion request as the host's language server.
2292        // Return some completions from the host's language server.
2293        fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
2294            assert_eq!(
2295                params.text_document_position.text_document.uri,
2296                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2297            );
2298            assert_eq!(
2299                params.text_document_position.position,
2300                lsp::Position::new(0, 14),
2301            );
2302
2303            Some(lsp::CompletionResponse::Array(vec![
2304                lsp::CompletionItem {
2305                    label: "first_method(…)".into(),
2306                    detail: Some("fn(&mut self, B) -> C".into()),
2307                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2308                        new_text: "first_method($1)".to_string(),
2309                        range: lsp::Range::new(
2310                            lsp::Position::new(0, 14),
2311                            lsp::Position::new(0, 14),
2312                        ),
2313                    })),
2314                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2315                    ..Default::default()
2316                },
2317                lsp::CompletionItem {
2318                    label: "second_method(…)".into(),
2319                    detail: Some("fn(&mut self, C) -> D<E>".into()),
2320                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2321                        new_text: "second_method()".to_string(),
2322                        range: lsp::Range::new(
2323                            lsp::Position::new(0, 14),
2324                            lsp::Position::new(0, 14),
2325                        ),
2326                    })),
2327                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2328                    ..Default::default()
2329                },
2330            ]))
2331        });
2332
2333        // Open the buffer on the host.
2334        let buffer_a = project_a
2335            .update(&mut cx_a, |p, cx| {
2336                p.open_buffer((worktree_id, "main.rs"), cx)
2337            })
2338            .await
2339            .unwrap();
2340        buffer_a
2341            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2342            .await;
2343
2344        // Confirm a completion on the guest.
2345        editor_b.next_notification(&cx_b).await;
2346        editor_b.update(&mut cx_b, |editor, cx| {
2347            assert!(editor.context_menu_visible());
2348            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2349            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2350        });
2351
2352        // Return a resolved completion from the host's language server.
2353        // The resolved completion has an additional text edit.
2354        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2355            assert_eq!(params.label, "first_method(…)");
2356            lsp::CompletionItem {
2357                label: "first_method(…)".into(),
2358                detail: Some("fn(&mut self, B) -> C".into()),
2359                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2360                    new_text: "first_method($1)".to_string(),
2361                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2362                })),
2363                additional_text_edits: Some(vec![lsp::TextEdit {
2364                    new_text: "use d::SomeTrait;\n".to_string(),
2365                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2366                }]),
2367                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2368                ..Default::default()
2369            }
2370        });
2371
2372        buffer_a
2373            .condition(&cx_a, |buffer, _| {
2374                buffer.text() == "fn main() { a.first_method() }"
2375            })
2376            .await;
2377
2378        // The additional edit is applied.
2379        buffer_b
2380            .condition(&cx_b, |buffer, _| {
2381                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2382            })
2383            .await;
2384        assert_eq!(
2385            buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2386            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2387        );
2388    }
2389
2390    #[gpui::test(iterations = 10)]
2391    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2392        cx_a.foreground().forbid_parking();
2393        let mut lang_registry = Arc::new(LanguageRegistry::new());
2394        let fs = Arc::new(FakeFs::new(cx_a.background()));
2395
2396        // Set up a fake language server.
2397        let (language_server_config, mut fake_language_server) =
2398            LanguageServerConfig::fake(&cx_a).await;
2399        Arc::get_mut(&mut lang_registry)
2400            .unwrap()
2401            .add(Arc::new(Language::new(
2402                LanguageConfig {
2403                    name: "Rust".to_string(),
2404                    path_suffixes: vec!["rs".to_string()],
2405                    language_server: Some(language_server_config),
2406                    ..Default::default()
2407                },
2408                Some(tree_sitter_rust::language()),
2409            )));
2410
2411        // Connect to a server as 2 clients.
2412        let mut server = TestServer::start(cx_a.foreground()).await;
2413        let client_a = server.create_client(&mut cx_a, "user_a").await;
2414        let client_b = server.create_client(&mut cx_b, "user_b").await;
2415
2416        // Share a project as client A
2417        fs.insert_tree(
2418            "/a",
2419            json!({
2420                ".zed.toml": r#"collaborators = ["user_b"]"#,
2421                "a.rs": "let one = two",
2422            }),
2423        )
2424        .await;
2425        let project_a = cx_a.update(|cx| {
2426            Project::local(
2427                client_a.clone(),
2428                client_a.user_store.clone(),
2429                lang_registry.clone(),
2430                fs.clone(),
2431                cx,
2432            )
2433        });
2434        let (worktree_a, _) = project_a
2435            .update(&mut cx_a, |p, cx| {
2436                p.find_or_create_local_worktree("/a", false, cx)
2437            })
2438            .await
2439            .unwrap();
2440        worktree_a
2441            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2442            .await;
2443        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2444        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2445        project_a
2446            .update(&mut cx_a, |p, cx| p.share(cx))
2447            .await
2448            .unwrap();
2449
2450        // Join the worktree as client B.
2451        let project_b = Project::remote(
2452            project_id,
2453            client_b.clone(),
2454            client_b.user_store.clone(),
2455            lang_registry.clone(),
2456            fs.clone(),
2457            &mut cx_b.to_async(),
2458        )
2459        .await
2460        .unwrap();
2461
2462        let buffer_b = cx_b
2463            .background()
2464            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2465            .await
2466            .unwrap();
2467
2468        let format = project_b.update(&mut cx_b, |project, cx| {
2469            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2470        });
2471
2472        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2473            Some(vec![
2474                lsp::TextEdit {
2475                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2476                    new_text: "h".to_string(),
2477                },
2478                lsp::TextEdit {
2479                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2480                    new_text: "y".to_string(),
2481                },
2482            ])
2483        });
2484
2485        format.await.unwrap();
2486        assert_eq!(
2487            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2488            "let honey = two"
2489        );
2490    }
2491
2492    #[gpui::test(iterations = 10)]
2493    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2494        cx_a.foreground().forbid_parking();
2495        let mut lang_registry = Arc::new(LanguageRegistry::new());
2496        let fs = Arc::new(FakeFs::new(cx_a.background()));
2497        fs.insert_tree(
2498            "/root-1",
2499            json!({
2500                ".zed.toml": r#"collaborators = ["user_b"]"#,
2501                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2502            }),
2503        )
2504        .await;
2505        fs.insert_tree(
2506            "/root-2",
2507            json!({
2508                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2509            }),
2510        )
2511        .await;
2512
2513        // Set up a fake language server.
2514        let (language_server_config, mut fake_language_server) =
2515            LanguageServerConfig::fake(&cx_a).await;
2516        Arc::get_mut(&mut lang_registry)
2517            .unwrap()
2518            .add(Arc::new(Language::new(
2519                LanguageConfig {
2520                    name: "Rust".to_string(),
2521                    path_suffixes: vec!["rs".to_string()],
2522                    language_server: Some(language_server_config),
2523                    ..Default::default()
2524                },
2525                Some(tree_sitter_rust::language()),
2526            )));
2527
2528        // Connect to a server as 2 clients.
2529        let mut server = TestServer::start(cx_a.foreground()).await;
2530        let client_a = server.create_client(&mut cx_a, "user_a").await;
2531        let client_b = server.create_client(&mut cx_b, "user_b").await;
2532
2533        // Share a project as client A
2534        let project_a = cx_a.update(|cx| {
2535            Project::local(
2536                client_a.clone(),
2537                client_a.user_store.clone(),
2538                lang_registry.clone(),
2539                fs.clone(),
2540                cx,
2541            )
2542        });
2543        let (worktree_a, _) = project_a
2544            .update(&mut cx_a, |p, cx| {
2545                p.find_or_create_local_worktree("/root-1", false, cx)
2546            })
2547            .await
2548            .unwrap();
2549        worktree_a
2550            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2551            .await;
2552        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2553        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2554        project_a
2555            .update(&mut cx_a, |p, cx| p.share(cx))
2556            .await
2557            .unwrap();
2558
2559        // Join the worktree as client B.
2560        let project_b = Project::remote(
2561            project_id,
2562            client_b.clone(),
2563            client_b.user_store.clone(),
2564            lang_registry.clone(),
2565            fs.clone(),
2566            &mut cx_b.to_async(),
2567        )
2568        .await
2569        .unwrap();
2570
2571        // Open the file on client B.
2572        let buffer_b = cx_b
2573            .background()
2574            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2575            .await
2576            .unwrap();
2577
2578        // Request the definition of a symbol as the guest.
2579        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2580        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2581            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2582                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2583                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2584            )))
2585        });
2586
2587        let definitions_1 = definitions_1.await.unwrap();
2588        cx_b.read(|cx| {
2589            assert_eq!(definitions_1.len(), 1);
2590            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2591            let target_buffer = definitions_1[0].target_buffer.read(cx);
2592            assert_eq!(
2593                target_buffer.text(),
2594                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2595            );
2596            assert_eq!(
2597                definitions_1[0].target_range.to_point(target_buffer),
2598                Point::new(0, 6)..Point::new(0, 9)
2599            );
2600        });
2601
2602        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2603        // the previous call to `definition`.
2604        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2605        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2606            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2607                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2608                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2609            )))
2610        });
2611
2612        let definitions_2 = definitions_2.await.unwrap();
2613        cx_b.read(|cx| {
2614            assert_eq!(definitions_2.len(), 1);
2615            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2616            let target_buffer = definitions_2[0].target_buffer.read(cx);
2617            assert_eq!(
2618                target_buffer.text(),
2619                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2620            );
2621            assert_eq!(
2622                definitions_2[0].target_range.to_point(target_buffer),
2623                Point::new(1, 6)..Point::new(1, 11)
2624            );
2625        });
2626        assert_eq!(
2627            definitions_1[0].target_buffer,
2628            definitions_2[0].target_buffer
2629        );
2630
2631        cx_b.update(|_| {
2632            drop(definitions_1);
2633            drop(definitions_2);
2634        });
2635        project_b
2636            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2637            .await;
2638    }
2639
2640    #[gpui::test(iterations = 10)]
2641    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2642        mut cx_a: TestAppContext,
2643        mut cx_b: TestAppContext,
2644        mut rng: StdRng,
2645    ) {
2646        cx_a.foreground().forbid_parking();
2647        let mut lang_registry = Arc::new(LanguageRegistry::new());
2648        let fs = Arc::new(FakeFs::new(cx_a.background()));
2649        fs.insert_tree(
2650            "/root",
2651            json!({
2652                ".zed.toml": r#"collaborators = ["user_b"]"#,
2653                "a.rs": "const ONE: usize = b::TWO;",
2654                "b.rs": "const TWO: usize = 2",
2655            }),
2656        )
2657        .await;
2658
2659        // Set up a fake language server.
2660        let (language_server_config, mut fake_language_server) =
2661            LanguageServerConfig::fake(&cx_a).await;
2662        Arc::get_mut(&mut lang_registry)
2663            .unwrap()
2664            .add(Arc::new(Language::new(
2665                LanguageConfig {
2666                    name: "Rust".to_string(),
2667                    path_suffixes: vec!["rs".to_string()],
2668                    language_server: Some(language_server_config),
2669                    ..Default::default()
2670                },
2671                Some(tree_sitter_rust::language()),
2672            )));
2673
2674        // Connect to a server as 2 clients.
2675        let mut server = TestServer::start(cx_a.foreground()).await;
2676        let client_a = server.create_client(&mut cx_a, "user_a").await;
2677        let client_b = server.create_client(&mut cx_b, "user_b").await;
2678
2679        // Share a project as client A
2680        let project_a = cx_a.update(|cx| {
2681            Project::local(
2682                client_a.clone(),
2683                client_a.user_store.clone(),
2684                lang_registry.clone(),
2685                fs.clone(),
2686                cx,
2687            )
2688        });
2689        let (worktree_a, _) = project_a
2690            .update(&mut cx_a, |p, cx| {
2691                p.find_or_create_local_worktree("/root", false, cx)
2692            })
2693            .await
2694            .unwrap();
2695        worktree_a
2696            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2697            .await;
2698        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2699        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2700        project_a
2701            .update(&mut cx_a, |p, cx| p.share(cx))
2702            .await
2703            .unwrap();
2704
2705        // Join the worktree as client B.
2706        let project_b = Project::remote(
2707            project_id,
2708            client_b.clone(),
2709            client_b.user_store.clone(),
2710            lang_registry.clone(),
2711            fs.clone(),
2712            &mut cx_b.to_async(),
2713        )
2714        .await
2715        .unwrap();
2716
2717        let buffer_b1 = cx_b
2718            .background()
2719            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2720            .await
2721            .unwrap();
2722
2723        let definitions;
2724        let buffer_b2;
2725        if rng.gen() {
2726            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2727            buffer_b2 =
2728                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2729        } else {
2730            buffer_b2 =
2731                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2732            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2733        }
2734
2735        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2736            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2737                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2738                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2739            )))
2740        });
2741
2742        let buffer_b2 = buffer_b2.await.unwrap();
2743        let definitions = definitions.await.unwrap();
2744        assert_eq!(definitions.len(), 1);
2745        assert_eq!(definitions[0].target_buffer, buffer_b2);
2746    }
2747
2748    #[gpui::test(iterations = 10)]
2749    async fn test_collaborating_with_code_actions(
2750        mut cx_a: TestAppContext,
2751        mut cx_b: TestAppContext,
2752    ) {
2753        cx_a.foreground().forbid_parking();
2754        let mut lang_registry = Arc::new(LanguageRegistry::new());
2755        let fs = Arc::new(FakeFs::new(cx_a.background()));
2756        let mut path_openers_b = Vec::new();
2757        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2758
2759        // Set up a fake language server.
2760        let (language_server_config, mut fake_language_server) =
2761            LanguageServerConfig::fake_with_capabilities(
2762                lsp::ServerCapabilities {
2763                    ..Default::default()
2764                },
2765                &cx_a,
2766            )
2767            .await;
2768        Arc::get_mut(&mut lang_registry)
2769            .unwrap()
2770            .add(Arc::new(Language::new(
2771                LanguageConfig {
2772                    name: "Rust".to_string(),
2773                    path_suffixes: vec!["rs".to_string()],
2774                    language_server: Some(language_server_config),
2775                    ..Default::default()
2776                },
2777                Some(tree_sitter_rust::language()),
2778            )));
2779
2780        // Connect to a server as 2 clients.
2781        let mut server = TestServer::start(cx_a.foreground()).await;
2782        let client_a = server.create_client(&mut cx_a, "user_a").await;
2783        let client_b = server.create_client(&mut cx_b, "user_b").await;
2784
2785        // Share a project as client A
2786        fs.insert_tree(
2787            "/a",
2788            json!({
2789                ".zed.toml": r#"collaborators = ["user_b"]"#,
2790                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2791                "other.rs": "pub fn foo() -> usize { 4 }",
2792            }),
2793        )
2794        .await;
2795        let project_a = cx_a.update(|cx| {
2796            Project::local(
2797                client_a.clone(),
2798                client_a.user_store.clone(),
2799                lang_registry.clone(),
2800                fs.clone(),
2801                cx,
2802            )
2803        });
2804        let (worktree_a, _) = project_a
2805            .update(&mut cx_a, |p, cx| {
2806                p.find_or_create_local_worktree("/a", false, cx)
2807            })
2808            .await
2809            .unwrap();
2810        worktree_a
2811            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2812            .await;
2813        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2814        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2815        project_a
2816            .update(&mut cx_a, |p, cx| p.share(cx))
2817            .await
2818            .unwrap();
2819
2820        // Join the worktree as client B.
2821        let project_b = Project::remote(
2822            project_id,
2823            client_b.clone(),
2824            client_b.user_store.clone(),
2825            lang_registry.clone(),
2826            fs.clone(),
2827            &mut cx_b.to_async(),
2828        )
2829        .await
2830        .unwrap();
2831        let mut params = cx_b.update(WorkspaceParams::test);
2832        params.languages = lang_registry.clone();
2833        params.client = client_b.client.clone();
2834        params.user_store = client_b.user_store.clone();
2835        params.project = project_b;
2836        params.path_openers = path_openers_b.into();
2837
2838        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
2839        let editor_b = workspace_b
2840            .update(&mut cx_b, |workspace, cx| {
2841                workspace.open_path((worktree_id, "main.rs").into(), cx)
2842            })
2843            .await
2844            .unwrap()
2845            .downcast::<Editor>()
2846            .unwrap();
2847        fake_language_server
2848            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2849                assert_eq!(
2850                    params.text_document.uri,
2851                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2852                );
2853                assert_eq!(params.range.start, lsp::Position::new(0, 0));
2854                assert_eq!(params.range.end, lsp::Position::new(0, 0));
2855                None
2856            })
2857            .next()
2858            .await;
2859
2860        // Move cursor to a location that contains code actions.
2861        editor_b.update(&mut cx_b, |editor, cx| {
2862            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2863            cx.focus(&editor_b);
2864        });
2865        fake_language_server.handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2866            assert_eq!(
2867                params.text_document.uri,
2868                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2869            );
2870            assert_eq!(params.range.start, lsp::Position::new(1, 31));
2871            assert_eq!(params.range.end, lsp::Position::new(1, 31));
2872
2873            Some(vec![lsp::CodeActionOrCommand::CodeAction(
2874                lsp::CodeAction {
2875                    title: "Inline into all callers".to_string(),
2876                    edit: Some(lsp::WorkspaceEdit {
2877                        changes: Some(
2878                            [
2879                                (
2880                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2881                                    vec![lsp::TextEdit::new(
2882                                        lsp::Range::new(
2883                                            lsp::Position::new(1, 22),
2884                                            lsp::Position::new(1, 34),
2885                                        ),
2886                                        "4".to_string(),
2887                                    )],
2888                                ),
2889                                (
2890                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
2891                                    vec![lsp::TextEdit::new(
2892                                        lsp::Range::new(
2893                                            lsp::Position::new(0, 0),
2894                                            lsp::Position::new(0, 27),
2895                                        ),
2896                                        "".to_string(),
2897                                    )],
2898                                ),
2899                            ]
2900                            .into_iter()
2901                            .collect(),
2902                        ),
2903                        ..Default::default()
2904                    }),
2905                    data: Some(json!({
2906                        "codeActionParams": {
2907                            "range": {
2908                                "start": {"line": 1, "column": 31},
2909                                "end": {"line": 1, "column": 31},
2910                            }
2911                        }
2912                    })),
2913                    ..Default::default()
2914                },
2915            )])
2916        });
2917
2918        // Toggle code actions and wait for them to display.
2919        editor_b.update(&mut cx_b, |editor, cx| {
2920            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2921        });
2922        editor_b
2923            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2924            .await;
2925
2926        // Confirming the code action will trigger a resolve request.
2927        let confirm_action = workspace_b
2928            .update(&mut cx_b, |workspace, cx| {
2929                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2930            })
2931            .unwrap();
2932        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2933            lsp::CodeAction {
2934                title: "Inline into all callers".to_string(),
2935                edit: Some(lsp::WorkspaceEdit {
2936                    changes: Some(
2937                        [
2938                            (
2939                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2940                                vec![lsp::TextEdit::new(
2941                                    lsp::Range::new(
2942                                        lsp::Position::new(1, 22),
2943                                        lsp::Position::new(1, 34),
2944                                    ),
2945                                    "4".to_string(),
2946                                )],
2947                            ),
2948                            (
2949                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
2950                                vec![lsp::TextEdit::new(
2951                                    lsp::Range::new(
2952                                        lsp::Position::new(0, 0),
2953                                        lsp::Position::new(0, 27),
2954                                    ),
2955                                    "".to_string(),
2956                                )],
2957                            ),
2958                        ]
2959                        .into_iter()
2960                        .collect(),
2961                    ),
2962                    ..Default::default()
2963                }),
2964                ..Default::default()
2965            }
2966        });
2967
2968        // After the action is confirmed, an editor containing both modified files is opened.
2969        confirm_action.await.unwrap();
2970        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
2971            workspace
2972                .active_item(cx)
2973                .unwrap()
2974                .downcast::<Editor>()
2975                .unwrap()
2976        });
2977        code_action_editor.update(&mut cx_b, |editor, cx| {
2978            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2979            editor.undo(&Undo, cx);
2980            assert_eq!(
2981                editor.text(cx),
2982                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
2983            );
2984            editor.redo(&Redo, cx);
2985            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2986        });
2987    }
2988
2989    #[gpui::test(iterations = 10)]
2990    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2991        cx_a.foreground().forbid_parking();
2992
2993        // Connect to a server as 2 clients.
2994        let mut server = TestServer::start(cx_a.foreground()).await;
2995        let client_a = server.create_client(&mut cx_a, "user_a").await;
2996        let client_b = server.create_client(&mut cx_b, "user_b").await;
2997
2998        // Create an org that includes these 2 users.
2999        let db = &server.app_state.db;
3000        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3001        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3002            .await
3003            .unwrap();
3004        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3005            .await
3006            .unwrap();
3007
3008        // Create a channel that includes all the users.
3009        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3010        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3011            .await
3012            .unwrap();
3013        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3014            .await
3015            .unwrap();
3016        db.create_channel_message(
3017            channel_id,
3018            client_b.current_user_id(&cx_b),
3019            "hello A, it's B.",
3020            OffsetDateTime::now_utc(),
3021            1,
3022        )
3023        .await
3024        .unwrap();
3025
3026        let channels_a = cx_a
3027            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3028        channels_a
3029            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3030            .await;
3031        channels_a.read_with(&cx_a, |list, _| {
3032            assert_eq!(
3033                list.available_channels().unwrap(),
3034                &[ChannelDetails {
3035                    id: channel_id.to_proto(),
3036                    name: "test-channel".to_string()
3037                }]
3038            )
3039        });
3040        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3041            this.get_channel(channel_id.to_proto(), cx).unwrap()
3042        });
3043        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3044        channel_a
3045            .condition(&cx_a, |channel, _| {
3046                channel_messages(channel)
3047                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3048            })
3049            .await;
3050
3051        let channels_b = cx_b
3052            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3053        channels_b
3054            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3055            .await;
3056        channels_b.read_with(&cx_b, |list, _| {
3057            assert_eq!(
3058                list.available_channels().unwrap(),
3059                &[ChannelDetails {
3060                    id: channel_id.to_proto(),
3061                    name: "test-channel".to_string()
3062                }]
3063            )
3064        });
3065
3066        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3067            this.get_channel(channel_id.to_proto(), cx).unwrap()
3068        });
3069        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3070        channel_b
3071            .condition(&cx_b, |channel, _| {
3072                channel_messages(channel)
3073                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3074            })
3075            .await;
3076
3077        channel_a
3078            .update(&mut cx_a, |channel, cx| {
3079                channel
3080                    .send_message("oh, hi B.".to_string(), cx)
3081                    .unwrap()
3082                    .detach();
3083                let task = channel.send_message("sup".to_string(), cx).unwrap();
3084                assert_eq!(
3085                    channel_messages(channel),
3086                    &[
3087                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3088                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3089                        ("user_a".to_string(), "sup".to_string(), true)
3090                    ]
3091                );
3092                task
3093            })
3094            .await
3095            .unwrap();
3096
3097        channel_b
3098            .condition(&cx_b, |channel, _| {
3099                channel_messages(channel)
3100                    == [
3101                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3102                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3103                        ("user_a".to_string(), "sup".to_string(), false),
3104                    ]
3105            })
3106            .await;
3107
3108        assert_eq!(
3109            server
3110                .state()
3111                .await
3112                .channel(channel_id)
3113                .unwrap()
3114                .connection_ids
3115                .len(),
3116            2
3117        );
3118        cx_b.update(|_| drop(channel_b));
3119        server
3120            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3121            .await;
3122
3123        cx_a.update(|_| drop(channel_a));
3124        server
3125            .condition(|state| state.channel(channel_id).is_none())
3126            .await;
3127    }
3128
3129    #[gpui::test(iterations = 10)]
3130    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3131        cx_a.foreground().forbid_parking();
3132
3133        let mut server = TestServer::start(cx_a.foreground()).await;
3134        let client_a = server.create_client(&mut cx_a, "user_a").await;
3135
3136        let db = &server.app_state.db;
3137        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3138        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3139        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3140            .await
3141            .unwrap();
3142        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3143            .await
3144            .unwrap();
3145
3146        let channels_a = cx_a
3147            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3148        channels_a
3149            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3150            .await;
3151        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3152            this.get_channel(channel_id.to_proto(), cx).unwrap()
3153        });
3154
3155        // Messages aren't allowed to be too long.
3156        channel_a
3157            .update(&mut cx_a, |channel, cx| {
3158                let long_body = "this is long.\n".repeat(1024);
3159                channel.send_message(long_body, cx).unwrap()
3160            })
3161            .await
3162            .unwrap_err();
3163
3164        // Messages aren't allowed to be blank.
3165        channel_a.update(&mut cx_a, |channel, cx| {
3166            channel.send_message(String::new(), cx).unwrap_err()
3167        });
3168
3169        // Leading and trailing whitespace are trimmed.
3170        channel_a
3171            .update(&mut cx_a, |channel, cx| {
3172                channel
3173                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3174                    .unwrap()
3175            })
3176            .await
3177            .unwrap();
3178        assert_eq!(
3179            db.get_channel_messages(channel_id, 10, None)
3180                .await
3181                .unwrap()
3182                .iter()
3183                .map(|m| &m.body)
3184                .collect::<Vec<_>>(),
3185            &["surrounded by whitespace"]
3186        );
3187    }
3188
3189    #[gpui::test(iterations = 10)]
3190    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3191        cx_a.foreground().forbid_parking();
3192
3193        // Connect to a server as 2 clients.
3194        let mut server = TestServer::start(cx_a.foreground()).await;
3195        let client_a = server.create_client(&mut cx_a, "user_a").await;
3196        let client_b = server.create_client(&mut cx_b, "user_b").await;
3197        let mut status_b = client_b.status();
3198
3199        // Create an org that includes these 2 users.
3200        let db = &server.app_state.db;
3201        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3202        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3203            .await
3204            .unwrap();
3205        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3206            .await
3207            .unwrap();
3208
3209        // Create a channel that includes all the users.
3210        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3211        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3212            .await
3213            .unwrap();
3214        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3215            .await
3216            .unwrap();
3217        db.create_channel_message(
3218            channel_id,
3219            client_b.current_user_id(&cx_b),
3220            "hello A, it's B.",
3221            OffsetDateTime::now_utc(),
3222            2,
3223        )
3224        .await
3225        .unwrap();
3226
3227        let channels_a = cx_a
3228            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3229        channels_a
3230            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3231            .await;
3232
3233        channels_a.read_with(&cx_a, |list, _| {
3234            assert_eq!(
3235                list.available_channels().unwrap(),
3236                &[ChannelDetails {
3237                    id: channel_id.to_proto(),
3238                    name: "test-channel".to_string()
3239                }]
3240            )
3241        });
3242        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3243            this.get_channel(channel_id.to_proto(), cx).unwrap()
3244        });
3245        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3246        channel_a
3247            .condition(&cx_a, |channel, _| {
3248                channel_messages(channel)
3249                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3250            })
3251            .await;
3252
3253        let channels_b = cx_b
3254            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3255        channels_b
3256            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3257            .await;
3258        channels_b.read_with(&cx_b, |list, _| {
3259            assert_eq!(
3260                list.available_channels().unwrap(),
3261                &[ChannelDetails {
3262                    id: channel_id.to_proto(),
3263                    name: "test-channel".to_string()
3264                }]
3265            )
3266        });
3267
3268        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3269            this.get_channel(channel_id.to_proto(), cx).unwrap()
3270        });
3271        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3272        channel_b
3273            .condition(&cx_b, |channel, _| {
3274                channel_messages(channel)
3275                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3276            })
3277            .await;
3278
3279        // Disconnect client B, ensuring we can still access its cached channel data.
3280        server.forbid_connections();
3281        server.disconnect_client(client_b.current_user_id(&cx_b));
3282        while !matches!(
3283            status_b.next().await,
3284            Some(client::Status::ReconnectionError { .. })
3285        ) {}
3286
3287        channels_b.read_with(&cx_b, |channels, _| {
3288            assert_eq!(
3289                channels.available_channels().unwrap(),
3290                [ChannelDetails {
3291                    id: channel_id.to_proto(),
3292                    name: "test-channel".to_string()
3293                }]
3294            )
3295        });
3296        channel_b.read_with(&cx_b, |channel, _| {
3297            assert_eq!(
3298                channel_messages(channel),
3299                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3300            )
3301        });
3302
3303        // Send a message from client B while it is disconnected.
3304        channel_b
3305            .update(&mut cx_b, |channel, cx| {
3306                let task = channel
3307                    .send_message("can you see this?".to_string(), cx)
3308                    .unwrap();
3309                assert_eq!(
3310                    channel_messages(channel),
3311                    &[
3312                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3313                        ("user_b".to_string(), "can you see this?".to_string(), true)
3314                    ]
3315                );
3316                task
3317            })
3318            .await
3319            .unwrap_err();
3320
3321        // Send a message from client A while B is disconnected.
3322        channel_a
3323            .update(&mut cx_a, |channel, cx| {
3324                channel
3325                    .send_message("oh, hi B.".to_string(), cx)
3326                    .unwrap()
3327                    .detach();
3328                let task = channel.send_message("sup".to_string(), cx).unwrap();
3329                assert_eq!(
3330                    channel_messages(channel),
3331                    &[
3332                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3333                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3334                        ("user_a".to_string(), "sup".to_string(), true)
3335                    ]
3336                );
3337                task
3338            })
3339            .await
3340            .unwrap();
3341
3342        // Give client B a chance to reconnect.
3343        server.allow_connections();
3344        cx_b.foreground().advance_clock(Duration::from_secs(10));
3345
3346        // Verify that B sees the new messages upon reconnection, as well as the message client B
3347        // sent while offline.
3348        channel_b
3349            .condition(&cx_b, |channel, _| {
3350                channel_messages(channel)
3351                    == [
3352                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3353                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3354                        ("user_a".to_string(), "sup".to_string(), false),
3355                        ("user_b".to_string(), "can you see this?".to_string(), false),
3356                    ]
3357            })
3358            .await;
3359
3360        // Ensure client A and B can communicate normally after reconnection.
3361        channel_a
3362            .update(&mut cx_a, |channel, cx| {
3363                channel.send_message("you online?".to_string(), cx).unwrap()
3364            })
3365            .await
3366            .unwrap();
3367        channel_b
3368            .condition(&cx_b, |channel, _| {
3369                channel_messages(channel)
3370                    == [
3371                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3372                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3373                        ("user_a".to_string(), "sup".to_string(), false),
3374                        ("user_b".to_string(), "can you see this?".to_string(), false),
3375                        ("user_a".to_string(), "you online?".to_string(), false),
3376                    ]
3377            })
3378            .await;
3379
3380        channel_b
3381            .update(&mut cx_b, |channel, cx| {
3382                channel.send_message("yep".to_string(), cx).unwrap()
3383            })
3384            .await
3385            .unwrap();
3386        channel_a
3387            .condition(&cx_a, |channel, _| {
3388                channel_messages(channel)
3389                    == [
3390                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3391                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3392                        ("user_a".to_string(), "sup".to_string(), false),
3393                        ("user_b".to_string(), "can you see this?".to_string(), false),
3394                        ("user_a".to_string(), "you online?".to_string(), false),
3395                        ("user_b".to_string(), "yep".to_string(), false),
3396                    ]
3397            })
3398            .await;
3399    }
3400
3401    #[gpui::test(iterations = 10)]
3402    async fn test_contacts(
3403        mut cx_a: TestAppContext,
3404        mut cx_b: TestAppContext,
3405        mut cx_c: TestAppContext,
3406    ) {
3407        cx_a.foreground().forbid_parking();
3408        let lang_registry = Arc::new(LanguageRegistry::new());
3409        let fs = Arc::new(FakeFs::new(cx_a.background()));
3410
3411        // Connect to a server as 3 clients.
3412        let mut server = TestServer::start(cx_a.foreground()).await;
3413        let client_a = server.create_client(&mut cx_a, "user_a").await;
3414        let client_b = server.create_client(&mut cx_b, "user_b").await;
3415        let client_c = server.create_client(&mut cx_c, "user_c").await;
3416
3417        // Share a worktree as client A.
3418        fs.insert_tree(
3419            "/a",
3420            json!({
3421                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3422            }),
3423        )
3424        .await;
3425
3426        let project_a = cx_a.update(|cx| {
3427            Project::local(
3428                client_a.clone(),
3429                client_a.user_store.clone(),
3430                lang_registry.clone(),
3431                fs.clone(),
3432                cx,
3433            )
3434        });
3435        let (worktree_a, _) = project_a
3436            .update(&mut cx_a, |p, cx| {
3437                p.find_or_create_local_worktree("/a", false, cx)
3438            })
3439            .await
3440            .unwrap();
3441        worktree_a
3442            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3443            .await;
3444
3445        client_a
3446            .user_store
3447            .condition(&cx_a, |user_store, _| {
3448                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3449            })
3450            .await;
3451        client_b
3452            .user_store
3453            .condition(&cx_b, |user_store, _| {
3454                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3455            })
3456            .await;
3457        client_c
3458            .user_store
3459            .condition(&cx_c, |user_store, _| {
3460                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3461            })
3462            .await;
3463
3464        let project_id = project_a
3465            .update(&mut cx_a, |project, _| project.next_remote_id())
3466            .await;
3467        project_a
3468            .update(&mut cx_a, |project, cx| project.share(cx))
3469            .await
3470            .unwrap();
3471
3472        let _project_b = Project::remote(
3473            project_id,
3474            client_b.clone(),
3475            client_b.user_store.clone(),
3476            lang_registry.clone(),
3477            fs.clone(),
3478            &mut cx_b.to_async(),
3479        )
3480        .await
3481        .unwrap();
3482
3483        client_a
3484            .user_store
3485            .condition(&cx_a, |user_store, _| {
3486                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3487            })
3488            .await;
3489        client_b
3490            .user_store
3491            .condition(&cx_b, |user_store, _| {
3492                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3493            })
3494            .await;
3495        client_c
3496            .user_store
3497            .condition(&cx_c, |user_store, _| {
3498                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3499            })
3500            .await;
3501
3502        project_a
3503            .condition(&cx_a, |project, _| {
3504                project.collaborators().contains_key(&client_b.peer_id)
3505            })
3506            .await;
3507
3508        cx_a.update(move |_| drop(project_a));
3509        client_a
3510            .user_store
3511            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3512            .await;
3513        client_b
3514            .user_store
3515            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3516            .await;
3517        client_c
3518            .user_store
3519            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3520            .await;
3521
3522        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3523            user_store
3524                .contacts()
3525                .iter()
3526                .map(|contact| {
3527                    let worktrees = contact
3528                        .projects
3529                        .iter()
3530                        .map(|p| {
3531                            (
3532                                p.worktree_root_names[0].as_str(),
3533                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3534                            )
3535                        })
3536                        .collect();
3537                    (contact.user.github_login.as_str(), worktrees)
3538                })
3539                .collect()
3540        }
3541    }
3542
    /// In-process server harness for the tests in this module. `start` builds it around a
    /// `Server`, and `create_client` wires clients to it over in-memory connections.
    struct TestServer {
        /// RPC peer handed to `Server::new`; `reset` is called on it when the harness is dropped.
        peer: Arc<Peer>,
        /// Application state shared with the server; tests reach the database through it.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor, used by `condition` to mark the spans in which it is waiting.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is given to `Server::new`; `condition`
        /// re-checks its predicate each time something arrives here.
        notifications: mpsc::Receiver<()>,
        /// Per-user senders that `disconnect_client` signals to kill that user's connection.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// While `true`, new connection attempts made by test clients are rejected.
        forbid_connections: Arc<AtomicBool>,
        /// Held for the lifetime of the harness (presumably so the backing test database outlives
        /// the server; TODO confirm against `TestDb`).
        _test_db: TestDb,
    }
3553
3554    impl TestServer {
3555        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3556            let test_db = TestDb::new();
3557            let app_state = Self::build_app_state(&test_db).await;
3558            let peer = Peer::new();
3559            let notifications = mpsc::channel(128);
3560            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3561            Self {
3562                peer,
3563                app_state,
3564                server,
3565                foreground,
3566                notifications: notifications.1,
3567                connection_killers: Default::default(),
3568                forbid_connections: Default::default(),
3569                _test_db: test_db,
3570            }
3571        }
3572
3573        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3574            let http = FakeHttpClient::with_404_response();
3575            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3576            let client_name = name.to_string();
3577            let mut client = Client::new(http.clone());
3578            let server = self.server.clone();
3579            let connection_killers = self.connection_killers.clone();
3580            let forbid_connections = self.forbid_connections.clone();
3581            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3582
3583            Arc::get_mut(&mut client)
3584                .unwrap()
3585                .override_authenticate(move |cx| {
3586                    cx.spawn(|_| async move {
3587                        let access_token = "the-token".to_string();
3588                        Ok(Credentials {
3589                            user_id: user_id.0 as u64,
3590                            access_token,
3591                        })
3592                    })
3593                })
3594                .override_establish_connection(move |credentials, cx| {
3595                    assert_eq!(credentials.user_id, user_id.0 as u64);
3596                    assert_eq!(credentials.access_token, "the-token");
3597
3598                    let server = server.clone();
3599                    let connection_killers = connection_killers.clone();
3600                    let forbid_connections = forbid_connections.clone();
3601                    let client_name = client_name.clone();
3602                    let connection_id_tx = connection_id_tx.clone();
3603                    cx.spawn(move |cx| async move {
3604                        if forbid_connections.load(SeqCst) {
3605                            Err(EstablishConnectionError::other(anyhow!(
3606                                "server is forbidding connections"
3607                            )))
3608                        } else {
3609                            let (client_conn, server_conn, kill_conn) =
3610                                Connection::in_memory(cx.background());
3611                            connection_killers.lock().insert(user_id, kill_conn);
3612                            cx.background()
3613                                .spawn(server.handle_connection(
3614                                    server_conn,
3615                                    client_name,
3616                                    user_id,
3617                                    Some(connection_id_tx),
3618                                    cx.background(),
3619                                ))
3620                                .detach();
3621                            Ok(client_conn)
3622                        }
3623                    })
3624                });
3625
3626            client
3627                .authenticate_and_connect(&cx.to_async())
3628                .await
3629                .unwrap();
3630
3631            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3632            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3633            let mut authed_user =
3634                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3635            while authed_user.next().await.unwrap().is_none() {}
3636
3637            TestClient {
3638                client,
3639                peer_id,
3640                user_store,
3641            }
3642        }
3643
3644        fn disconnect_client(&self, user_id: UserId) {
3645            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3646                let _ = kill_conn.try_send(Some(()));
3647            }
3648        }
3649
3650        fn forbid_connections(&self) {
3651            self.forbid_connections.store(true, SeqCst);
3652        }
3653
3654        fn allow_connections(&self) {
3655            self.forbid_connections.store(false, SeqCst);
3656        }
3657
3658        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3659            let mut config = Config::default();
3660            config.session_secret = "a".repeat(32);
3661            config.database_url = test_db.url.clone();
3662            let github_client = github::AppClient::test();
3663            Arc::new(AppState {
3664                db: test_db.db().clone(),
3665                handlebars: Default::default(),
3666                auth_client: auth::build_client("", ""),
3667                repo_client: github::RepoClient::test(&github_client),
3668                github_client,
3669                config,
3670            })
3671        }
3672
3673        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3674            self.server.store.read()
3675        }
3676
3677        async fn condition<F>(&mut self, mut predicate: F)
3678        where
3679            F: FnMut(&Store) -> bool,
3680        {
3681            async_std::future::timeout(Duration::from_millis(500), async {
3682                while !(predicate)(&*self.server.store.read()) {
3683                    self.foreground.start_waiting();
3684                    self.notifications.next().await;
3685                    self.foreground.finish_waiting();
3686                }
3687            })
3688            .await
3689            .expect("condition timed out");
3690        }
3691    }
3692
    /// Resets the RPC peer when the test server goes away.
    impl Drop for TestServer {
        fn drop(&mut self) {
            // NOTE(review): presumably this tears down whatever connections the peer still
            // holds so that nothing outlives the test; confirm against `Peer::reset`.
            self.peer.reset();
        }
    }
3698
    /// A connected client as produced by `TestServer::create_client`, bundled with the
    /// per-client state the tests need.
    struct TestClient {
        /// The underlying client; `TestClient` dereferences to it.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this client's connection.
        pub peer_id: PeerId,
        /// User store model created for this client.
        pub user_store: ModelHandle<UserStore>,
    }
3704
    /// Lets a `TestClient` be used wherever the wrapped `Arc<Client>` is expected, so tests can
    /// call client methods (and `clone()` the `Arc`) directly on it.
    impl Deref for TestClient {
        type Target = Arc<Client>;

        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
3712
3713    impl TestClient {
3714        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3715            UserId::from_proto(
3716                self.user_store
3717                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3718            )
3719        }
3720    }
3721
3722    impl Executor for Arc<gpui::executor::Background> {
3723        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
3724            self.spawn(future).detach();
3725        }
3726    }
3727
3728    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3729        channel
3730            .messages()
3731            .cursor::<()>()
3732            .map(|m| {
3733                (
3734                    m.sender.github_login.clone(),
3735                    m.body.clone(),
3736                    m.is_pending(),
3737                )
3738            })
3739            .collect()
3740    }
3741
    /// Stateless placeholder view for tests; its `gpui::View` impl renders an empty element.
    struct EmptyView;
3743
    /// `EmptyView` uses the unit type as its event type.
    impl gpui::Entity for EmptyView {
        type Event = ();
    }
3747
    /// Minimal `View` implementation: a fixed UI name and an empty element tree.
    impl gpui::View for EmptyView {
        /// Name under which this view type is identified.
        fn ui_name() -> &'static str {
            "empty view"
        }

        /// Renders a boxed `gpui::elements::Empty`; the render context is not used.
        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
            gpui::Element::boxed(gpui::elements::Empty)
        }
    }
3756    }
3757}