rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
  35pub struct Server {
  36    peer: Arc<Peer>,
  37    store: RwLock<Store>,
  38    app_state: Arc<AppState>,
  39    handlers: HashMap<TypeId, MessageHandler>,
  40    notifications: Option<mpsc::UnboundedSender<()>>,
  41}
  42
  43pub trait Executor {
  44    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  45}
  46
  47pub struct RealExecutor;
  48
  49const MESSAGE_COUNT_PER_PAGE: usize = 100;
  50const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::share_worktree)
  77            .add_request_handler(Server::update_worktree)
  78            .add_message_handler(Server::update_diagnostic_summary)
  79            .add_message_handler(Server::disk_based_diagnostics_updating)
  80            .add_message_handler(Server::disk_based_diagnostics_updated)
  81            .add_request_handler(Server::get_definition)
  82            .add_request_handler(Server::get_project_symbols)
  83            .add_request_handler(Server::open_buffer)
  84            .add_message_handler(Server::close_buffer)
  85            .add_request_handler(Server::update_buffer)
  86            .add_message_handler(Server::update_buffer_file)
  87            .add_message_handler(Server::buffer_reloaded)
  88            .add_message_handler(Server::buffer_saved)
  89            .add_request_handler(Server::save_buffer)
  90            .add_request_handler(Server::format_buffers)
  91            .add_request_handler(Server::get_completions)
  92            .add_request_handler(Server::apply_additional_edits_for_completion)
  93            .add_request_handler(Server::get_code_actions)
  94            .add_request_handler(Server::apply_code_action)
  95            .add_request_handler(Server::prepare_rename)
  96            .add_request_handler(Server::perform_rename)
  97            .add_request_handler(Server::get_channels)
  98            .add_request_handler(Server::get_users)
  99            .add_request_handler(Server::join_channel)
 100            .add_message_handler(Server::leave_channel)
 101            .add_request_handler(Server::send_channel_message)
 102            .add_request_handler(Server::get_channel_messages);
 103
 104        Arc::new(server)
 105    }
 106
 107    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 108    where
 109        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 110        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 111        M: EnvelopedMessage,
 112    {
 113        let prev_handler = self.handlers.insert(
 114            TypeId::of::<M>(),
 115            Box::new(move |server, envelope| {
 116                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 117                (handler)(server, *envelope).boxed()
 118            }),
 119        );
 120        if prev_handler.is_some() {
 121            panic!("registered a handler for the same message twice");
 122        }
 123        self
 124    }
 125
 126    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 127    where
 128        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 129        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 130        M: RequestMessage,
 131    {
 132        self.add_message_handler(move |server, envelope| {
 133            let receipt = envelope.receipt();
 134            let response = (handler)(server.clone(), envelope);
 135            async move {
 136                match response.await {
 137                    Ok(response) => {
 138                        server.peer.respond(receipt, response)?;
 139                        Ok(())
 140                    }
 141                    Err(error) => {
 142                        server.peer.respond_with_error(
 143                            receipt,
 144                            proto::Error {
 145                                message: error.to_string(),
 146                            },
 147                        )?;
 148                        Err(error)
 149                    }
 150                }
 151            }
 152        })
 153    }
 154
 155    pub fn handle_connection<E: Executor>(
 156        self: &Arc<Self>,
 157        connection: Connection,
 158        addr: String,
 159        user_id: UserId,
 160        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 161        executor: E,
 162    ) -> impl Future<Output = ()> {
 163        let mut this = self.clone();
 164        async move {
 165            let (connection_id, handle_io, mut incoming_rx) =
 166                this.peer.add_connection(connection).await;
 167
 168            if let Some(send_connection_id) = send_connection_id.as_mut() {
 169                let _ = send_connection_id.send(connection_id).await;
 170            }
 171
 172            this.state_mut().add_connection(connection_id, user_id);
 173            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 174                log::error!("error updating contacts for {:?}: {}", user_id, err);
 175            }
 176
 177            let handle_io = handle_io.fuse();
 178            futures::pin_mut!(handle_io);
 179            loop {
 180                let next_message = incoming_rx.next().fuse();
 181                futures::pin_mut!(next_message);
 182                futures::select_biased! {
 183                    result = handle_io => {
 184                        if let Err(err) = result {
 185                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 186                        }
 187                        break;
 188                    }
 189                    message = next_message => {
 190                        if let Some(message) = message {
 191                            let start_time = Instant::now();
 192                            let type_name = message.payload_type_name();
 193                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 194                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 195                                let notifications = this.notifications.clone();
 196                                let is_background = message.is_background();
 197                                let handle_message = (handler)(this.clone(), message);
 198                                let handle_message = async move {
 199                                    if let Err(err) = handle_message.await {
 200                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 201                                    } else {
 202                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 203                                    }
 204                                    if let Some(mut notifications) = notifications {
 205                                        let _ = notifications.send(()).await;
 206                                    }
 207                                };
 208                                if is_background {
 209                                    executor.spawn_detached(handle_message);
 210                                } else {
 211                                    handle_message.await;
 212                                }
 213                            } else {
 214                                log::warn!("unhandled message: {}", type_name);
 215                            }
 216                        } else {
 217                            log::info!("rpc connection closed {:?}", addr);
 218                            break;
 219                        }
 220                    }
 221                }
 222            }
 223
 224            if let Err(err) = this.sign_out(connection_id).await {
 225                log::error!("error signing out connection {:?} - {:?}", addr, err);
 226            }
 227        }
 228    }
 229
 230    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 231        self.peer.disconnect(connection_id);
 232        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 233
 234        for (project_id, project) in removed_connection.hosted_projects {
 235            if let Some(share) = project.share {
 236                broadcast(
 237                    connection_id,
 238                    share.guests.keys().copied().collect(),
 239                    |conn_id| {
 240                        self.peer
 241                            .send(conn_id, proto::UnshareProject { project_id })
 242                    },
 243                )?;
 244            }
 245        }
 246
 247        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 248            broadcast(connection_id, peer_ids, |conn_id| {
 249                self.peer.send(
 250                    conn_id,
 251                    proto::RemoveProjectCollaborator {
 252                        project_id,
 253                        peer_id: connection_id.0,
 254                    },
 255                )
 256            })?;
 257        }
 258
 259        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 260        Ok(())
 261    }
 262
 263    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 264        Ok(proto::Ack {})
 265    }
 266
 267    async fn register_project(
 268        mut self: Arc<Server>,
 269        request: TypedEnvelope<proto::RegisterProject>,
 270    ) -> tide::Result<proto::RegisterProjectResponse> {
 271        let project_id = {
 272            let mut state = self.state_mut();
 273            let user_id = state.user_id_for_connection(request.sender_id)?;
 274            state.register_project(request.sender_id, user_id)
 275        };
 276        Ok(proto::RegisterProjectResponse { project_id })
 277    }
 278
 279    async fn unregister_project(
 280        mut self: Arc<Server>,
 281        request: TypedEnvelope<proto::UnregisterProject>,
 282    ) -> tide::Result<()> {
 283        let project = self
 284            .state_mut()
 285            .unregister_project(request.payload.project_id, request.sender_id)?;
 286        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 287        Ok(())
 288    }
 289
 290    async fn share_project(
 291        mut self: Arc<Server>,
 292        request: TypedEnvelope<proto::ShareProject>,
 293    ) -> tide::Result<proto::Ack> {
 294        self.state_mut()
 295            .share_project(request.payload.project_id, request.sender_id);
 296        Ok(proto::Ack {})
 297    }
 298
 299    async fn unshare_project(
 300        mut self: Arc<Server>,
 301        request: TypedEnvelope<proto::UnshareProject>,
 302    ) -> tide::Result<()> {
 303        let project_id = request.payload.project_id;
 304        let project = self
 305            .state_mut()
 306            .unshare_project(project_id, request.sender_id)?;
 307
 308        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 309            self.peer
 310                .send(conn_id, proto::UnshareProject { project_id })
 311        })?;
 312        self.update_contacts_for_users(&project.authorized_user_ids)?;
 313        Ok(())
 314    }
 315
 316    async fn join_project(
 317        mut self: Arc<Server>,
 318        request: TypedEnvelope<proto::JoinProject>,
 319    ) -> tide::Result<proto::JoinProjectResponse> {
 320        let project_id = request.payload.project_id;
 321
 322        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 323        let (response, connection_ids, contact_user_ids) = self
 324            .state_mut()
 325            .join_project(request.sender_id, user_id, project_id)
 326            .and_then(|joined| {
 327                let share = joined.project.share()?;
 328                let peer_count = share.guests.len();
 329                let mut collaborators = Vec::with_capacity(peer_count);
 330                collaborators.push(proto::Collaborator {
 331                    peer_id: joined.project.host_connection_id.0,
 332                    replica_id: 0,
 333                    user_id: joined.project.host_user_id.to_proto(),
 334                });
 335                let worktrees = joined
 336                    .project
 337                    .worktrees
 338                    .iter()
 339                    .filter_map(|(id, worktree)| {
 340                        worktree.share.as_ref().map(|share| proto::Worktree {
 341                            id: *id,
 342                            root_name: worktree.root_name.clone(),
 343                            entries: share.entries.values().cloned().collect(),
 344                            diagnostic_summaries: share
 345                                .diagnostic_summaries
 346                                .values()
 347                                .cloned()
 348                                .collect(),
 349                            weak: worktree.weak,
 350                            next_update_id: share.next_update_id as u64,
 351                        })
 352                    })
 353                    .collect();
 354                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 355                    if *peer_conn_id != request.sender_id {
 356                        collaborators.push(proto::Collaborator {
 357                            peer_id: peer_conn_id.0,
 358                            replica_id: *peer_replica_id as u32,
 359                            user_id: peer_user_id.to_proto(),
 360                        });
 361                    }
 362                }
 363                let response = proto::JoinProjectResponse {
 364                    worktrees,
 365                    replica_id: joined.replica_id as u32,
 366                    collaborators,
 367                };
 368                let connection_ids = joined.project.connection_ids();
 369                let contact_user_ids = joined.project.authorized_user_ids();
 370                Ok((response, connection_ids, contact_user_ids))
 371            })?;
 372
 373        broadcast(request.sender_id, connection_ids, |conn_id| {
 374            self.peer.send(
 375                conn_id,
 376                proto::AddProjectCollaborator {
 377                    project_id,
 378                    collaborator: Some(proto::Collaborator {
 379                        peer_id: request.sender_id.0,
 380                        replica_id: response.replica_id,
 381                        user_id: user_id.to_proto(),
 382                    }),
 383                },
 384            )
 385        })?;
 386        self.update_contacts_for_users(&contact_user_ids)?;
 387        Ok(response)
 388    }
 389
 390    async fn leave_project(
 391        mut self: Arc<Server>,
 392        request: TypedEnvelope<proto::LeaveProject>,
 393    ) -> tide::Result<()> {
 394        let sender_id = request.sender_id;
 395        let project_id = request.payload.project_id;
 396        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 397
 398        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 399            self.peer.send(
 400                conn_id,
 401                proto::RemoveProjectCollaborator {
 402                    project_id,
 403                    peer_id: sender_id.0,
 404                },
 405            )
 406        })?;
 407        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 408
 409        Ok(())
 410    }
 411
 412    async fn register_worktree(
 413        mut self: Arc<Server>,
 414        request: TypedEnvelope<proto::RegisterWorktree>,
 415    ) -> tide::Result<proto::Ack> {
 416        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 417
 418        let mut contact_user_ids = HashSet::default();
 419        contact_user_ids.insert(host_user_id);
 420        for github_login in request.payload.authorized_logins {
 421            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 422            contact_user_ids.insert(contact_user_id);
 423        }
 424
 425        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 426        self.state_mut().register_worktree(
 427            request.payload.project_id,
 428            request.payload.worktree_id,
 429            request.sender_id,
 430            Worktree {
 431                authorized_user_ids: contact_user_ids.clone(),
 432                root_name: request.payload.root_name,
 433                share: None,
 434                weak: false,
 435            },
 436        )?;
 437        self.update_contacts_for_users(&contact_user_ids)?;
 438        Ok(proto::Ack {})
 439    }
 440
 441    async fn unregister_worktree(
 442        mut self: Arc<Server>,
 443        request: TypedEnvelope<proto::UnregisterWorktree>,
 444    ) -> tide::Result<()> {
 445        let project_id = request.payload.project_id;
 446        let worktree_id = request.payload.worktree_id;
 447        let (worktree, guest_connection_ids) =
 448            self.state_mut()
 449                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 450        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 451            self.peer.send(
 452                conn_id,
 453                proto::UnregisterWorktree {
 454                    project_id,
 455                    worktree_id,
 456                },
 457            )
 458        })?;
 459        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 460        Ok(())
 461    }
 462
 463    async fn share_worktree(
 464        mut self: Arc<Server>,
 465        mut request: TypedEnvelope<proto::ShareWorktree>,
 466    ) -> tide::Result<proto::Ack> {
 467        let worktree = request
 468            .payload
 469            .worktree
 470            .as_mut()
 471            .ok_or_else(|| anyhow!("missing worktree"))?;
 472        let entries = worktree
 473            .entries
 474            .iter()
 475            .map(|entry| (entry.id, entry.clone()))
 476            .collect();
 477        let diagnostic_summaries = worktree
 478            .diagnostic_summaries
 479            .iter()
 480            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 481            .collect();
 482
 483        let shared_worktree = self.state_mut().share_worktree(
 484            request.payload.project_id,
 485            worktree.id,
 486            request.sender_id,
 487            entries,
 488            diagnostic_summaries,
 489            worktree.next_update_id,
 490        )?;
 491
 492        broadcast(
 493            request.sender_id,
 494            shared_worktree.connection_ids,
 495            |connection_id| {
 496                self.peer
 497                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 498            },
 499        )?;
 500        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 501
 502        Ok(proto::Ack {})
 503    }
 504
 505    async fn update_worktree(
 506        mut self: Arc<Server>,
 507        request: TypedEnvelope<proto::UpdateWorktree>,
 508    ) -> tide::Result<proto::Ack> {
 509        let connection_ids = self.state_mut().update_worktree(
 510            request.sender_id,
 511            request.payload.project_id,
 512            request.payload.worktree_id,
 513            request.payload.id,
 514            &request.payload.removed_entries,
 515            &request.payload.updated_entries,
 516        )?;
 517
 518        broadcast(request.sender_id, connection_ids, |connection_id| {
 519            self.peer
 520                .forward_send(request.sender_id, connection_id, request.payload.clone())
 521        })?;
 522
 523        Ok(proto::Ack {})
 524    }
 525
 526    async fn update_diagnostic_summary(
 527        mut self: Arc<Server>,
 528        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 529    ) -> tide::Result<()> {
 530        let summary = request
 531            .payload
 532            .summary
 533            .clone()
 534            .ok_or_else(|| anyhow!("invalid summary"))?;
 535        let receiver_ids = self.state_mut().update_diagnostic_summary(
 536            request.payload.project_id,
 537            request.payload.worktree_id,
 538            request.sender_id,
 539            summary,
 540        )?;
 541
 542        broadcast(request.sender_id, receiver_ids, |connection_id| {
 543            self.peer
 544                .forward_send(request.sender_id, connection_id, request.payload.clone())
 545        })?;
 546        Ok(())
 547    }
 548
 549    async fn disk_based_diagnostics_updating(
 550        self: Arc<Server>,
 551        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 552    ) -> tide::Result<()> {
 553        let receiver_ids = self
 554            .state()
 555            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 556        broadcast(request.sender_id, receiver_ids, |connection_id| {
 557            self.peer
 558                .forward_send(request.sender_id, connection_id, request.payload.clone())
 559        })?;
 560        Ok(())
 561    }
 562
 563    async fn disk_based_diagnostics_updated(
 564        self: Arc<Server>,
 565        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 566    ) -> tide::Result<()> {
 567        let receiver_ids = self
 568            .state()
 569            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 570        broadcast(request.sender_id, receiver_ids, |connection_id| {
 571            self.peer
 572                .forward_send(request.sender_id, connection_id, request.payload.clone())
 573        })?;
 574        Ok(())
 575    }
 576
 577    async fn get_definition(
 578        self: Arc<Server>,
 579        request: TypedEnvelope<proto::GetDefinition>,
 580    ) -> tide::Result<proto::GetDefinitionResponse> {
 581        let host_connection_id = self
 582            .state()
 583            .read_project(request.payload.project_id, request.sender_id)?
 584            .host_connection_id;
 585        Ok(self
 586            .peer
 587            .forward_request(request.sender_id, host_connection_id, request.payload)
 588            .await?)
 589    }
 590
 591    async fn get_project_symbols(
 592        self: Arc<Server>,
 593        request: TypedEnvelope<proto::GetProjectSymbols>,
 594    ) -> tide::Result<proto::GetProjectSymbolsResponse> {
 595        let host_connection_id = self
 596            .state()
 597            .read_project(request.payload.project_id, request.sender_id)?
 598            .host_connection_id;
 599        Ok(self
 600            .peer
 601            .forward_request(request.sender_id, host_connection_id, request.payload)
 602            .await?)
 603    }
 604
 605    async fn open_buffer(
 606        self: Arc<Server>,
 607        request: TypedEnvelope<proto::OpenBuffer>,
 608    ) -> tide::Result<proto::OpenBufferResponse> {
 609        let host_connection_id = self
 610            .state()
 611            .read_project(request.payload.project_id, request.sender_id)?
 612            .host_connection_id;
 613        Ok(self
 614            .peer
 615            .forward_request(request.sender_id, host_connection_id, request.payload)
 616            .await?)
 617    }
 618
 619    async fn close_buffer(
 620        self: Arc<Server>,
 621        request: TypedEnvelope<proto::CloseBuffer>,
 622    ) -> tide::Result<()> {
 623        let host_connection_id = self
 624            .state()
 625            .read_project(request.payload.project_id, request.sender_id)?
 626            .host_connection_id;
 627        self.peer
 628            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 629        Ok(())
 630    }
 631
 632    async fn save_buffer(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::SaveBuffer>,
 635    ) -> tide::Result<proto::BufferSaved> {
 636        let host;
 637        let mut guests;
 638        {
 639            let state = self.state();
 640            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 641            host = project.host_connection_id;
 642            guests = project.guest_connection_ids()
 643        }
 644
 645        let response = self
 646            .peer
 647            .forward_request(request.sender_id, host, request.payload.clone())
 648            .await?;
 649
 650        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 651        broadcast(host, guests, |conn_id| {
 652            self.peer.forward_send(host, conn_id, response.clone())
 653        })?;
 654
 655        Ok(response)
 656    }
 657
 658    async fn format_buffers(
 659        self: Arc<Server>,
 660        request: TypedEnvelope<proto::FormatBuffers>,
 661    ) -> tide::Result<proto::FormatBuffersResponse> {
 662        let host = self
 663            .state()
 664            .read_project(request.payload.project_id, request.sender_id)?
 665            .host_connection_id;
 666        Ok(self
 667            .peer
 668            .forward_request(request.sender_id, host, request.payload.clone())
 669            .await?)
 670    }
 671
 672    async fn get_completions(
 673        self: Arc<Server>,
 674        request: TypedEnvelope<proto::GetCompletions>,
 675    ) -> tide::Result<proto::GetCompletionsResponse> {
 676        let host = self
 677            .state()
 678            .read_project(request.payload.project_id, request.sender_id)?
 679            .host_connection_id;
 680        Ok(self
 681            .peer
 682            .forward_request(request.sender_id, host, request.payload.clone())
 683            .await?)
 684    }
 685
 686    async fn apply_additional_edits_for_completion(
 687        self: Arc<Server>,
 688        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 689    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 690        let host = self
 691            .state()
 692            .read_project(request.payload.project_id, request.sender_id)?
 693            .host_connection_id;
 694        Ok(self
 695            .peer
 696            .forward_request(request.sender_id, host, request.payload.clone())
 697            .await?)
 698    }
 699
 700    async fn get_code_actions(
 701        self: Arc<Server>,
 702        request: TypedEnvelope<proto::GetCodeActions>,
 703    ) -> tide::Result<proto::GetCodeActionsResponse> {
 704        let host = self
 705            .state()
 706            .read_project(request.payload.project_id, request.sender_id)?
 707            .host_connection_id;
 708        Ok(self
 709            .peer
 710            .forward_request(request.sender_id, host, request.payload.clone())
 711            .await?)
 712    }
 713
 714    async fn apply_code_action(
 715        self: Arc<Server>,
 716        request: TypedEnvelope<proto::ApplyCodeAction>,
 717    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 718        let host = self
 719            .state()
 720            .read_project(request.payload.project_id, request.sender_id)?
 721            .host_connection_id;
 722        Ok(self
 723            .peer
 724            .forward_request(request.sender_id, host, request.payload.clone())
 725            .await?)
 726    }
 727
 728    async fn prepare_rename(
 729        self: Arc<Server>,
 730        request: TypedEnvelope<proto::PrepareRename>,
 731    ) -> tide::Result<proto::PrepareRenameResponse> {
 732        let host = self
 733            .state()
 734            .read_project(request.payload.project_id, request.sender_id)?
 735            .host_connection_id;
 736        Ok(self
 737            .peer
 738            .forward_request(request.sender_id, host, request.payload.clone())
 739            .await?)
 740    }
 741
 742    async fn perform_rename(
 743        self: Arc<Server>,
 744        request: TypedEnvelope<proto::PerformRename>,
 745    ) -> tide::Result<proto::PerformRenameResponse> {
 746        let host = self
 747            .state()
 748            .read_project(request.payload.project_id, request.sender_id)?
 749            .host_connection_id;
 750        Ok(self
 751            .peer
 752            .forward_request(request.sender_id, host, request.payload.clone())
 753            .await?)
 754    }
 755
 756    async fn update_buffer(
 757        self: Arc<Server>,
 758        request: TypedEnvelope<proto::UpdateBuffer>,
 759    ) -> tide::Result<proto::Ack> {
 760        let receiver_ids = self
 761            .state()
 762            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 763        broadcast(request.sender_id, receiver_ids, |connection_id| {
 764            self.peer
 765                .forward_send(request.sender_id, connection_id, request.payload.clone())
 766        })?;
 767        Ok(proto::Ack {})
 768    }
 769
 770    async fn update_buffer_file(
 771        self: Arc<Server>,
 772        request: TypedEnvelope<proto::UpdateBufferFile>,
 773    ) -> tide::Result<()> {
 774        let receiver_ids = self
 775            .state()
 776            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 777        broadcast(request.sender_id, receiver_ids, |connection_id| {
 778            self.peer
 779                .forward_send(request.sender_id, connection_id, request.payload.clone())
 780        })?;
 781        Ok(())
 782    }
 783
 784    async fn buffer_reloaded(
 785        self: Arc<Server>,
 786        request: TypedEnvelope<proto::BufferReloaded>,
 787    ) -> tide::Result<()> {
 788        let receiver_ids = self
 789            .state()
 790            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 791        broadcast(request.sender_id, receiver_ids, |connection_id| {
 792            self.peer
 793                .forward_send(request.sender_id, connection_id, request.payload.clone())
 794        })?;
 795        Ok(())
 796    }
 797
 798    async fn buffer_saved(
 799        self: Arc<Server>,
 800        request: TypedEnvelope<proto::BufferSaved>,
 801    ) -> tide::Result<()> {
 802        let receiver_ids = self
 803            .state()
 804            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 805        broadcast(request.sender_id, receiver_ids, |connection_id| {
 806            self.peer
 807                .forward_send(request.sender_id, connection_id, request.payload.clone())
 808        })?;
 809        Ok(())
 810    }
 811
 812    async fn get_channels(
 813        self: Arc<Server>,
 814        request: TypedEnvelope<proto::GetChannels>,
 815    ) -> tide::Result<proto::GetChannelsResponse> {
 816        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 817        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 818        Ok(proto::GetChannelsResponse {
 819            channels: channels
 820                .into_iter()
 821                .map(|chan| proto::Channel {
 822                    id: chan.id.to_proto(),
 823                    name: chan.name,
 824                })
 825                .collect(),
 826        })
 827    }
 828
 829    async fn get_users(
 830        self: Arc<Server>,
 831        request: TypedEnvelope<proto::GetUsers>,
 832    ) -> tide::Result<proto::GetUsersResponse> {
 833        let user_ids = request
 834            .payload
 835            .user_ids
 836            .into_iter()
 837            .map(UserId::from_proto)
 838            .collect();
 839        let users = self
 840            .app_state
 841            .db
 842            .get_users_by_ids(user_ids)
 843            .await?
 844            .into_iter()
 845            .map(|user| proto::User {
 846                id: user.id.to_proto(),
 847                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 848                github_login: user.github_login,
 849            })
 850            .collect();
 851        Ok(proto::GetUsersResponse { users })
 852    }
 853
 854    fn update_contacts_for_users<'a>(
 855        self: &Arc<Server>,
 856        user_ids: impl IntoIterator<Item = &'a UserId>,
 857    ) -> anyhow::Result<()> {
 858        let mut result = Ok(());
 859        let state = self.state();
 860        for user_id in user_ids {
 861            let contacts = state.contacts_for_user(*user_id);
 862            for connection_id in state.connection_ids_for_user(*user_id) {
 863                if let Err(error) = self.peer.send(
 864                    connection_id,
 865                    proto::UpdateContacts {
 866                        contacts: contacts.clone(),
 867                    },
 868                ) {
 869                    result = Err(error);
 870                }
 871            }
 872        }
 873        result
 874    }
 875
 876    async fn join_channel(
 877        mut self: Arc<Self>,
 878        request: TypedEnvelope<proto::JoinChannel>,
 879    ) -> tide::Result<proto::JoinChannelResponse> {
 880        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 881        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 882        if !self
 883            .app_state
 884            .db
 885            .can_user_access_channel(user_id, channel_id)
 886            .await?
 887        {
 888            Err(anyhow!("access denied"))?;
 889        }
 890
 891        self.state_mut().join_channel(request.sender_id, channel_id);
 892        let messages = self
 893            .app_state
 894            .db
 895            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 896            .await?
 897            .into_iter()
 898            .map(|msg| proto::ChannelMessage {
 899                id: msg.id.to_proto(),
 900                body: msg.body,
 901                timestamp: msg.sent_at.unix_timestamp() as u64,
 902                sender_id: msg.sender_id.to_proto(),
 903                nonce: Some(msg.nonce.as_u128().into()),
 904            })
 905            .collect::<Vec<_>>();
 906        Ok(proto::JoinChannelResponse {
 907            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 908            messages,
 909        })
 910    }
 911
 912    async fn leave_channel(
 913        mut self: Arc<Self>,
 914        request: TypedEnvelope<proto::LeaveChannel>,
 915    ) -> tide::Result<()> {
 916        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 917        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 918        if !self
 919            .app_state
 920            .db
 921            .can_user_access_channel(user_id, channel_id)
 922            .await?
 923        {
 924            Err(anyhow!("access denied"))?;
 925        }
 926
 927        self.state_mut()
 928            .leave_channel(request.sender_id, channel_id);
 929
 930        Ok(())
 931    }
 932
 933    async fn send_channel_message(
 934        self: Arc<Self>,
 935        request: TypedEnvelope<proto::SendChannelMessage>,
 936    ) -> tide::Result<proto::SendChannelMessageResponse> {
 937        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 938        let user_id;
 939        let connection_ids;
 940        {
 941            let state = self.state();
 942            user_id = state.user_id_for_connection(request.sender_id)?;
 943            connection_ids = state.channel_connection_ids(channel_id)?;
 944        }
 945
 946        // Validate the message body.
 947        let body = request.payload.body.trim().to_string();
 948        if body.len() > MAX_MESSAGE_LEN {
 949            return Err(anyhow!("message is too long"))?;
 950        }
 951        if body.is_empty() {
 952            return Err(anyhow!("message can't be blank"))?;
 953        }
 954
 955        let timestamp = OffsetDateTime::now_utc();
 956        let nonce = request
 957            .payload
 958            .nonce
 959            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 960
 961        let message_id = self
 962            .app_state
 963            .db
 964            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 965            .await?
 966            .to_proto();
 967        let message = proto::ChannelMessage {
 968            sender_id: user_id.to_proto(),
 969            id: message_id,
 970            body,
 971            timestamp: timestamp.unix_timestamp() as u64,
 972            nonce: Some(nonce),
 973        };
 974        broadcast(request.sender_id, connection_ids, |conn_id| {
 975            self.peer.send(
 976                conn_id,
 977                proto::ChannelMessageSent {
 978                    channel_id: channel_id.to_proto(),
 979                    message: Some(message.clone()),
 980                },
 981            )
 982        })?;
 983        Ok(proto::SendChannelMessageResponse {
 984            message: Some(message),
 985        })
 986    }
 987
 988    async fn get_channel_messages(
 989        self: Arc<Self>,
 990        request: TypedEnvelope<proto::GetChannelMessages>,
 991    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 992        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 993        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 994        if !self
 995            .app_state
 996            .db
 997            .can_user_access_channel(user_id, channel_id)
 998            .await?
 999        {
1000            Err(anyhow!("access denied"))?;
1001        }
1002
1003        let messages = self
1004            .app_state
1005            .db
1006            .get_channel_messages(
1007                channel_id,
1008                MESSAGE_COUNT_PER_PAGE,
1009                Some(MessageId::from_proto(request.payload.before_message_id)),
1010            )
1011            .await?
1012            .into_iter()
1013            .map(|msg| proto::ChannelMessage {
1014                id: msg.id.to_proto(),
1015                body: msg.body,
1016                timestamp: msg.sent_at.unix_timestamp() as u64,
1017                sender_id: msg.sender_id.to_proto(),
1018                nonce: Some(msg.nonce.as_u128().into()),
1019            })
1020            .collect::<Vec<_>>();
1021
1022        Ok(proto::GetChannelMessagesResponse {
1023            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1024            messages,
1025        })
1026    }
1027
1028    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1029        self.store.read()
1030    }
1031
1032    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1033        self.store.write()
1034    }
1035}
1036
1037impl Executor for RealExecutor {
1038    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1039        task::spawn(future);
1040    }
1041}
1042
1043fn broadcast<F>(
1044    sender_id: ConnectionId,
1045    receiver_ids: Vec<ConnectionId>,
1046    mut f: F,
1047) -> anyhow::Result<()>
1048where
1049    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1050{
1051    let mut result = Ok(());
1052    for receiver_id in receiver_ids {
1053        if receiver_id != sender_id {
1054            if let Err(error) = f(receiver_id) {
1055                if result.is_ok() {
1056                    result = Err(error);
1057                }
1058            }
1059        }
1060    }
1061    result
1062}
1063
1064pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1065    let server = Server::new(app.state().clone(), rpc.clone(), None);
1066    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1067        let server = server.clone();
1068        async move {
1069            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1070
1071            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1072            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1073            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1074            let client_protocol_version: Option<u32> = request
1075                .header("X-Zed-Protocol-Version")
1076                .and_then(|v| v.as_str().parse().ok());
1077
1078            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1079                return Ok(Response::new(StatusCode::UpgradeRequired));
1080            }
1081
1082            let header = match request.header("Sec-Websocket-Key") {
1083                Some(h) => h.as_str(),
1084                None => return Err(anyhow!("expected sec-websocket-key"))?,
1085            };
1086
1087            let user_id = process_auth_header(&request).await?;
1088
1089            let mut response = Response::new(StatusCode::SwitchingProtocols);
1090            response.insert_header(UPGRADE, "websocket");
1091            response.insert_header(CONNECTION, "Upgrade");
1092            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1093            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1094            response.insert_header("Sec-Websocket-Version", "13");
1095
1096            let http_res: &mut tide::http::Response = response.as_mut();
1097            let upgrade_receiver = http_res.recv_upgrade().await;
1098            let addr = request.remote().unwrap_or("unknown").to_string();
1099            task::spawn(async move {
1100                if let Some(stream) = upgrade_receiver.await {
1101                    server
1102                        .handle_connection(
1103                            Connection::new(
1104                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1105                            ),
1106                            addr,
1107                            user_id,
1108                            None,
1109                            RealExecutor,
1110                        )
1111                        .await;
1112                }
1113            });
1114
1115            Ok(response)
1116        }
1117    });
1118}
1119
1120fn header_contains_ignore_case<T>(
1121    request: &tide::Request<T>,
1122    header_name: HeaderName,
1123    value: &str,
1124) -> bool {
1125    request
1126        .header(header_name)
1127        .map(|h| {
1128            h.as_str()
1129                .split(',')
1130                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1131        })
1132        .unwrap_or(false)
1133}
1134
1135#[cfg(test)]
1136mod tests {
1137    use super::*;
1138    use crate::{
1139        auth,
1140        db::{tests::TestDb, UserId},
1141        github, AppState, Config,
1142    };
1143    use ::rpc::Peer;
1144    use collections::BTreeMap;
1145    use gpui::{executor, ModelHandle, TestAppContext};
1146    use parking_lot::Mutex;
1147    use postage::{sink::Sink, watch};
1148    use rand::prelude::*;
1149    use rpc::PeerId;
1150    use serde_json::json;
1151    use sqlx::types::time::OffsetDateTime;
1152    use std::{
1153        cell::{Cell, RefCell},
1154        env,
1155        ops::Deref,
1156        path::Path,
1157        rc::Rc,
1158        sync::{
1159            atomic::{AtomicBool, Ordering::SeqCst},
1160            Arc,
1161        },
1162        time::Duration,
1163    };
1164    use zed::{
1165        client::{
1166            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1167            EstablishConnectionError, UserStore,
1168        },
1169        editor::{
1170            self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, EditorSettings,
1171            Input, MultiBuffer, Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1172        },
1173        fs::{FakeFs, Fs as _},
1174        language::{
1175            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1176            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1177        },
1178        lsp,
1179        project::{DiagnosticSummary, Project, ProjectPath},
1180        workspace::{Workspace, WorkspaceParams},
1181    };
1182
1183    #[cfg(test)]
1184    #[ctor::ctor]
1185    fn init_logger() {
1186        if std::env::var("RUST_LOG").is_ok() {
1187            env_logger::init();
1188        }
1189    }
1190
1191    #[gpui::test(iterations = 10)]
1192    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1193        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1194        let lang_registry = Arc::new(LanguageRegistry::new());
1195        let fs = FakeFs::new(cx_a.background());
1196        cx_a.foreground().forbid_parking();
1197
1198        // Connect to a server as 2 clients.
1199        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1200        let client_a = server.create_client(&mut cx_a, "user_a").await;
1201        let client_b = server.create_client(&mut cx_b, "user_b").await;
1202
1203        // Share a project as client A
1204        fs.insert_tree(
1205            "/a",
1206            json!({
1207                ".zed.toml": r#"collaborators = ["user_b"]"#,
1208                "a.txt": "a-contents",
1209                "b.txt": "b-contents",
1210            }),
1211        )
1212        .await;
1213        let project_a = cx_a.update(|cx| {
1214            Project::local(
1215                client_a.clone(),
1216                client_a.user_store.clone(),
1217                lang_registry.clone(),
1218                fs.clone(),
1219                cx,
1220            )
1221        });
1222        let (worktree_a, _) = project_a
1223            .update(&mut cx_a, |p, cx| {
1224                p.find_or_create_local_worktree("/a", false, cx)
1225            })
1226            .await
1227            .unwrap();
1228        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1229        worktree_a
1230            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1231            .await;
1232        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1233        project_a
1234            .update(&mut cx_a, |p, cx| p.share(cx))
1235            .await
1236            .unwrap();
1237
1238        // Join that project as client B
1239        let project_b = Project::remote(
1240            project_id,
1241            client_b.clone(),
1242            client_b.user_store.clone(),
1243            lang_registry.clone(),
1244            fs.clone(),
1245            &mut cx_b.to_async(),
1246        )
1247        .await
1248        .unwrap();
1249
1250        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1251            assert_eq!(
1252                project
1253                    .collaborators()
1254                    .get(&client_a.peer_id)
1255                    .unwrap()
1256                    .user
1257                    .github_login,
1258                "user_a"
1259            );
1260            project.replica_id()
1261        });
1262        project_a
1263            .condition(&cx_a, |tree, _| {
1264                tree.collaborators()
1265                    .get(&client_b.peer_id)
1266                    .map_or(false, |collaborator| {
1267                        collaborator.replica_id == replica_id_b
1268                            && collaborator.user.github_login == "user_b"
1269                    })
1270            })
1271            .await;
1272
1273        // Open the same file as client B and client A.
1274        let buffer_b = project_b
1275            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1276            .await
1277            .unwrap();
1278        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1279        buffer_b.read_with(&cx_b, |buf, cx| {
1280            assert_eq!(buf.read(cx).text(), "b-contents")
1281        });
1282        project_a.read_with(&cx_a, |project, cx| {
1283            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1284        });
1285        let buffer_a = project_a
1286            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1287            .await
1288            .unwrap();
1289
1290        let editor_b = cx_b.add_view(window_b, |cx| {
1291            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1292        });
1293
1294        // TODO
1295        // // Create a selection set as client B and see that selection set as client A.
1296        // buffer_a
1297        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1298        //     .await;
1299
1300        // Edit the buffer as client B and see that edit as client A.
1301        editor_b.update(&mut cx_b, |editor, cx| {
1302            editor.handle_input(&Input("ok, ".into()), cx)
1303        });
1304        buffer_a
1305            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1306            .await;
1307
1308        // TODO
1309        // // Remove the selection set as client B, see those selections disappear as client A.
1310        cx_b.update(move |_| drop(editor_b));
1311        // buffer_a
1312        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1313        //     .await;
1314
1315        // Close the buffer as client A, see that the buffer is closed.
1316        cx_a.update(move |_| drop(buffer_a));
1317        project_a
1318            .condition(&cx_a, |project, cx| {
1319                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1320            })
1321            .await;
1322
1323        // Dropping the client B's project removes client B from client A's collaborators.
1324        cx_b.update(move |_| drop(project_b));
1325        project_a
1326            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1327            .await;
1328    }
1329
1330    #[gpui::test(iterations = 10)]
1331    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1332        let lang_registry = Arc::new(LanguageRegistry::new());
1333        let fs = FakeFs::new(cx_a.background());
1334        cx_a.foreground().forbid_parking();
1335
1336        // Connect to a server as 2 clients.
1337        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1338        let client_a = server.create_client(&mut cx_a, "user_a").await;
1339        let client_b = server.create_client(&mut cx_b, "user_b").await;
1340
1341        // Share a project as client A
1342        fs.insert_tree(
1343            "/a",
1344            json!({
1345                ".zed.toml": r#"collaborators = ["user_b"]"#,
1346                "a.txt": "a-contents",
1347                "b.txt": "b-contents",
1348            }),
1349        )
1350        .await;
1351        let project_a = cx_a.update(|cx| {
1352            Project::local(
1353                client_a.clone(),
1354                client_a.user_store.clone(),
1355                lang_registry.clone(),
1356                fs.clone(),
1357                cx,
1358            )
1359        });
1360        let (worktree_a, _) = project_a
1361            .update(&mut cx_a, |p, cx| {
1362                p.find_or_create_local_worktree("/a", false, cx)
1363            })
1364            .await
1365            .unwrap();
1366        worktree_a
1367            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1368            .await;
1369        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1370        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1371        project_a
1372            .update(&mut cx_a, |p, cx| p.share(cx))
1373            .await
1374            .unwrap();
1375        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1376
1377        // Join that project as client B
1378        let project_b = Project::remote(
1379            project_id,
1380            client_b.clone(),
1381            client_b.user_store.clone(),
1382            lang_registry.clone(),
1383            fs.clone(),
1384            &mut cx_b.to_async(),
1385        )
1386        .await
1387        .unwrap();
1388        project_b
1389            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1390            .await
1391            .unwrap();
1392
1393        // Unshare the project as client A
1394        project_a
1395            .update(&mut cx_a, |project, cx| project.unshare(cx))
1396            .await
1397            .unwrap();
1398        project_b
1399            .condition(&mut cx_b, |project, _| project.is_read_only())
1400            .await;
1401        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1402        drop(project_b);
1403
1404        // Share the project again and ensure guests can still join.
1405        project_a
1406            .update(&mut cx_a, |project, cx| project.share(cx))
1407            .await
1408            .unwrap();
1409        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1410
1411        let project_c = Project::remote(
1412            project_id,
1413            client_b.clone(),
1414            client_b.user_store.clone(),
1415            lang_registry.clone(),
1416            fs.clone(),
1417            &mut cx_b.to_async(),
1418        )
1419        .await
1420        .unwrap();
1421        project_c
1422            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1423            .await
1424            .unwrap();
1425    }
1426
1427    #[gpui::test(iterations = 10)]
1428    async fn test_propagate_saves_and_fs_changes(
1429        mut cx_a: TestAppContext,
1430        mut cx_b: TestAppContext,
1431        mut cx_c: TestAppContext,
1432    ) {
1433        let lang_registry = Arc::new(LanguageRegistry::new());
1434        let fs = FakeFs::new(cx_a.background());
1435        cx_a.foreground().forbid_parking();
1436
1437        // Connect to a server as 3 clients.
1438        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1439        let client_a = server.create_client(&mut cx_a, "user_a").await;
1440        let client_b = server.create_client(&mut cx_b, "user_b").await;
1441        let client_c = server.create_client(&mut cx_c, "user_c").await;
1442
1443        // Share a worktree as client A.
1444        fs.insert_tree(
1445            "/a",
1446            json!({
1447                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1448                "file1": "",
1449                "file2": ""
1450            }),
1451        )
1452        .await;
1453        let project_a = cx_a.update(|cx| {
1454            Project::local(
1455                client_a.clone(),
1456                client_a.user_store.clone(),
1457                lang_registry.clone(),
1458                fs.clone(),
1459                cx,
1460            )
1461        });
1462        let (worktree_a, _) = project_a
1463            .update(&mut cx_a, |p, cx| {
1464                p.find_or_create_local_worktree("/a", false, cx)
1465            })
1466            .await
1467            .unwrap();
1468        worktree_a
1469            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1470            .await;
1471        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1472        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1473        project_a
1474            .update(&mut cx_a, |p, cx| p.share(cx))
1475            .await
1476            .unwrap();
1477
1478        // Join that worktree as clients B and C.
1479        let project_b = Project::remote(
1480            project_id,
1481            client_b.clone(),
1482            client_b.user_store.clone(),
1483            lang_registry.clone(),
1484            fs.clone(),
1485            &mut cx_b.to_async(),
1486        )
1487        .await
1488        .unwrap();
1489        let project_c = Project::remote(
1490            project_id,
1491            client_c.clone(),
1492            client_c.user_store.clone(),
1493            lang_registry.clone(),
1494            fs.clone(),
1495            &mut cx_c.to_async(),
1496        )
1497        .await
1498        .unwrap();
1499        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1500        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1501
1502        // Open and edit a buffer as both guests B and C.
1503        let buffer_b = project_b
1504            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1505            .await
1506            .unwrap();
1507        let buffer_c = project_c
1508            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1509            .await
1510            .unwrap();
1511        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1512        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1513
1514        // Open and edit that buffer as the host.
1515        let buffer_a = project_a
1516            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1517            .await
1518            .unwrap();
1519
1520        buffer_a
1521            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1522            .await;
1523        buffer_a.update(&mut cx_a, |buf, cx| {
1524            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1525        });
1526
1527        // Wait for edits to propagate
1528        buffer_a
1529            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1530            .await;
1531        buffer_b
1532            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1533            .await;
1534        buffer_c
1535            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1536            .await;
1537
1538        // Edit the buffer as the host and concurrently save as guest B.
1539        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1540        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1541        save_b.await.unwrap();
1542        assert_eq!(
1543            fs.load("/a/file1".as_ref()).await.unwrap(),
1544            "hi-a, i-am-c, i-am-b, i-am-a"
1545        );
1546        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1547        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1548        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1549
1550        // Make changes on host's file system, see those changes on guest worktrees.
1551        fs.rename(
1552            "/a/file1".as_ref(),
1553            "/a/file1-renamed".as_ref(),
1554            Default::default(),
1555        )
1556        .await
1557        .unwrap();
1558
1559        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1560            .await
1561            .unwrap();
1562        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1563
1564        worktree_a
1565            .condition(&cx_a, |tree, _| {
1566                tree.paths()
1567                    .map(|p| p.to_string_lossy())
1568                    .collect::<Vec<_>>()
1569                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1570            })
1571            .await;
1572        worktree_b
1573            .condition(&cx_b, |tree, _| {
1574                tree.paths()
1575                    .map(|p| p.to_string_lossy())
1576                    .collect::<Vec<_>>()
1577                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1578            })
1579            .await;
1580        worktree_c
1581            .condition(&cx_c, |tree, _| {
1582                tree.paths()
1583                    .map(|p| p.to_string_lossy())
1584                    .collect::<Vec<_>>()
1585                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1586            })
1587            .await;
1588
1589        // Ensure buffer files are updated as well.
1590        buffer_a
1591            .condition(&cx_a, |buf, _| {
1592                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1593            })
1594            .await;
1595        buffer_b
1596            .condition(&cx_b, |buf, _| {
1597                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1598            })
1599            .await;
1600        buffer_c
1601            .condition(&cx_c, |buf, _| {
1602                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1603            })
1604            .await;
1605    }
1606
1607    #[gpui::test(iterations = 10)]
1608    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1609        cx_a.foreground().forbid_parking();
1610        let lang_registry = Arc::new(LanguageRegistry::new());
1611        let fs = FakeFs::new(cx_a.background());
1612
1613        // Connect to a server as 2 clients.
1614        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1615        let client_a = server.create_client(&mut cx_a, "user_a").await;
1616        let client_b = server.create_client(&mut cx_b, "user_b").await;
1617
1618        // Share a project as client A
1619        fs.insert_tree(
1620            "/dir",
1621            json!({
1622                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1623                "a.txt": "a-contents",
1624            }),
1625        )
1626        .await;
1627
1628        let project_a = cx_a.update(|cx| {
1629            Project::local(
1630                client_a.clone(),
1631                client_a.user_store.clone(),
1632                lang_registry.clone(),
1633                fs.clone(),
1634                cx,
1635            )
1636        });
1637        let (worktree_a, _) = project_a
1638            .update(&mut cx_a, |p, cx| {
1639                p.find_or_create_local_worktree("/dir", false, cx)
1640            })
1641            .await
1642            .unwrap();
1643        worktree_a
1644            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1645            .await;
1646        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1647        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1648        project_a
1649            .update(&mut cx_a, |p, cx| p.share(cx))
1650            .await
1651            .unwrap();
1652
1653        // Join that project as client B
1654        let project_b = Project::remote(
1655            project_id,
1656            client_b.clone(),
1657            client_b.user_store.clone(),
1658            lang_registry.clone(),
1659            fs.clone(),
1660            &mut cx_b.to_async(),
1661        )
1662        .await
1663        .unwrap();
1664
1665        // Open a buffer as client B
1666        let buffer_b = project_b
1667            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1668            .await
1669            .unwrap();
1670
1671        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1672        buffer_b.read_with(&cx_b, |buf, _| {
1673            assert!(buf.is_dirty());
1674            assert!(!buf.has_conflict());
1675        });
1676
1677        buffer_b
1678            .update(&mut cx_b, |buf, cx| buf.save(cx))
1679            .await
1680            .unwrap();
1681        buffer_b
1682            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1683            .await;
1684        buffer_b.read_with(&cx_b, |buf, _| {
1685            assert!(!buf.has_conflict());
1686        });
1687
1688        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1689        buffer_b.read_with(&cx_b, |buf, _| {
1690            assert!(buf.is_dirty());
1691            assert!(!buf.has_conflict());
1692        });
1693    }
1694
1695    #[gpui::test(iterations = 10)]
1696    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1697        cx_a.foreground().forbid_parking();
1698        let lang_registry = Arc::new(LanguageRegistry::new());
1699        let fs = FakeFs::new(cx_a.background());
1700
1701        // Connect to a server as 2 clients.
1702        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1703        let client_a = server.create_client(&mut cx_a, "user_a").await;
1704        let client_b = server.create_client(&mut cx_b, "user_b").await;
1705
1706        // Share a project as client A
1707        fs.insert_tree(
1708            "/dir",
1709            json!({
1710                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1711                "a.txt": "a-contents",
1712            }),
1713        )
1714        .await;
1715
1716        let project_a = cx_a.update(|cx| {
1717            Project::local(
1718                client_a.clone(),
1719                client_a.user_store.clone(),
1720                lang_registry.clone(),
1721                fs.clone(),
1722                cx,
1723            )
1724        });
1725        let (worktree_a, _) = project_a
1726            .update(&mut cx_a, |p, cx| {
1727                p.find_or_create_local_worktree("/dir", false, cx)
1728            })
1729            .await
1730            .unwrap();
1731        worktree_a
1732            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1733            .await;
1734        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1735        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1736        project_a
1737            .update(&mut cx_a, |p, cx| p.share(cx))
1738            .await
1739            .unwrap();
1740
1741        // Join that project as client B
1742        let project_b = Project::remote(
1743            project_id,
1744            client_b.clone(),
1745            client_b.user_store.clone(),
1746            lang_registry.clone(),
1747            fs.clone(),
1748            &mut cx_b.to_async(),
1749        )
1750        .await
1751        .unwrap();
1752        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1753
1754        // Open a buffer as client B
1755        let buffer_b = project_b
1756            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1757            .await
1758            .unwrap();
1759        buffer_b.read_with(&cx_b, |buf, _| {
1760            assert!(!buf.is_dirty());
1761            assert!(!buf.has_conflict());
1762        });
1763
1764        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1765            .await
1766            .unwrap();
1767        buffer_b
1768            .condition(&cx_b, |buf, _| {
1769                buf.text() == "new contents" && !buf.is_dirty()
1770            })
1771            .await;
1772        buffer_b.read_with(&cx_b, |buf, _| {
1773            assert!(!buf.has_conflict());
1774        });
1775    }
1776
1777    #[gpui::test(iterations = 10)]
1778    async fn test_editing_while_guest_opens_buffer(
1779        mut cx_a: TestAppContext,
1780        mut cx_b: TestAppContext,
1781    ) {
1782        cx_a.foreground().forbid_parking();
1783        let lang_registry = Arc::new(LanguageRegistry::new());
1784        let fs = FakeFs::new(cx_a.background());
1785
1786        // Connect to a server as 2 clients.
1787        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1788        let client_a = server.create_client(&mut cx_a, "user_a").await;
1789        let client_b = server.create_client(&mut cx_b, "user_b").await;
1790
1791        // Share a project as client A
1792        fs.insert_tree(
1793            "/dir",
1794            json!({
1795                ".zed.toml": r#"collaborators = ["user_b"]"#,
1796                "a.txt": "a-contents",
1797            }),
1798        )
1799        .await;
1800        let project_a = cx_a.update(|cx| {
1801            Project::local(
1802                client_a.clone(),
1803                client_a.user_store.clone(),
1804                lang_registry.clone(),
1805                fs.clone(),
1806                cx,
1807            )
1808        });
1809        let (worktree_a, _) = project_a
1810            .update(&mut cx_a, |p, cx| {
1811                p.find_or_create_local_worktree("/dir", false, cx)
1812            })
1813            .await
1814            .unwrap();
1815        worktree_a
1816            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1817            .await;
1818        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1819        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1820        project_a
1821            .update(&mut cx_a, |p, cx| p.share(cx))
1822            .await
1823            .unwrap();
1824
1825        // Join that project as client B
1826        let project_b = Project::remote(
1827            project_id,
1828            client_b.clone(),
1829            client_b.user_store.clone(),
1830            lang_registry.clone(),
1831            fs.clone(),
1832            &mut cx_b.to_async(),
1833        )
1834        .await
1835        .unwrap();
1836
1837        // Open a buffer as client A
1838        let buffer_a = project_a
1839            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1840            .await
1841            .unwrap();
1842
1843        // Start opening the same buffer as client B
1844        let buffer_b = cx_b
1845            .background()
1846            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1847
1848        // Edit the buffer as client A while client B is still opening it.
1849        cx_b.background().simulate_random_delay().await;
1850        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1851        cx_b.background().simulate_random_delay().await;
1852        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1853
1854        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1855        let buffer_b = buffer_b.await.unwrap();
1856        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1857    }
1858
1859    #[gpui::test(iterations = 10)]
1860    async fn test_leaving_worktree_while_opening_buffer(
1861        mut cx_a: TestAppContext,
1862        mut cx_b: TestAppContext,
1863    ) {
1864        cx_a.foreground().forbid_parking();
1865        let lang_registry = Arc::new(LanguageRegistry::new());
1866        let fs = FakeFs::new(cx_a.background());
1867
1868        // Connect to a server as 2 clients.
1869        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1870        let client_a = server.create_client(&mut cx_a, "user_a").await;
1871        let client_b = server.create_client(&mut cx_b, "user_b").await;
1872
1873        // Share a project as client A
1874        fs.insert_tree(
1875            "/dir",
1876            json!({
1877                ".zed.toml": r#"collaborators = ["user_b"]"#,
1878                "a.txt": "a-contents",
1879            }),
1880        )
1881        .await;
1882        let project_a = cx_a.update(|cx| {
1883            Project::local(
1884                client_a.clone(),
1885                client_a.user_store.clone(),
1886                lang_registry.clone(),
1887                fs.clone(),
1888                cx,
1889            )
1890        });
1891        let (worktree_a, _) = project_a
1892            .update(&mut cx_a, |p, cx| {
1893                p.find_or_create_local_worktree("/dir", false, cx)
1894            })
1895            .await
1896            .unwrap();
1897        worktree_a
1898            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1899            .await;
1900        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1901        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1902        project_a
1903            .update(&mut cx_a, |p, cx| p.share(cx))
1904            .await
1905            .unwrap();
1906
1907        // Join that project as client B
1908        let project_b = Project::remote(
1909            project_id,
1910            client_b.clone(),
1911            client_b.user_store.clone(),
1912            lang_registry.clone(),
1913            fs.clone(),
1914            &mut cx_b.to_async(),
1915        )
1916        .await
1917        .unwrap();
1918
1919        // See that a guest has joined as client A.
1920        project_a
1921            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1922            .await;
1923
1924        // Begin opening a buffer as client B, but leave the project before the open completes.
1925        let buffer_b = cx_b
1926            .background()
1927            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1928        cx_b.update(|_| drop(project_b));
1929        drop(buffer_b);
1930
1931        // See that the guest has left.
1932        project_a
1933            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1934            .await;
1935    }
1936
1937    #[gpui::test(iterations = 10)]
1938    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1939        cx_a.foreground().forbid_parking();
1940        let lang_registry = Arc::new(LanguageRegistry::new());
1941        let fs = FakeFs::new(cx_a.background());
1942
1943        // Connect to a server as 2 clients.
1944        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1945        let client_a = server.create_client(&mut cx_a, "user_a").await;
1946        let client_b = server.create_client(&mut cx_b, "user_b").await;
1947
1948        // Share a project as client A
1949        fs.insert_tree(
1950            "/a",
1951            json!({
1952                ".zed.toml": r#"collaborators = ["user_b"]"#,
1953                "a.txt": "a-contents",
1954                "b.txt": "b-contents",
1955            }),
1956        )
1957        .await;
1958        let project_a = cx_a.update(|cx| {
1959            Project::local(
1960                client_a.clone(),
1961                client_a.user_store.clone(),
1962                lang_registry.clone(),
1963                fs.clone(),
1964                cx,
1965            )
1966        });
1967        let (worktree_a, _) = project_a
1968            .update(&mut cx_a, |p, cx| {
1969                p.find_or_create_local_worktree("/a", false, cx)
1970            })
1971            .await
1972            .unwrap();
1973        worktree_a
1974            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1975            .await;
1976        let project_id = project_a
1977            .update(&mut cx_a, |project, _| project.next_remote_id())
1978            .await;
1979        project_a
1980            .update(&mut cx_a, |project, cx| project.share(cx))
1981            .await
1982            .unwrap();
1983
1984        // Join that project as client B
1985        let _project_b = Project::remote(
1986            project_id,
1987            client_b.clone(),
1988            client_b.user_store.clone(),
1989            lang_registry.clone(),
1990            fs.clone(),
1991            &mut cx_b.to_async(),
1992        )
1993        .await
1994        .unwrap();
1995
1996        // See that a guest has joined as client A.
1997        project_a
1998            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1999            .await;
2000
2001        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2002        client_b.disconnect(&cx_b.to_async()).unwrap();
2003        project_a
2004            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2005            .await;
2006    }
2007
2008    #[gpui::test(iterations = 10)]
2009    async fn test_collaborating_with_diagnostics(
2010        mut cx_a: TestAppContext,
2011        mut cx_b: TestAppContext,
2012    ) {
2013        cx_a.foreground().forbid_parking();
2014        let mut lang_registry = Arc::new(LanguageRegistry::new());
2015        let fs = FakeFs::new(cx_a.background());
2016
2017        // Set up a fake language server.
2018        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2019        Arc::get_mut(&mut lang_registry)
2020            .unwrap()
2021            .add(Arc::new(Language::new(
2022                LanguageConfig {
2023                    name: "Rust".to_string(),
2024                    path_suffixes: vec!["rs".to_string()],
2025                    language_server: Some(language_server_config),
2026                    ..Default::default()
2027                },
2028                Some(tree_sitter_rust::language()),
2029            )));
2030
2031        // Connect to a server as 2 clients.
2032        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2033        let client_a = server.create_client(&mut cx_a, "user_a").await;
2034        let client_b = server.create_client(&mut cx_b, "user_b").await;
2035
2036        // Share a project as client A
2037        fs.insert_tree(
2038            "/a",
2039            json!({
2040                ".zed.toml": r#"collaborators = ["user_b"]"#,
2041                "a.rs": "let one = two",
2042                "other.rs": "",
2043            }),
2044        )
2045        .await;
2046        let project_a = cx_a.update(|cx| {
2047            Project::local(
2048                client_a.clone(),
2049                client_a.user_store.clone(),
2050                lang_registry.clone(),
2051                fs.clone(),
2052                cx,
2053            )
2054        });
2055        let (worktree_a, _) = project_a
2056            .update(&mut cx_a, |p, cx| {
2057                p.find_or_create_local_worktree("/a", false, cx)
2058            })
2059            .await
2060            .unwrap();
2061        worktree_a
2062            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2063            .await;
2064        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2065        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2066        project_a
2067            .update(&mut cx_a, |p, cx| p.share(cx))
2068            .await
2069            .unwrap();
2070
2071        // Cause the language server to start.
2072        let _ = cx_a
2073            .background()
2074            .spawn(project_a.update(&mut cx_a, |project, cx| {
2075                project.open_buffer(
2076                    ProjectPath {
2077                        worktree_id,
2078                        path: Path::new("other.rs").into(),
2079                    },
2080                    cx,
2081                )
2082            }))
2083            .await
2084            .unwrap();
2085
2086        // Simulate a language server reporting errors for a file.
2087        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2088        fake_language_server
2089            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2090                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2091                version: None,
2092                diagnostics: vec![lsp::Diagnostic {
2093                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2094                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2095                    message: "message 1".to_string(),
2096                    ..Default::default()
2097                }],
2098            })
2099            .await;
2100
2101        // Wait for server to see the diagnostics update.
2102        server
2103            .condition(|store| {
2104                let worktree = store
2105                    .project(project_id)
2106                    .unwrap()
2107                    .worktrees
2108                    .get(&worktree_id.to_proto())
2109                    .unwrap();
2110
2111                !worktree
2112                    .share
2113                    .as_ref()
2114                    .unwrap()
2115                    .diagnostic_summaries
2116                    .is_empty()
2117            })
2118            .await;
2119
2120        // Join the worktree as client B.
2121        let project_b = Project::remote(
2122            project_id,
2123            client_b.clone(),
2124            client_b.user_store.clone(),
2125            lang_registry.clone(),
2126            fs.clone(),
2127            &mut cx_b.to_async(),
2128        )
2129        .await
2130        .unwrap();
2131
2132        project_b.read_with(&cx_b, |project, cx| {
2133            assert_eq!(
2134                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2135                &[(
2136                    ProjectPath {
2137                        worktree_id,
2138                        path: Arc::from(Path::new("a.rs")),
2139                    },
2140                    DiagnosticSummary {
2141                        error_count: 1,
2142                        warning_count: 0,
2143                        ..Default::default()
2144                    },
2145                )]
2146            )
2147        });
2148
2149        // Simulate a language server reporting more errors for a file.
2150        fake_language_server
2151            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2152                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2153                version: None,
2154                diagnostics: vec![
2155                    lsp::Diagnostic {
2156                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2157                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2158                        message: "message 1".to_string(),
2159                        ..Default::default()
2160                    },
2161                    lsp::Diagnostic {
2162                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2163                        range: lsp::Range::new(
2164                            lsp::Position::new(0, 10),
2165                            lsp::Position::new(0, 13),
2166                        ),
2167                        message: "message 2".to_string(),
2168                        ..Default::default()
2169                    },
2170                ],
2171            })
2172            .await;
2173
2174        // Client b gets the updated summaries
2175        project_b
2176            .condition(&cx_b, |project, cx| {
2177                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2178                    == &[(
2179                        ProjectPath {
2180                            worktree_id,
2181                            path: Arc::from(Path::new("a.rs")),
2182                        },
2183                        DiagnosticSummary {
2184                            error_count: 1,
2185                            warning_count: 1,
2186                            ..Default::default()
2187                        },
2188                    )]
2189            })
2190            .await;
2191
2192        // Open the file with the errors on client B. They should be present.
2193        let buffer_b = cx_b
2194            .background()
2195            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2196            .await
2197            .unwrap();
2198
2199        buffer_b.read_with(&cx_b, |buffer, _| {
2200            assert_eq!(
2201                buffer
2202                    .snapshot()
2203                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2204                    .map(|entry| entry)
2205                    .collect::<Vec<_>>(),
2206                &[
2207                    DiagnosticEntry {
2208                        range: Point::new(0, 4)..Point::new(0, 7),
2209                        diagnostic: Diagnostic {
2210                            group_id: 0,
2211                            message: "message 1".to_string(),
2212                            severity: lsp::DiagnosticSeverity::ERROR,
2213                            is_primary: true,
2214                            ..Default::default()
2215                        }
2216                    },
2217                    DiagnosticEntry {
2218                        range: Point::new(0, 10)..Point::new(0, 13),
2219                        diagnostic: Diagnostic {
2220                            group_id: 1,
2221                            severity: lsp::DiagnosticSeverity::WARNING,
2222                            message: "message 2".to_string(),
2223                            is_primary: true,
2224                            ..Default::default()
2225                        }
2226                    }
2227                ]
2228            );
2229        });
2230    }
2231
2232    #[gpui::test(iterations = 10)]
2233    async fn test_collaborating_with_completion(
2234        mut cx_a: TestAppContext,
2235        mut cx_b: TestAppContext,
2236    ) {
2237        cx_a.foreground().forbid_parking();
2238        let mut lang_registry = Arc::new(LanguageRegistry::new());
2239        let fs = FakeFs::new(cx_a.background());
2240
2241        // Set up a fake language server.
2242        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2243        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2244            completion_provider: Some(lsp::CompletionOptions {
2245                trigger_characters: Some(vec![".".to_string()]),
2246                ..Default::default()
2247            }),
2248            ..Default::default()
2249        });
2250        Arc::get_mut(&mut lang_registry)
2251            .unwrap()
2252            .add(Arc::new(Language::new(
2253                LanguageConfig {
2254                    name: "Rust".to_string(),
2255                    path_suffixes: vec!["rs".to_string()],
2256                    language_server: Some(language_server_config),
2257                    ..Default::default()
2258                },
2259                Some(tree_sitter_rust::language()),
2260            )));
2261
2262        // Connect to a server as 2 clients.
2263        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2264        let client_a = server.create_client(&mut cx_a, "user_a").await;
2265        let client_b = server.create_client(&mut cx_b, "user_b").await;
2266
2267        // Share a project as client A
2268        fs.insert_tree(
2269            "/a",
2270            json!({
2271                ".zed.toml": r#"collaborators = ["user_b"]"#,
2272                "main.rs": "fn main() { a }",
2273                "other.rs": "",
2274            }),
2275        )
2276        .await;
2277        let project_a = cx_a.update(|cx| {
2278            Project::local(
2279                client_a.clone(),
2280                client_a.user_store.clone(),
2281                lang_registry.clone(),
2282                fs.clone(),
2283                cx,
2284            )
2285        });
2286        let (worktree_a, _) = project_a
2287            .update(&mut cx_a, |p, cx| {
2288                p.find_or_create_local_worktree("/a", false, cx)
2289            })
2290            .await
2291            .unwrap();
2292        worktree_a
2293            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2294            .await;
2295        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2296        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2297        project_a
2298            .update(&mut cx_a, |p, cx| p.share(cx))
2299            .await
2300            .unwrap();
2301
2302        // Join the worktree as client B.
2303        let project_b = Project::remote(
2304            project_id,
2305            client_b.clone(),
2306            client_b.user_store.clone(),
2307            lang_registry.clone(),
2308            fs.clone(),
2309            &mut cx_b.to_async(),
2310        )
2311        .await
2312        .unwrap();
2313
2314        // Open a file in an editor as the guest.
2315        let buffer_b = project_b
2316            .update(&mut cx_b, |p, cx| {
2317                p.open_buffer((worktree_id, "main.rs"), cx)
2318            })
2319            .await
2320            .unwrap();
2321        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2322        let editor_b = cx_b.add_view(window_b, |cx| {
2323            Editor::for_buffer(
2324                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2325                Arc::new(|cx| EditorSettings::test(cx)),
2326                Some(project_b.clone()),
2327                cx,
2328            )
2329        });
2330
2331        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2332        buffer_b
2333            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2334            .await;
2335
2336        // Type a completion trigger character as the guest.
2337        editor_b.update(&mut cx_b, |editor, cx| {
2338            editor.select_ranges([13..13], None, cx);
2339            editor.handle_input(&Input(".".into()), cx);
2340            cx.focus(&editor_b);
2341        });
2342
2343        // Receive a completion request as the host's language server.
2344        // Return some completions from the host's language server.
2345        cx_a.foreground().start_waiting();
2346        fake_language_server
2347            .handle_request::<lsp::request::Completion, _>(|params| {
2348                assert_eq!(
2349                    params.text_document_position.text_document.uri,
2350                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2351                );
2352                assert_eq!(
2353                    params.text_document_position.position,
2354                    lsp::Position::new(0, 14),
2355                );
2356
2357                Some(lsp::CompletionResponse::Array(vec![
2358                    lsp::CompletionItem {
2359                        label: "first_method(…)".into(),
2360                        detail: Some("fn(&mut self, B) -> C".into()),
2361                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2362                            new_text: "first_method($1)".to_string(),
2363                            range: lsp::Range::new(
2364                                lsp::Position::new(0, 14),
2365                                lsp::Position::new(0, 14),
2366                            ),
2367                        })),
2368                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2369                        ..Default::default()
2370                    },
2371                    lsp::CompletionItem {
2372                        label: "second_method(…)".into(),
2373                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2374                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2375                            new_text: "second_method()".to_string(),
2376                            range: lsp::Range::new(
2377                                lsp::Position::new(0, 14),
2378                                lsp::Position::new(0, 14),
2379                            ),
2380                        })),
2381                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2382                        ..Default::default()
2383                    },
2384                ]))
2385            })
2386            .next()
2387            .await
2388            .unwrap();
2389        cx_a.foreground().finish_waiting();
2390
2391        // Open the buffer on the host.
2392        let buffer_a = project_a
2393            .update(&mut cx_a, |p, cx| {
2394                p.open_buffer((worktree_id, "main.rs"), cx)
2395            })
2396            .await
2397            .unwrap();
2398        buffer_a
2399            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2400            .await;
2401
2402        // Confirm a completion on the guest.
2403        editor_b
2404            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2405            .await;
2406        editor_b.update(&mut cx_b, |editor, cx| {
2407            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2408            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2409        });
2410
2411        // Return a resolved completion from the host's language server.
2412        // The resolved completion has an additional text edit.
2413        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2414            assert_eq!(params.label, "first_method(…)");
2415            lsp::CompletionItem {
2416                label: "first_method(…)".into(),
2417                detail: Some("fn(&mut self, B) -> C".into()),
2418                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2419                    new_text: "first_method($1)".to_string(),
2420                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2421                })),
2422                additional_text_edits: Some(vec![lsp::TextEdit {
2423                    new_text: "use d::SomeTrait;\n".to_string(),
2424                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2425                }]),
2426                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2427                ..Default::default()
2428            }
2429        });
2430
2431        // The additional edit is applied.
2432        buffer_a
2433            .condition(&cx_a, |buffer, _| {
2434                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2435            })
2436            .await;
2437        buffer_b
2438            .condition(&cx_b, |buffer, _| {
2439                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2440            })
2441            .await;
2442    }
2443
2444    #[gpui::test(iterations = 10)]
2445    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2446        cx_a.foreground().forbid_parking();
2447        let mut lang_registry = Arc::new(LanguageRegistry::new());
2448        let fs = FakeFs::new(cx_a.background());
2449
2450        // Set up a fake language server.
2451        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2452        Arc::get_mut(&mut lang_registry)
2453            .unwrap()
2454            .add(Arc::new(Language::new(
2455                LanguageConfig {
2456                    name: "Rust".to_string(),
2457                    path_suffixes: vec!["rs".to_string()],
2458                    language_server: Some(language_server_config),
2459                    ..Default::default()
2460                },
2461                Some(tree_sitter_rust::language()),
2462            )));
2463
2464        // Connect to a server as 2 clients.
2465        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2466        let client_a = server.create_client(&mut cx_a, "user_a").await;
2467        let client_b = server.create_client(&mut cx_b, "user_b").await;
2468
2469        // Share a project as client A
2470        fs.insert_tree(
2471            "/a",
2472            json!({
2473                ".zed.toml": r#"collaborators = ["user_b"]"#,
2474                "a.rs": "let one = two",
2475            }),
2476        )
2477        .await;
2478        let project_a = cx_a.update(|cx| {
2479            Project::local(
2480                client_a.clone(),
2481                client_a.user_store.clone(),
2482                lang_registry.clone(),
2483                fs.clone(),
2484                cx,
2485            )
2486        });
2487        let (worktree_a, _) = project_a
2488            .update(&mut cx_a, |p, cx| {
2489                p.find_or_create_local_worktree("/a", false, cx)
2490            })
2491            .await
2492            .unwrap();
2493        worktree_a
2494            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2495            .await;
2496        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2497        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2498        project_a
2499            .update(&mut cx_a, |p, cx| p.share(cx))
2500            .await
2501            .unwrap();
2502
2503        // Join the worktree as client B.
2504        let project_b = Project::remote(
2505            project_id,
2506            client_b.clone(),
2507            client_b.user_store.clone(),
2508            lang_registry.clone(),
2509            fs.clone(),
2510            &mut cx_b.to_async(),
2511        )
2512        .await
2513        .unwrap();
2514
2515        let buffer_b = cx_b
2516            .background()
2517            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2518            .await
2519            .unwrap();
2520
2521        let format = project_b.update(&mut cx_b, |project, cx| {
2522            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2523        });
2524
2525        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2526        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2527            Some(vec![
2528                lsp::TextEdit {
2529                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2530                    new_text: "h".to_string(),
2531                },
2532                lsp::TextEdit {
2533                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2534                    new_text: "y".to_string(),
2535                },
2536            ])
2537        });
2538
2539        format.await.unwrap();
2540        assert_eq!(
2541            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2542            "let honey = two"
2543        );
2544    }
2545
2546    #[gpui::test(iterations = 10)]
2547    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2548        cx_a.foreground().forbid_parking();
2549        let mut lang_registry = Arc::new(LanguageRegistry::new());
2550        let fs = FakeFs::new(cx_a.background());
2551        fs.insert_tree(
2552            "/root-1",
2553            json!({
2554                ".zed.toml": r#"collaborators = ["user_b"]"#,
2555                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2556            }),
2557        )
2558        .await;
2559        fs.insert_tree(
2560            "/root-2",
2561            json!({
2562                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2563            }),
2564        )
2565        .await;
2566
2567        // Set up a fake language server.
2568        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2569        Arc::get_mut(&mut lang_registry)
2570            .unwrap()
2571            .add(Arc::new(Language::new(
2572                LanguageConfig {
2573                    name: "Rust".to_string(),
2574                    path_suffixes: vec!["rs".to_string()],
2575                    language_server: Some(language_server_config),
2576                    ..Default::default()
2577                },
2578                Some(tree_sitter_rust::language()),
2579            )));
2580
2581        // Connect to a server as 2 clients.
2582        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2583        let client_a = server.create_client(&mut cx_a, "user_a").await;
2584        let client_b = server.create_client(&mut cx_b, "user_b").await;
2585
2586        // Share a project as client A
2587        let project_a = cx_a.update(|cx| {
2588            Project::local(
2589                client_a.clone(),
2590                client_a.user_store.clone(),
2591                lang_registry.clone(),
2592                fs.clone(),
2593                cx,
2594            )
2595        });
2596        let (worktree_a, _) = project_a
2597            .update(&mut cx_a, |p, cx| {
2598                p.find_or_create_local_worktree("/root-1", false, cx)
2599            })
2600            .await
2601            .unwrap();
2602        worktree_a
2603            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2604            .await;
2605        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2606        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2607        project_a
2608            .update(&mut cx_a, |p, cx| p.share(cx))
2609            .await
2610            .unwrap();
2611
2612        // Join the worktree as client B.
2613        let project_b = Project::remote(
2614            project_id,
2615            client_b.clone(),
2616            client_b.user_store.clone(),
2617            lang_registry.clone(),
2618            fs.clone(),
2619            &mut cx_b.to_async(),
2620        )
2621        .await
2622        .unwrap();
2623
2624        // Open the file on client B.
2625        let buffer_b = cx_b
2626            .background()
2627            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2628            .await
2629            .unwrap();
2630
2631        // Request the definition of a symbol as the guest.
2632        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2633
2634        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2635        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2636            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2637                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2638                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2639            )))
2640        });
2641
2642        let definitions_1 = definitions_1.await.unwrap();
2643        cx_b.read(|cx| {
2644            assert_eq!(definitions_1.len(), 1);
2645            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2646            let target_buffer = definitions_1[0].target_buffer.read(cx);
2647            assert_eq!(
2648                target_buffer.text(),
2649                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2650            );
2651            assert_eq!(
2652                definitions_1[0].target_range.to_point(target_buffer),
2653                Point::new(0, 6)..Point::new(0, 9)
2654            );
2655        });
2656
2657        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2658        // the previous call to `definition`.
2659        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2660        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2661            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2662                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2663                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2664            )))
2665        });
2666
2667        let definitions_2 = definitions_2.await.unwrap();
2668        cx_b.read(|cx| {
2669            assert_eq!(definitions_2.len(), 1);
2670            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2671            let target_buffer = definitions_2[0].target_buffer.read(cx);
2672            assert_eq!(
2673                target_buffer.text(),
2674                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2675            );
2676            assert_eq!(
2677                definitions_2[0].target_range.to_point(target_buffer),
2678                Point::new(1, 6)..Point::new(1, 11)
2679            );
2680        });
2681        assert_eq!(
2682            definitions_1[0].target_buffer,
2683            definitions_2[0].target_buffer
2684        );
2685
2686        cx_b.update(|_| {
2687            drop(definitions_1);
2688            drop(definitions_2);
2689        });
2690        project_b
2691            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2692            .await;
2693    }
2694
2695    #[gpui::test(iterations = 10)]
2696    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2697        mut cx_a: TestAppContext,
2698        mut cx_b: TestAppContext,
2699        mut rng: StdRng,
2700    ) {
2701        cx_a.foreground().forbid_parking();
2702        let mut lang_registry = Arc::new(LanguageRegistry::new());
2703        let fs = FakeFs::new(cx_a.background());
2704        fs.insert_tree(
2705            "/root",
2706            json!({
2707                ".zed.toml": r#"collaborators = ["user_b"]"#,
2708                "a.rs": "const ONE: usize = b::TWO;",
2709                "b.rs": "const TWO: usize = 2",
2710            }),
2711        )
2712        .await;
2713
2714        // Set up a fake language server.
2715        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2716
2717        Arc::get_mut(&mut lang_registry)
2718            .unwrap()
2719            .add(Arc::new(Language::new(
2720                LanguageConfig {
2721                    name: "Rust".to_string(),
2722                    path_suffixes: vec!["rs".to_string()],
2723                    language_server: Some(language_server_config),
2724                    ..Default::default()
2725                },
2726                Some(tree_sitter_rust::language()),
2727            )));
2728
2729        // Connect to a server as 2 clients.
2730        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2731        let client_a = server.create_client(&mut cx_a, "user_a").await;
2732        let client_b = server.create_client(&mut cx_b, "user_b").await;
2733
2734        // Share a project as client A
2735        let project_a = cx_a.update(|cx| {
2736            Project::local(
2737                client_a.clone(),
2738                client_a.user_store.clone(),
2739                lang_registry.clone(),
2740                fs.clone(),
2741                cx,
2742            )
2743        });
2744
2745        let (worktree_a, _) = project_a
2746            .update(&mut cx_a, |p, cx| {
2747                p.find_or_create_local_worktree("/root", false, cx)
2748            })
2749            .await
2750            .unwrap();
2751        worktree_a
2752            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2753            .await;
2754        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2755        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2756        project_a
2757            .update(&mut cx_a, |p, cx| p.share(cx))
2758            .await
2759            .unwrap();
2760
2761        // Join the worktree as client B.
2762        let project_b = Project::remote(
2763            project_id,
2764            client_b.clone(),
2765            client_b.user_store.clone(),
2766            lang_registry.clone(),
2767            fs.clone(),
2768            &mut cx_b.to_async(),
2769        )
2770        .await
2771        .unwrap();
2772
2773        let buffer_b1 = cx_b
2774            .background()
2775            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2776            .await
2777            .unwrap();
2778
2779        let definitions;
2780        let buffer_b2;
2781        if rng.gen() {
2782            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2783            buffer_b2 =
2784                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2785        } else {
2786            buffer_b2 =
2787                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2788            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2789        }
2790
2791        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2792        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2793            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2794                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2795                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2796            )))
2797        });
2798
2799        let buffer_b2 = buffer_b2.await.unwrap();
2800        let definitions = definitions.await.unwrap();
2801        assert_eq!(definitions.len(), 1);
2802        assert_eq!(definitions[0].target_buffer, buffer_b2);
2803    }
2804
2805    #[gpui::test(iterations = 10)]
2806    async fn test_collaborating_with_code_actions(
2807        mut cx_a: TestAppContext,
2808        mut cx_b: TestAppContext,
2809    ) {
2810        cx_a.foreground().forbid_parking();
2811        let mut lang_registry = Arc::new(LanguageRegistry::new());
2812        let fs = FakeFs::new(cx_a.background());
2813        let mut path_openers_b = Vec::new();
2814        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2815
2816        // Set up a fake language server.
2817        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2818        Arc::get_mut(&mut lang_registry)
2819            .unwrap()
2820            .add(Arc::new(Language::new(
2821                LanguageConfig {
2822                    name: "Rust".to_string(),
2823                    path_suffixes: vec!["rs".to_string()],
2824                    language_server: Some(language_server_config),
2825                    ..Default::default()
2826                },
2827                Some(tree_sitter_rust::language()),
2828            )));
2829
2830        // Connect to a server as 2 clients.
2831        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2832        let client_a = server.create_client(&mut cx_a, "user_a").await;
2833        let client_b = server.create_client(&mut cx_b, "user_b").await;
2834
2835        // Share a project as client A
2836        fs.insert_tree(
2837            "/a",
2838            json!({
2839                ".zed.toml": r#"collaborators = ["user_b"]"#,
2840                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2841                "other.rs": "pub fn foo() -> usize { 4 }",
2842            }),
2843        )
2844        .await;
2845        let project_a = cx_a.update(|cx| {
2846            Project::local(
2847                client_a.clone(),
2848                client_a.user_store.clone(),
2849                lang_registry.clone(),
2850                fs.clone(),
2851                cx,
2852            )
2853        });
2854        let (worktree_a, _) = project_a
2855            .update(&mut cx_a, |p, cx| {
2856                p.find_or_create_local_worktree("/a", false, cx)
2857            })
2858            .await
2859            .unwrap();
2860        worktree_a
2861            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2862            .await;
2863        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2864        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2865        project_a
2866            .update(&mut cx_a, |p, cx| p.share(cx))
2867            .await
2868            .unwrap();
2869
2870        // Join the worktree as client B.
2871        let project_b = Project::remote(
2872            project_id,
2873            client_b.clone(),
2874            client_b.user_store.clone(),
2875            lang_registry.clone(),
2876            fs.clone(),
2877            &mut cx_b.to_async(),
2878        )
2879        .await
2880        .unwrap();
2881        let mut params = cx_b.update(WorkspaceParams::test);
2882        params.languages = lang_registry.clone();
2883        params.client = client_b.client.clone();
2884        params.user_store = client_b.user_store.clone();
2885        params.project = project_b;
2886        params.path_openers = path_openers_b.into();
2887
2888        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
2889        let editor_b = workspace_b
2890            .update(&mut cx_b, |workspace, cx| {
2891                workspace.open_path((worktree_id, "main.rs").into(), cx)
2892            })
2893            .await
2894            .unwrap()
2895            .downcast::<Editor>()
2896            .unwrap();
2897
2898        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2899        fake_language_server
2900            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2901                assert_eq!(
2902                    params.text_document.uri,
2903                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2904                );
2905                assert_eq!(params.range.start, lsp::Position::new(0, 0));
2906                assert_eq!(params.range.end, lsp::Position::new(0, 0));
2907                None
2908            })
2909            .next()
2910            .await;
2911
2912        // Move cursor to a location that contains code actions.
2913        editor_b.update(&mut cx_b, |editor, cx| {
2914            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2915            cx.focus(&editor_b);
2916        });
2917
2918        fake_language_server
2919            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2920                assert_eq!(
2921                    params.text_document.uri,
2922                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2923                );
2924                assert_eq!(params.range.start, lsp::Position::new(1, 31));
2925                assert_eq!(params.range.end, lsp::Position::new(1, 31));
2926
2927                Some(vec![lsp::CodeActionOrCommand::CodeAction(
2928                    lsp::CodeAction {
2929                        title: "Inline into all callers".to_string(),
2930                        edit: Some(lsp::WorkspaceEdit {
2931                            changes: Some(
2932                                [
2933                                    (
2934                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
2935                                        vec![lsp::TextEdit::new(
2936                                            lsp::Range::new(
2937                                                lsp::Position::new(1, 22),
2938                                                lsp::Position::new(1, 34),
2939                                            ),
2940                                            "4".to_string(),
2941                                        )],
2942                                    ),
2943                                    (
2944                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
2945                                        vec![lsp::TextEdit::new(
2946                                            lsp::Range::new(
2947                                                lsp::Position::new(0, 0),
2948                                                lsp::Position::new(0, 27),
2949                                            ),
2950                                            "".to_string(),
2951                                        )],
2952                                    ),
2953                                ]
2954                                .into_iter()
2955                                .collect(),
2956                            ),
2957                            ..Default::default()
2958                        }),
2959                        data: Some(json!({
2960                            "codeActionParams": {
2961                                "range": {
2962                                    "start": {"line": 1, "column": 31},
2963                                    "end": {"line": 1, "column": 31},
2964                                }
2965                            }
2966                        })),
2967                        ..Default::default()
2968                    },
2969                )])
2970            })
2971            .next()
2972            .await;
2973
2974        // Toggle code actions and wait for them to display.
2975        editor_b.update(&mut cx_b, |editor, cx| {
2976            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2977        });
2978        editor_b
2979            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2980            .await;
2981
2982        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
2983
2984        // Confirming the code action will trigger a resolve request.
2985        let confirm_action = workspace_b
2986            .update(&mut cx_b, |workspace, cx| {
2987                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2988            })
2989            .unwrap();
2990        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2991            lsp::CodeAction {
2992                title: "Inline into all callers".to_string(),
2993                edit: Some(lsp::WorkspaceEdit {
2994                    changes: Some(
2995                        [
2996                            (
2997                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2998                                vec![lsp::TextEdit::new(
2999                                    lsp::Range::new(
3000                                        lsp::Position::new(1, 22),
3001                                        lsp::Position::new(1, 34),
3002                                    ),
3003                                    "4".to_string(),
3004                                )],
3005                            ),
3006                            (
3007                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3008                                vec![lsp::TextEdit::new(
3009                                    lsp::Range::new(
3010                                        lsp::Position::new(0, 0),
3011                                        lsp::Position::new(0, 27),
3012                                    ),
3013                                    "".to_string(),
3014                                )],
3015                            ),
3016                        ]
3017                        .into_iter()
3018                        .collect(),
3019                    ),
3020                    ..Default::default()
3021                }),
3022                ..Default::default()
3023            }
3024        });
3025
3026        // After the action is confirmed, an editor containing both modified files is opened.
3027        confirm_action.await.unwrap();
3028        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3029            workspace
3030                .active_item(cx)
3031                .unwrap()
3032                .downcast::<Editor>()
3033                .unwrap()
3034        });
3035        code_action_editor.update(&mut cx_b, |editor, cx| {
3036            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3037            editor.undo(&Undo, cx);
3038            assert_eq!(
3039                editor.text(cx),
3040                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3041            );
3042            editor.redo(&Redo, cx);
3043            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3044        });
3045    }
3046
3047    #[gpui::test(iterations = 10)]
3048    async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3049        cx_a.foreground().forbid_parking();
3050        let mut lang_registry = Arc::new(LanguageRegistry::new());
3051        let fs = FakeFs::new(cx_a.background());
3052        let mut path_openers_b = Vec::new();
3053        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3054
3055        // Set up a fake language server.
3056        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3057        Arc::get_mut(&mut lang_registry)
3058            .unwrap()
3059            .add(Arc::new(Language::new(
3060                LanguageConfig {
3061                    name: "Rust".to_string(),
3062                    path_suffixes: vec!["rs".to_string()],
3063                    language_server: Some(language_server_config),
3064                    ..Default::default()
3065                },
3066                Some(tree_sitter_rust::language()),
3067            )));
3068
3069        // Connect to a server as 2 clients.
3070        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3071        let client_a = server.create_client(&mut cx_a, "user_a").await;
3072        let client_b = server.create_client(&mut cx_b, "user_b").await;
3073
3074        // Share a project as client A
3075        fs.insert_tree(
3076            "/dir",
3077            json!({
3078                ".zed.toml": r#"collaborators = ["user_b"]"#,
3079                "one.rs": "const ONE: usize = 1;",
3080                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3081            }),
3082        )
3083        .await;
3084        let project_a = cx_a.update(|cx| {
3085            Project::local(
3086                client_a.clone(),
3087                client_a.user_store.clone(),
3088                lang_registry.clone(),
3089                fs.clone(),
3090                cx,
3091            )
3092        });
3093        let (worktree_a, _) = project_a
3094            .update(&mut cx_a, |p, cx| {
3095                p.find_or_create_local_worktree("/dir", false, cx)
3096            })
3097            .await
3098            .unwrap();
3099        worktree_a
3100            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3101            .await;
3102        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3103        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3104        project_a
3105            .update(&mut cx_a, |p, cx| p.share(cx))
3106            .await
3107            .unwrap();
3108
3109        // Join the worktree as client B.
3110        let project_b = Project::remote(
3111            project_id,
3112            client_b.clone(),
3113            client_b.user_store.clone(),
3114            lang_registry.clone(),
3115            fs.clone(),
3116            &mut cx_b.to_async(),
3117        )
3118        .await
3119        .unwrap();
3120        let mut params = cx_b.update(WorkspaceParams::test);
3121        params.languages = lang_registry.clone();
3122        params.client = client_b.client.clone();
3123        params.user_store = client_b.user_store.clone();
3124        params.project = project_b;
3125        params.path_openers = path_openers_b.into();
3126
3127        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3128        let editor_b = workspace_b
3129            .update(&mut cx_b, |workspace, cx| {
3130                workspace.open_path((worktree_id, "one.rs").into(), cx)
3131            })
3132            .await
3133            .unwrap()
3134            .downcast::<Editor>()
3135            .unwrap();
3136        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3137
3138        // Move cursor to a location that can be renamed.
3139        let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3140            editor.select_ranges([7..7], None, cx);
3141            editor.rename(&Rename, cx).unwrap()
3142        });
3143
3144        fake_language_server
3145            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
3146                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3147                assert_eq!(params.position, lsp::Position::new(0, 7));
3148                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3149                    lsp::Position::new(0, 6),
3150                    lsp::Position::new(0, 9),
3151                )))
3152            })
3153            .next()
3154            .await
3155            .unwrap();
3156        prepare_rename.await.unwrap();
3157        editor_b.update(&mut cx_b, |editor, cx| {
3158            let rename = editor.pending_rename().unwrap();
3159            let buffer = editor.buffer().read(cx).snapshot(cx);
3160            assert_eq!(
3161                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3162                6..9
3163            );
3164            rename.editor.update(cx, |rename_editor, cx| {
3165                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3166                    rename_buffer.edit([0..3], "THREE", cx);
3167                });
3168            });
3169        });
3170
3171        let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3172            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3173        });
3174        fake_language_server
3175            .handle_request::<lsp::request::Rename, _>(|params| {
3176                assert_eq!(
3177                    params.text_document_position.text_document.uri.as_str(),
3178                    "file:///dir/one.rs"
3179                );
3180                assert_eq!(
3181                    params.text_document_position.position,
3182                    lsp::Position::new(0, 6)
3183                );
3184                assert_eq!(params.new_name, "THREE");
3185                Some(lsp::WorkspaceEdit {
3186                    changes: Some(
3187                        [
3188                            (
3189                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3190                                vec![lsp::TextEdit::new(
3191                                    lsp::Range::new(
3192                                        lsp::Position::new(0, 6),
3193                                        lsp::Position::new(0, 9),
3194                                    ),
3195                                    "THREE".to_string(),
3196                                )],
3197                            ),
3198                            (
3199                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3200                                vec![
3201                                    lsp::TextEdit::new(
3202                                        lsp::Range::new(
3203                                            lsp::Position::new(0, 24),
3204                                            lsp::Position::new(0, 27),
3205                                        ),
3206                                        "THREE".to_string(),
3207                                    ),
3208                                    lsp::TextEdit::new(
3209                                        lsp::Range::new(
3210                                            lsp::Position::new(0, 35),
3211                                            lsp::Position::new(0, 38),
3212                                        ),
3213                                        "THREE".to_string(),
3214                                    ),
3215                                ],
3216                            ),
3217                        ]
3218                        .into_iter()
3219                        .collect(),
3220                    ),
3221                    ..Default::default()
3222                })
3223            })
3224            .next()
3225            .await
3226            .unwrap();
3227        confirm_rename.await.unwrap();
3228
3229        let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3230            workspace
3231                .active_item(cx)
3232                .unwrap()
3233                .downcast::<Editor>()
3234                .unwrap()
3235        });
3236        rename_editor.update(&mut cx_b, |editor, cx| {
3237            assert_eq!(
3238                editor.text(cx),
3239                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3240            );
3241            editor.undo(&Undo, cx);
3242            assert_eq!(
3243                editor.text(cx),
3244                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3245            );
3246            editor.redo(&Redo, cx);
3247            assert_eq!(
3248                editor.text(cx),
3249                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3250            );
3251        });
3252
3253        // Ensure temporary rename edits cannot be undone/redone.
3254        editor_b.update(&mut cx_b, |editor, cx| {
3255            editor.undo(&Undo, cx);
3256            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3257            editor.undo(&Undo, cx);
3258            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3259            editor.redo(&Redo, cx);
3260            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3261        })
3262    }
3263
3264    #[gpui::test(iterations = 10)]
3265    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3266        cx_a.foreground().forbid_parking();
3267
3268        // Connect to a server as 2 clients.
3269        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3270        let client_a = server.create_client(&mut cx_a, "user_a").await;
3271        let client_b = server.create_client(&mut cx_b, "user_b").await;
3272
3273        // Create an org that includes these 2 users.
3274        let db = &server.app_state.db;
3275        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3276        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3277            .await
3278            .unwrap();
3279        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3280            .await
3281            .unwrap();
3282
3283        // Create a channel that includes all the users.
3284        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3285        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3286            .await
3287            .unwrap();
3288        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3289            .await
3290            .unwrap();
3291        db.create_channel_message(
3292            channel_id,
3293            client_b.current_user_id(&cx_b),
3294            "hello A, it's B.",
3295            OffsetDateTime::now_utc(),
3296            1,
3297        )
3298        .await
3299        .unwrap();
3300
3301        let channels_a = cx_a
3302            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3303        channels_a
3304            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3305            .await;
3306        channels_a.read_with(&cx_a, |list, _| {
3307            assert_eq!(
3308                list.available_channels().unwrap(),
3309                &[ChannelDetails {
3310                    id: channel_id.to_proto(),
3311                    name: "test-channel".to_string()
3312                }]
3313            )
3314        });
3315        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3316            this.get_channel(channel_id.to_proto(), cx).unwrap()
3317        });
3318        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3319        channel_a
3320            .condition(&cx_a, |channel, _| {
3321                channel_messages(channel)
3322                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3323            })
3324            .await;
3325
3326        let channels_b = cx_b
3327            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3328        channels_b
3329            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3330            .await;
3331        channels_b.read_with(&cx_b, |list, _| {
3332            assert_eq!(
3333                list.available_channels().unwrap(),
3334                &[ChannelDetails {
3335                    id: channel_id.to_proto(),
3336                    name: "test-channel".to_string()
3337                }]
3338            )
3339        });
3340
3341        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3342            this.get_channel(channel_id.to_proto(), cx).unwrap()
3343        });
3344        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3345        channel_b
3346            .condition(&cx_b, |channel, _| {
3347                channel_messages(channel)
3348                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3349            })
3350            .await;
3351
3352        channel_a
3353            .update(&mut cx_a, |channel, cx| {
3354                channel
3355                    .send_message("oh, hi B.".to_string(), cx)
3356                    .unwrap()
3357                    .detach();
3358                let task = channel.send_message("sup".to_string(), cx).unwrap();
3359                assert_eq!(
3360                    channel_messages(channel),
3361                    &[
3362                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3363                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3364                        ("user_a".to_string(), "sup".to_string(), true)
3365                    ]
3366                );
3367                task
3368            })
3369            .await
3370            .unwrap();
3371
3372        channel_b
3373            .condition(&cx_b, |channel, _| {
3374                channel_messages(channel)
3375                    == [
3376                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3377                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3378                        ("user_a".to_string(), "sup".to_string(), false),
3379                    ]
3380            })
3381            .await;
3382
3383        assert_eq!(
3384            server
3385                .state()
3386                .await
3387                .channel(channel_id)
3388                .unwrap()
3389                .connection_ids
3390                .len(),
3391            2
3392        );
3393        cx_b.update(|_| drop(channel_b));
3394        server
3395            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3396            .await;
3397
3398        cx_a.update(|_| drop(channel_a));
3399        server
3400            .condition(|state| state.channel(channel_id).is_none())
3401            .await;
3402    }
3403
3404    #[gpui::test(iterations = 10)]
3405    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3406        cx_a.foreground().forbid_parking();
3407
3408        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3409        let client_a = server.create_client(&mut cx_a, "user_a").await;
3410
3411        let db = &server.app_state.db;
3412        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3413        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3414        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3415            .await
3416            .unwrap();
3417        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3418            .await
3419            .unwrap();
3420
3421        let channels_a = cx_a
3422            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3423        channels_a
3424            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3425            .await;
3426        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3427            this.get_channel(channel_id.to_proto(), cx).unwrap()
3428        });
3429
3430        // Messages aren't allowed to be too long.
3431        channel_a
3432            .update(&mut cx_a, |channel, cx| {
3433                let long_body = "this is long.\n".repeat(1024);
3434                channel.send_message(long_body, cx).unwrap()
3435            })
3436            .await
3437            .unwrap_err();
3438
3439        // Messages aren't allowed to be blank.
3440        channel_a.update(&mut cx_a, |channel, cx| {
3441            channel.send_message(String::new(), cx).unwrap_err()
3442        });
3443
3444        // Leading and trailing whitespace are trimmed.
3445        channel_a
3446            .update(&mut cx_a, |channel, cx| {
3447                channel
3448                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3449                    .unwrap()
3450            })
3451            .await
3452            .unwrap();
3453        assert_eq!(
3454            db.get_channel_messages(channel_id, 10, None)
3455                .await
3456                .unwrap()
3457                .iter()
3458                .map(|m| &m.body)
3459                .collect::<Vec<_>>(),
3460            &["surrounded by whitespace"]
3461        );
3462    }
3463
3464    #[gpui::test(iterations = 10)]
3465    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3466        cx_a.foreground().forbid_parking();
3467
3468        // Connect to a server as 2 clients.
3469        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3470        let client_a = server.create_client(&mut cx_a, "user_a").await;
3471        let client_b = server.create_client(&mut cx_b, "user_b").await;
3472        let mut status_b = client_b.status();
3473
3474        // Create an org that includes these 2 users.
3475        let db = &server.app_state.db;
3476        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3477        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3478            .await
3479            .unwrap();
3480        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3481            .await
3482            .unwrap();
3483
3484        // Create a channel that includes all the users.
3485        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3486        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3487            .await
3488            .unwrap();
3489        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3490            .await
3491            .unwrap();
3492        db.create_channel_message(
3493            channel_id,
3494            client_b.current_user_id(&cx_b),
3495            "hello A, it's B.",
3496            OffsetDateTime::now_utc(),
3497            2,
3498        )
3499        .await
3500        .unwrap();
3501
3502        let channels_a = cx_a
3503            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3504        channels_a
3505            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3506            .await;
3507
3508        channels_a.read_with(&cx_a, |list, _| {
3509            assert_eq!(
3510                list.available_channels().unwrap(),
3511                &[ChannelDetails {
3512                    id: channel_id.to_proto(),
3513                    name: "test-channel".to_string()
3514                }]
3515            )
3516        });
3517        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3518            this.get_channel(channel_id.to_proto(), cx).unwrap()
3519        });
3520        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3521        channel_a
3522            .condition(&cx_a, |channel, _| {
3523                channel_messages(channel)
3524                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3525            })
3526            .await;
3527
3528        let channels_b = cx_b
3529            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3530        channels_b
3531            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3532            .await;
3533        channels_b.read_with(&cx_b, |list, _| {
3534            assert_eq!(
3535                list.available_channels().unwrap(),
3536                &[ChannelDetails {
3537                    id: channel_id.to_proto(),
3538                    name: "test-channel".to_string()
3539                }]
3540            )
3541        });
3542
3543        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3544            this.get_channel(channel_id.to_proto(), cx).unwrap()
3545        });
3546        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3547        channel_b
3548            .condition(&cx_b, |channel, _| {
3549                channel_messages(channel)
3550                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3551            })
3552            .await;
3553
3554        // Disconnect client B, ensuring we can still access its cached channel data.
3555        server.forbid_connections();
3556        server.disconnect_client(client_b.current_user_id(&cx_b));
3557        while !matches!(
3558            status_b.next().await,
3559            Some(client::Status::ReconnectionError { .. })
3560        ) {}
3561
3562        channels_b.read_with(&cx_b, |channels, _| {
3563            assert_eq!(
3564                channels.available_channels().unwrap(),
3565                [ChannelDetails {
3566                    id: channel_id.to_proto(),
3567                    name: "test-channel".to_string()
3568                }]
3569            )
3570        });
3571        channel_b.read_with(&cx_b, |channel, _| {
3572            assert_eq!(
3573                channel_messages(channel),
3574                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3575            )
3576        });
3577
3578        // Send a message from client B while it is disconnected.
3579        channel_b
3580            .update(&mut cx_b, |channel, cx| {
3581                let task = channel
3582                    .send_message("can you see this?".to_string(), cx)
3583                    .unwrap();
3584                assert_eq!(
3585                    channel_messages(channel),
3586                    &[
3587                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3588                        ("user_b".to_string(), "can you see this?".to_string(), true)
3589                    ]
3590                );
3591                task
3592            })
3593            .await
3594            .unwrap_err();
3595
3596        // Send a message from client A while B is disconnected.
3597        channel_a
3598            .update(&mut cx_a, |channel, cx| {
3599                channel
3600                    .send_message("oh, hi B.".to_string(), cx)
3601                    .unwrap()
3602                    .detach();
3603                let task = channel.send_message("sup".to_string(), cx).unwrap();
3604                assert_eq!(
3605                    channel_messages(channel),
3606                    &[
3607                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3608                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3609                        ("user_a".to_string(), "sup".to_string(), true)
3610                    ]
3611                );
3612                task
3613            })
3614            .await
3615            .unwrap();
3616
3617        // Give client B a chance to reconnect.
3618        server.allow_connections();
3619        cx_b.foreground().advance_clock(Duration::from_secs(10));
3620
3621        // Verify that B sees the new messages upon reconnection, as well as the message client B
3622        // sent while offline.
3623        channel_b
3624            .condition(&cx_b, |channel, _| {
3625                channel_messages(channel)
3626                    == [
3627                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3628                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3629                        ("user_a".to_string(), "sup".to_string(), false),
3630                        ("user_b".to_string(), "can you see this?".to_string(), false),
3631                    ]
3632            })
3633            .await;
3634
3635        // Ensure client A and B can communicate normally after reconnection.
3636        channel_a
3637            .update(&mut cx_a, |channel, cx| {
3638                channel.send_message("you online?".to_string(), cx).unwrap()
3639            })
3640            .await
3641            .unwrap();
3642        channel_b
3643            .condition(&cx_b, |channel, _| {
3644                channel_messages(channel)
3645                    == [
3646                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3647                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3648                        ("user_a".to_string(), "sup".to_string(), false),
3649                        ("user_b".to_string(), "can you see this?".to_string(), false),
3650                        ("user_a".to_string(), "you online?".to_string(), false),
3651                    ]
3652            })
3653            .await;
3654
3655        channel_b
3656            .update(&mut cx_b, |channel, cx| {
3657                channel.send_message("yep".to_string(), cx).unwrap()
3658            })
3659            .await
3660            .unwrap();
3661        channel_a
3662            .condition(&cx_a, |channel, _| {
3663                channel_messages(channel)
3664                    == [
3665                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3666                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3667                        ("user_a".to_string(), "sup".to_string(), false),
3668                        ("user_b".to_string(), "can you see this?".to_string(), false),
3669                        ("user_a".to_string(), "you online?".to_string(), false),
3670                        ("user_b".to_string(), "yep".to_string(), false),
3671                    ]
3672            })
3673            .await;
3674    }
3675
3676    #[gpui::test(iterations = 10)]
3677    async fn test_contacts(
3678        mut cx_a: TestAppContext,
3679        mut cx_b: TestAppContext,
3680        mut cx_c: TestAppContext,
3681    ) {
3682        cx_a.foreground().forbid_parking();
3683        let lang_registry = Arc::new(LanguageRegistry::new());
3684        let fs = FakeFs::new(cx_a.background());
3685
3686        // Connect to a server as 3 clients.
3687        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3688        let client_a = server.create_client(&mut cx_a, "user_a").await;
3689        let client_b = server.create_client(&mut cx_b, "user_b").await;
3690        let client_c = server.create_client(&mut cx_c, "user_c").await;
3691
3692        // Share a worktree as client A.
3693        fs.insert_tree(
3694            "/a",
3695            json!({
3696                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3697            }),
3698        )
3699        .await;
3700
3701        let project_a = cx_a.update(|cx| {
3702            Project::local(
3703                client_a.clone(),
3704                client_a.user_store.clone(),
3705                lang_registry.clone(),
3706                fs.clone(),
3707                cx,
3708            )
3709        });
3710        let (worktree_a, _) = project_a
3711            .update(&mut cx_a, |p, cx| {
3712                p.find_or_create_local_worktree("/a", false, cx)
3713            })
3714            .await
3715            .unwrap();
3716        worktree_a
3717            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3718            .await;
3719
3720        client_a
3721            .user_store
3722            .condition(&cx_a, |user_store, _| {
3723                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3724            })
3725            .await;
3726        client_b
3727            .user_store
3728            .condition(&cx_b, |user_store, _| {
3729                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3730            })
3731            .await;
3732        client_c
3733            .user_store
3734            .condition(&cx_c, |user_store, _| {
3735                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3736            })
3737            .await;
3738
3739        let project_id = project_a
3740            .update(&mut cx_a, |project, _| project.next_remote_id())
3741            .await;
3742        project_a
3743            .update(&mut cx_a, |project, cx| project.share(cx))
3744            .await
3745            .unwrap();
3746
3747        let _project_b = Project::remote(
3748            project_id,
3749            client_b.clone(),
3750            client_b.user_store.clone(),
3751            lang_registry.clone(),
3752            fs.clone(),
3753            &mut cx_b.to_async(),
3754        )
3755        .await
3756        .unwrap();
3757
3758        client_a
3759            .user_store
3760            .condition(&cx_a, |user_store, _| {
3761                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3762            })
3763            .await;
3764        client_b
3765            .user_store
3766            .condition(&cx_b, |user_store, _| {
3767                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3768            })
3769            .await;
3770        client_c
3771            .user_store
3772            .condition(&cx_c, |user_store, _| {
3773                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3774            })
3775            .await;
3776
3777        project_a
3778            .condition(&cx_a, |project, _| {
3779                project.collaborators().contains_key(&client_b.peer_id)
3780            })
3781            .await;
3782
3783        cx_a.update(move |_| drop(project_a));
3784        client_a
3785            .user_store
3786            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3787            .await;
3788        client_b
3789            .user_store
3790            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3791            .await;
3792        client_c
3793            .user_store
3794            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3795            .await;
3796
3797        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3798            user_store
3799                .contacts()
3800                .iter()
3801                .map(|contact| {
3802                    let worktrees = contact
3803                        .projects
3804                        .iter()
3805                        .map(|p| {
3806                            (
3807                                p.worktree_root_names[0].as_str(),
3808                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3809                            )
3810                        })
3811                        .collect();
3812                    (contact.user.github_login.as_str(), worktrees)
3813                })
3814                .collect()
3815        }
3816    }
3817
3818    #[gpui::test(iterations = 100)]
3819    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
3820        cx.foreground().forbid_parking();
3821        let max_peers = env::var("MAX_PEERS")
3822            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3823            .unwrap_or(5);
3824        let max_operations = env::var("OPERATIONS")
3825            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3826            .unwrap_or(10);
3827
3828        let rng = Rc::new(RefCell::new(rng));
3829
3830        let mut host_lang_registry = Arc::new(LanguageRegistry::new());
3831        let guest_lang_registry = Arc::new(LanguageRegistry::new());
3832
3833        // Set up a fake language server.
3834        let (mut language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
3835        language_server_config.set_fake_initializer(|fake_server| {
3836            fake_server.handle_request::<lsp::request::Completion, _>(|_| {
3837                Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
3838                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3839                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3840                        new_text: "the-new-text".to_string(),
3841                    })),
3842                    ..Default::default()
3843                }]))
3844            });
3845
3846            fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_| {
3847                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3848                    lsp::CodeAction {
3849                        title: "the-code-action".to_string(),
3850                        ..Default::default()
3851                    },
3852                )])
3853            });
3854
3855            fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
3856                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3857                    params.position,
3858                    params.position,
3859                )))
3860            });
3861        });
3862
3863        Arc::get_mut(&mut host_lang_registry)
3864            .unwrap()
3865            .add(Arc::new(Language::new(
3866                LanguageConfig {
3867                    name: "Rust".to_string(),
3868                    path_suffixes: vec!["rs".to_string()],
3869                    language_server: Some(language_server_config),
3870                    ..Default::default()
3871                },
3872                None,
3873            )));
3874
3875        let fs = FakeFs::new(cx.background());
3876        fs.insert_tree(
3877            "/_collab",
3878            json!({
3879                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3880            }),
3881        )
3882        .await;
3883
3884        let operations = Rc::new(Cell::new(0));
3885        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
3886        let mut clients = Vec::new();
3887
3888        let mut next_entity_id = 100000;
3889        let mut host_cx = TestAppContext::new(
3890            cx.foreground_platform(),
3891            cx.platform(),
3892            cx.foreground(),
3893            cx.background(),
3894            cx.font_cache(),
3895            next_entity_id,
3896        );
3897        let host = server.create_client(&mut host_cx, "host").await;
3898        let host_project = host_cx.update(|cx| {
3899            Project::local(
3900                host.client.clone(),
3901                host.user_store.clone(),
3902                host_lang_registry.clone(),
3903                fs.clone(),
3904                cx,
3905            )
3906        });
3907        let host_project_id = host_project
3908            .update(&mut host_cx, |p, _| p.next_remote_id())
3909            .await;
3910
3911        let (collab_worktree, _) = host_project
3912            .update(&mut host_cx, |project, cx| {
3913                project.find_or_create_local_worktree("/_collab", false, cx)
3914            })
3915            .await
3916            .unwrap();
3917        collab_worktree
3918            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
3919            .await;
3920        host_project
3921            .update(&mut host_cx, |project, cx| project.share(cx))
3922            .await
3923            .unwrap();
3924
3925        clients.push(cx.foreground().spawn(host.simulate_host(
3926            host_project.clone(),
3927            operations.clone(),
3928            max_operations,
3929            rng.clone(),
3930            host_cx.clone(),
3931        )));
3932
3933        while operations.get() < max_operations {
3934            cx.background().simulate_random_delay().await;
3935            if clients.len() < max_peers && rng.borrow_mut().gen_bool(0.05) {
3936                operations.set(operations.get() + 1);
3937
3938                let guest_id = clients.len();
3939                log::info!("Adding guest {}", guest_id);
3940                next_entity_id += 100000;
3941                let mut guest_cx = TestAppContext::new(
3942                    cx.foreground_platform(),
3943                    cx.platform(),
3944                    cx.foreground(),
3945                    cx.background(),
3946                    cx.font_cache(),
3947                    next_entity_id,
3948                );
3949                let guest = server
3950                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
3951                    .await;
3952                let guest_project = Project::remote(
3953                    host_project_id,
3954                    guest.client.clone(),
3955                    guest.user_store.clone(),
3956                    guest_lang_registry.clone(),
3957                    fs.clone(),
3958                    &mut guest_cx.to_async(),
3959                )
3960                .await
3961                .unwrap();
3962                clients.push(cx.foreground().spawn(guest.simulate_guest(
3963                    guest_id,
3964                    guest_project,
3965                    operations.clone(),
3966                    max_operations,
3967                    rng.clone(),
3968                    guest_cx,
3969                )));
3970
3971                log::info!("Guest {} added", guest_id);
3972            }
3973        }
3974
3975        let clients = futures::future::join_all(clients).await;
3976        cx.foreground().run_until_parked();
3977
3978        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
3979            project
3980                .worktrees(cx)
3981                .map(|worktree| {
3982                    let snapshot = worktree.read(cx).snapshot();
3983                    (snapshot.id(), snapshot)
3984                })
3985                .collect::<BTreeMap<_, _>>()
3986        });
3987
3988        for (guest_client, guest_cx) in clients.iter().skip(1) {
3989            let guest_id = guest_client.client.id();
3990            let worktree_snapshots =
3991                guest_client
3992                    .project
3993                    .as_ref()
3994                    .unwrap()
3995                    .read_with(guest_cx, |project, cx| {
3996                        project
3997                            .worktrees(cx)
3998                            .map(|worktree| {
3999                                let worktree = worktree.read(cx);
4000                                assert!(
4001                                    !worktree.as_remote().unwrap().has_pending_updates(),
4002                                    "Guest {} worktree {:?} contains deferred updates",
4003                                    guest_id,
4004                                    worktree.id()
4005                                );
4006                                (worktree.id(), worktree.snapshot())
4007                            })
4008                            .collect::<BTreeMap<_, _>>()
4009                    });
4010
4011            assert_eq!(
4012                worktree_snapshots.keys().collect::<Vec<_>>(),
4013                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4014                "guest {} has different worktrees than the host",
4015                guest_id
4016            );
4017            for (id, host_snapshot) in &host_worktree_snapshots {
4018                let guest_snapshot = &worktree_snapshots[id];
4019                assert_eq!(
4020                    guest_snapshot.root_name(),
4021                    host_snapshot.root_name(),
4022                    "guest {} has different root name than the host for worktree {}",
4023                    guest_id,
4024                    id
4025                );
4026                assert_eq!(
4027                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4028                    host_snapshot.entries(false).collect::<Vec<_>>(),
4029                    "guest {} has different snapshot than the host for worktree {}",
4030                    guest_id,
4031                    id
4032                );
4033            }
4034
4035            guest_client
4036                .project
4037                .as_ref()
4038                .unwrap()
4039                .read_with(guest_cx, |project, _| {
4040                    assert!(
4041                        !project.has_buffered_operations(),
4042                        "guest {} has buffered operations ",
4043                        guest_id,
4044                    );
4045                });
4046
4047            for guest_buffer in &guest_client.buffers {
4048                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4049                let host_buffer = host_project.read_with(&host_cx, |project, _| {
4050                    project
4051                        .shared_buffer(guest_client.peer_id, buffer_id)
4052                        .expect(&format!(
4053                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
4054                            guest_id, guest_client.peer_id, buffer_id
4055                        ))
4056                });
4057                assert_eq!(
4058                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4059                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4060                    "guest {} buffer {} differs from the host's buffer",
4061                    guest_id,
4062                    buffer_id,
4063                );
4064            }
4065        }
4066    }
4067
4068    struct TestServer {
4069        peer: Arc<Peer>,
4070        app_state: Arc<AppState>,
4071        server: Arc<Server>,
4072        foreground: Rc<executor::Foreground>,
4073        notifications: mpsc::UnboundedReceiver<()>,
4074        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
4075        forbid_connections: Arc<AtomicBool>,
4076        _test_db: TestDb,
4077    }
4078
4079    impl TestServer {
4080        async fn start(
4081            foreground: Rc<executor::Foreground>,
4082            background: Arc<executor::Background>,
4083        ) -> Self {
4084            let test_db = TestDb::fake(background);
4085            let app_state = Self::build_app_state(&test_db).await;
4086            let peer = Peer::new();
4087            let notifications = mpsc::unbounded();
4088            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4089            Self {
4090                peer,
4091                app_state,
4092                server,
4093                foreground,
4094                notifications: notifications.1,
4095                connection_killers: Default::default(),
4096                forbid_connections: Default::default(),
4097                _test_db: test_db,
4098            }
4099        }
4100
4101        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4102            let http = FakeHttpClient::with_404_response();
4103            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4104            let client_name = name.to_string();
4105            let mut client = Client::new(http.clone());
4106            let server = self.server.clone();
4107            let connection_killers = self.connection_killers.clone();
4108            let forbid_connections = self.forbid_connections.clone();
4109            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4110
4111            Arc::get_mut(&mut client)
4112                .unwrap()
4113                .override_authenticate(move |cx| {
4114                    cx.spawn(|_| async move {
4115                        let access_token = "the-token".to_string();
4116                        Ok(Credentials {
4117                            user_id: user_id.0 as u64,
4118                            access_token,
4119                        })
4120                    })
4121                })
4122                .override_establish_connection(move |credentials, cx| {
4123                    assert_eq!(credentials.user_id, user_id.0 as u64);
4124                    assert_eq!(credentials.access_token, "the-token");
4125
4126                    let server = server.clone();
4127                    let connection_killers = connection_killers.clone();
4128                    let forbid_connections = forbid_connections.clone();
4129                    let client_name = client_name.clone();
4130                    let connection_id_tx = connection_id_tx.clone();
4131                    cx.spawn(move |cx| async move {
4132                        if forbid_connections.load(SeqCst) {
4133                            Err(EstablishConnectionError::other(anyhow!(
4134                                "server is forbidding connections"
4135                            )))
4136                        } else {
4137                            let (client_conn, server_conn, kill_conn) =
4138                                Connection::in_memory(cx.background());
4139                            connection_killers.lock().insert(user_id, kill_conn);
4140                            cx.background()
4141                                .spawn(server.handle_connection(
4142                                    server_conn,
4143                                    client_name,
4144                                    user_id,
4145                                    Some(connection_id_tx),
4146                                    cx.background(),
4147                                ))
4148                                .detach();
4149                            Ok(client_conn)
4150                        }
4151                    })
4152                });
4153
4154            client
4155                .authenticate_and_connect(&cx.to_async())
4156                .await
4157                .unwrap();
4158
4159            Channel::init(&client);
4160            Project::init(&client);
4161
4162            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4163            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4164            let mut authed_user =
4165                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4166            while authed_user.next().await.unwrap().is_none() {}
4167
4168            TestClient {
4169                client,
4170                peer_id,
4171                user_store,
4172                project: Default::default(),
4173                buffers: Default::default(),
4174            }
4175        }
4176
4177        fn disconnect_client(&self, user_id: UserId) {
4178            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4179                let _ = kill_conn.try_send(Some(()));
4180            }
4181        }
4182
4183        fn forbid_connections(&self) {
4184            self.forbid_connections.store(true, SeqCst);
4185        }
4186
4187        fn allow_connections(&self) {
4188            self.forbid_connections.store(false, SeqCst);
4189        }
4190
4191        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4192            let mut config = Config::default();
4193            config.session_secret = "a".repeat(32);
4194            config.database_url = test_db.url.clone();
4195            let github_client = github::AppClient::test();
4196            Arc::new(AppState {
4197                db: test_db.db().clone(),
4198                handlebars: Default::default(),
4199                auth_client: auth::build_client("", ""),
4200                repo_client: github::RepoClient::test(&github_client),
4201                github_client,
4202                config,
4203            })
4204        }
4205
4206        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4207            self.server.store.read()
4208        }
4209
4210        async fn condition<F>(&mut self, mut predicate: F)
4211        where
4212            F: FnMut(&Store) -> bool,
4213        {
4214            async_std::future::timeout(Duration::from_millis(500), async {
4215                while !(predicate)(&*self.server.store.read()) {
4216                    self.foreground.start_waiting();
4217                    self.notifications.next().await;
4218                    self.foreground.finish_waiting();
4219                }
4220            })
4221            .await
4222            .expect("condition timed out");
4223        }
4224    }
4225
4226    impl Drop for TestServer {
4227        fn drop(&mut self) {
4228            self.peer.reset();
4229        }
4230    }
4231
4232    struct TestClient {
4233        client: Arc<Client>,
4234        pub peer_id: PeerId,
4235        pub user_store: ModelHandle<UserStore>,
4236        project: Option<ModelHandle<Project>>,
4237        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4238    }
4239
4240    impl Deref for TestClient {
4241        type Target = Arc<Client>;
4242
4243        fn deref(&self) -> &Self::Target {
4244            &self.client
4245        }
4246    }
4247
4248    impl TestClient {
4249        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4250            UserId::from_proto(
4251                self.user_store
4252                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4253            )
4254        }
4255
4256        async fn simulate_host(
4257            mut self,
4258            project: ModelHandle<Project>,
4259            operations: Rc<Cell<usize>>,
4260            max_operations: usize,
4261            rng: Rc<RefCell<StdRng>>,
4262            mut cx: TestAppContext,
4263        ) -> (Self, TestAppContext) {
4264            let fs = project.read_with(&cx, |project, _| project.fs().clone());
4265            let mut files: Vec<PathBuf> = Default::default();
4266            while operations.get() < max_operations {
4267                operations.set(operations.get() + 1);
4268
4269                let distribution = rng.borrow_mut().gen_range(0..100);
4270                match distribution {
4271                    0..=20 if !files.is_empty() => {
4272                        let mut path = files.choose(&mut *rng.borrow_mut()).unwrap().as_path();
4273                        while let Some(parent_path) = path.parent() {
4274                            path = parent_path;
4275                            if rng.borrow_mut().gen() {
4276                                break;
4277                            }
4278                        }
4279
4280                        log::info!("Host: find/create local worktree {:?}", path);
4281                        project
4282                            .update(&mut cx, |project, cx| {
4283                                project.find_or_create_local_worktree(path, false, cx)
4284                            })
4285                            .await
4286                            .unwrap();
4287                    }
4288                    10..=80 if !files.is_empty() => {
4289                        let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4290                            let file = files.choose(&mut *rng.borrow_mut()).unwrap();
4291                            let (worktree, path) = project
4292                                .update(&mut cx, |project, cx| {
4293                                    project.find_or_create_local_worktree(file, false, cx)
4294                                })
4295                                .await
4296                                .unwrap();
4297                            let project_path =
4298                                worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4299                            log::info!("Host: opening path {:?}", project_path);
4300                            let buffer = project
4301                                .update(&mut cx, |project, cx| {
4302                                    project.open_buffer(project_path, cx)
4303                                })
4304                                .await
4305                                .unwrap();
4306                            self.buffers.insert(buffer.clone());
4307                            buffer
4308                        } else {
4309                            self.buffers
4310                                .iter()
4311                                .choose(&mut *rng.borrow_mut())
4312                                .unwrap()
4313                                .clone()
4314                        };
4315
4316                        if rng.borrow_mut().gen_bool(0.1) {
4317                            cx.update(|cx| {
4318                                log::info!(
4319                                    "Host: dropping buffer {:?}",
4320                                    buffer.read(cx).file().unwrap().full_path(cx)
4321                                );
4322                                self.buffers.remove(&buffer);
4323                                drop(buffer);
4324                            });
4325                        } else {
4326                            buffer.update(&mut cx, |buffer, cx| {
4327                                log::info!(
4328                                    "Host: updating buffer {:?}",
4329                                    buffer.file().unwrap().full_path(cx)
4330                                );
4331                                buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4332                            });
4333                        }
4334                    }
4335                    _ => loop {
4336                        let path_component_count = rng.borrow_mut().gen_range(1..=5);
4337                        let mut path = PathBuf::new();
4338                        path.push("/");
4339                        for _ in 0..path_component_count {
4340                            let letter = rng.borrow_mut().gen_range(b'a'..=b'z');
4341                            path.push(std::str::from_utf8(&[letter]).unwrap());
4342                        }
4343                        path.set_extension("rs");
4344                        let parent_path = path.parent().unwrap();
4345
4346                        log::info!("Host: creating file {:?}", path);
4347                        if fs.create_dir(&parent_path).await.is_ok()
4348                            && fs.create_file(&path, Default::default()).await.is_ok()
4349                        {
4350                            files.push(path);
4351                            break;
4352                        } else {
4353                            log::info!("Host: cannot create file");
4354                        }
4355                    },
4356                }
4357
4358                cx.background().simulate_random_delay().await;
4359            }
4360
4361            self.project = Some(project);
4362            (self, cx)
4363        }
4364
4365        pub async fn simulate_guest(
4366            mut self,
4367            guest_id: usize,
4368            project: ModelHandle<Project>,
4369            operations: Rc<Cell<usize>>,
4370            max_operations: usize,
4371            rng: Rc<RefCell<StdRng>>,
4372            mut cx: TestAppContext,
4373        ) -> (Self, TestAppContext) {
4374            while operations.get() < max_operations {
4375                let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4376                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4377                        project
4378                            .worktrees(&cx)
4379                            .filter(|worktree| {
4380                                worktree.read(cx).entries(false).any(|e| e.is_file())
4381                            })
4382                            .choose(&mut *rng.borrow_mut())
4383                    }) {
4384                        worktree
4385                    } else {
4386                        cx.background().simulate_random_delay().await;
4387                        continue;
4388                    };
4389
4390                    operations.set(operations.get() + 1);
4391                    let project_path = worktree.read_with(&cx, |worktree, _| {
4392                        let entry = worktree
4393                            .entries(false)
4394                            .filter(|e| e.is_file())
4395                            .choose(&mut *rng.borrow_mut())
4396                            .unwrap();
4397                        (worktree.id(), entry.path.clone())
4398                    });
4399                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4400                    let buffer = project
4401                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4402                        .await
4403                        .unwrap();
4404                    self.buffers.insert(buffer.clone());
4405                    buffer
4406                } else {
4407                    operations.set(operations.get() + 1);
4408
4409                    self.buffers
4410                        .iter()
4411                        .choose(&mut *rng.borrow_mut())
4412                        .unwrap()
4413                        .clone()
4414                };
4415
4416                let choice = rng.borrow_mut().gen_range(0..100);
4417                match choice {
4418                    0..=9 => {
4419                        cx.update(|cx| {
4420                            log::info!(
4421                                "Guest {}: dropping buffer {:?}",
4422                                guest_id,
4423                                buffer.read(cx).file().unwrap().full_path(cx)
4424                            );
4425                            self.buffers.remove(&buffer);
4426                            drop(buffer);
4427                        });
4428                    }
4429                    10..=19 => {
4430                        let completions = project.update(&mut cx, |project, cx| {
4431                            log::info!(
4432                                "Guest {}: requesting completions for buffer {:?}",
4433                                guest_id,
4434                                buffer.read(cx).file().unwrap().full_path(cx)
4435                            );
4436                            let offset = rng.borrow_mut().gen_range(0..=buffer.read(cx).len());
4437                            project.completions(&buffer, offset, cx)
4438                        });
4439                        let completions = cx.background().spawn(async move {
4440                            completions.await.expect("completions request failed");
4441                        });
4442                        if rng.borrow_mut().gen_bool(0.3) {
4443                            log::info!("Guest {}: detaching completions request", guest_id);
4444                            completions.detach();
4445                        } else {
4446                            completions.await;
4447                        }
4448                    }
4449                    20..=29 => {
4450                        let code_actions = project.update(&mut cx, |project, cx| {
4451                            log::info!(
4452                                "Guest {}: requesting code actions for buffer {:?}",
4453                                guest_id,
4454                                buffer.read(cx).file().unwrap().full_path(cx)
4455                            );
4456                            let range =
4457                                buffer.read(cx).random_byte_range(0, &mut *rng.borrow_mut());
4458                            project.code_actions(&buffer, range, cx)
4459                        });
4460                        let code_actions = cx.background().spawn(async move {
4461                            code_actions.await.expect("code actions request failed");
4462                        });
4463                        if rng.borrow_mut().gen_bool(0.3) {
4464                            log::info!("Guest {}: detaching code actions request", guest_id);
4465                            code_actions.detach();
4466                        } else {
4467                            code_actions.await;
4468                        }
4469                    }
4470                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4471                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4472                            log::info!(
4473                                "Guest {}: saving buffer {:?}",
4474                                guest_id,
4475                                buffer.file().unwrap().full_path(cx)
4476                            );
4477                            (buffer.version(), buffer.save(cx))
4478                        });
4479                        let save = cx.spawn(|cx| async move {
4480                            let (saved_version, _) = save.await.expect("save request failed");
4481                            buffer.read_with(&cx, |buffer, _| {
4482                                assert!(buffer.version().observed_all(&saved_version));
4483                                assert!(saved_version.observed_all(&requested_version));
4484                            });
4485                        });
4486                        if rng.borrow_mut().gen_bool(0.3) {
4487                            log::info!("Guest {}: detaching save request", guest_id);
4488                            save.detach();
4489                        } else {
4490                            save.await;
4491                        }
4492                    }
4493                    40..=45 => {
4494                        let prepare_rename = project.update(&mut cx, |project, cx| {
4495                            log::info!(
4496                                "Guest {}: preparing rename for buffer {:?}",
4497                                guest_id,
4498                                buffer.read(cx).file().unwrap().full_path(cx)
4499                            );
4500                            let offset = rng.borrow_mut().gen_range(0..=buffer.read(cx).len());
4501                            project.prepare_rename(buffer, offset, cx)
4502                        });
4503                        let prepare_rename = cx.background().spawn(async move {
4504                            prepare_rename.await.expect("prepare rename request failed");
4505                        });
4506                        if rng.borrow_mut().gen_bool(0.3) {
4507                            log::info!("Guest {}: detaching prepare rename request", guest_id);
4508                            prepare_rename.detach();
4509                        } else {
4510                            prepare_rename.await;
4511                        }
4512                    }
4513                    _ => {
4514                        buffer.update(&mut cx, |buffer, cx| {
4515                            log::info!(
4516                                "Guest {}: updating buffer {:?}",
4517                                guest_id,
4518                                buffer.file().unwrap().full_path(cx)
4519                            );
4520                            buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4521                        });
4522                    }
4523                }
4524                cx.background().simulate_random_delay().await;
4525            }
4526
4527            self.project = Some(project);
4528            (self, cx)
4529        }
4530    }
4531
4532    impl Executor for Arc<gpui::executor::Background> {
4533        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4534            self.spawn(future).detach();
4535        }
4536    }
4537
4538    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4539        channel
4540            .messages()
4541            .cursor::<()>()
4542            .map(|m| {
4543                (
4544                    m.sender.github_login.clone(),
4545                    m.body.clone(),
4546                    m.is_pending(),
4547                )
4548            })
4549            .collect()
4550    }
4551
4552    struct EmptyView;
4553
4554    impl gpui::Entity for EmptyView {
4555        type Event = ();
4556    }
4557
4558    impl gpui::View for EmptyView {
4559        fn ui_name() -> &'static str {
4560            "empty view"
4561        }
4562
4563        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4564            gpui::Element::boxed(gpui::elements::Empty)
4565        }
4566    }
4567}