rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
/// RPC server state; `Server::new` hands it out inside an `Arc` so handlers can share it.
pub struct Server {
    /// RPC peer used to add/disconnect connections and to send, forward and answer messages.
    peer: Arc<Peer>,
    /// Connection, project and worktree bookkeeping, wrapped in a read-write lock.
    store: RwLock<Store>,
    /// Application-wide state; the handlers visible in this file use its `db` handle.
    app_state: Arc<AppState>,
    /// Registered message handlers, keyed by the `TypeId` of the payload type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent a `()` after each incoming message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  42
  43pub trait Executor {
  44    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  45}
  46
/// Unit struct. NOTE(review): presumably the production `Executor` implementation — its
/// `impl` is not visible in this part of the file; confirm.
pub struct RealExecutor;
  48
// NOTE(review): neither constant is referenced in the visible part of this file; the
// descriptions below are inferred from the names only — confirm against their uses.
/// Presumably the number of channel messages returned per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Presumably the maximum accepted length of a single channel message.
const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::share_worktree)
  77            .add_request_handler(Server::update_worktree)
  78            .add_message_handler(Server::update_diagnostic_summary)
  79            .add_message_handler(Server::disk_based_diagnostics_updating)
  80            .add_message_handler(Server::disk_based_diagnostics_updated)
  81            .add_request_handler(Server::get_definition)
  82            .add_request_handler(Server::get_project_symbols)
  83            .add_request_handler(Server::open_buffer_for_symbol)
  84            .add_request_handler(Server::open_buffer)
  85            .add_message_handler(Server::close_buffer)
  86            .add_request_handler(Server::update_buffer)
  87            .add_message_handler(Server::update_buffer_file)
  88            .add_message_handler(Server::buffer_reloaded)
  89            .add_message_handler(Server::buffer_saved)
  90            .add_request_handler(Server::save_buffer)
  91            .add_request_handler(Server::format_buffers)
  92            .add_request_handler(Server::get_completions)
  93            .add_request_handler(Server::apply_additional_edits_for_completion)
  94            .add_request_handler(Server::get_code_actions)
  95            .add_request_handler(Server::apply_code_action)
  96            .add_request_handler(Server::prepare_rename)
  97            .add_request_handler(Server::perform_rename)
  98            .add_request_handler(Server::get_channels)
  99            .add_request_handler(Server::get_users)
 100            .add_request_handler(Server::join_channel)
 101            .add_message_handler(Server::leave_channel)
 102            .add_request_handler(Server::send_channel_message)
 103            .add_request_handler(Server::get_channel_messages);
 104
 105        Arc::new(server)
 106    }
 107
 108    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 109    where
 110        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 111        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 112        M: EnvelopedMessage,
 113    {
 114        let prev_handler = self.handlers.insert(
 115            TypeId::of::<M>(),
 116            Box::new(move |server, envelope| {
 117                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 118                (handler)(server, *envelope).boxed()
 119            }),
 120        );
 121        if prev_handler.is_some() {
 122            panic!("registered a handler for the same message twice");
 123        }
 124        self
 125    }
 126
 127    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 128    where
 129        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 130        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 131        M: RequestMessage,
 132    {
 133        self.add_message_handler(move |server, envelope| {
 134            let receipt = envelope.receipt();
 135            let response = (handler)(server.clone(), envelope);
 136            async move {
 137                match response.await {
 138                    Ok(response) => {
 139                        server.peer.respond(receipt, response)?;
 140                        Ok(())
 141                    }
 142                    Err(error) => {
 143                        server.peer.respond_with_error(
 144                            receipt,
 145                            proto::Error {
 146                                message: error.to_string(),
 147                            },
 148                        )?;
 149                        Err(error)
 150                    }
 151                }
 152            }
 153        })
 154    }
 155
 156    pub fn handle_connection<E: Executor>(
 157        self: &Arc<Self>,
 158        connection: Connection,
 159        addr: String,
 160        user_id: UserId,
 161        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 162        executor: E,
 163    ) -> impl Future<Output = ()> {
 164        let mut this = self.clone();
 165        async move {
 166            let (connection_id, handle_io, mut incoming_rx) =
 167                this.peer.add_connection(connection).await;
 168
 169            if let Some(send_connection_id) = send_connection_id.as_mut() {
 170                let _ = send_connection_id.send(connection_id).await;
 171            }
 172
 173            this.state_mut().add_connection(connection_id, user_id);
 174            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 175                log::error!("error updating contacts for {:?}: {}", user_id, err);
 176            }
 177
 178            let handle_io = handle_io.fuse();
 179            futures::pin_mut!(handle_io);
 180            loop {
 181                let next_message = incoming_rx.next().fuse();
 182                futures::pin_mut!(next_message);
 183                futures::select_biased! {
 184                    result = handle_io => {
 185                        if let Err(err) = result {
 186                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 187                        }
 188                        break;
 189                    }
 190                    message = next_message => {
 191                        if let Some(message) = message {
 192                            let start_time = Instant::now();
 193                            let type_name = message.payload_type_name();
 194                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 195                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 196                                let notifications = this.notifications.clone();
 197                                let is_background = message.is_background();
 198                                let handle_message = (handler)(this.clone(), message);
 199                                let handle_message = async move {
 200                                    if let Err(err) = handle_message.await {
 201                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 202                                    } else {
 203                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 204                                    }
 205                                    if let Some(mut notifications) = notifications {
 206                                        let _ = notifications.send(()).await;
 207                                    }
 208                                };
 209                                if is_background {
 210                                    executor.spawn_detached(handle_message);
 211                                } else {
 212                                    handle_message.await;
 213                                }
 214                            } else {
 215                                log::warn!("unhandled message: {}", type_name);
 216                            }
 217                        } else {
 218                            log::info!("rpc connection closed {:?}", addr);
 219                            break;
 220                        }
 221                    }
 222                }
 223            }
 224
 225            if let Err(err) = this.sign_out(connection_id).await {
 226                log::error!("error signing out connection {:?} - {:?}", addr, err);
 227            }
 228        }
 229    }
 230
 231    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 232        self.peer.disconnect(connection_id);
 233        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 234
 235        for (project_id, project) in removed_connection.hosted_projects {
 236            if let Some(share) = project.share {
 237                broadcast(
 238                    connection_id,
 239                    share.guests.keys().copied().collect(),
 240                    |conn_id| {
 241                        self.peer
 242                            .send(conn_id, proto::UnshareProject { project_id })
 243                    },
 244                )?;
 245            }
 246        }
 247
 248        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 249            broadcast(connection_id, peer_ids, |conn_id| {
 250                self.peer.send(
 251                    conn_id,
 252                    proto::RemoveProjectCollaborator {
 253                        project_id,
 254                        peer_id: connection_id.0,
 255                    },
 256                )
 257            })?;
 258        }
 259
 260        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 261        Ok(())
 262    }
 263
 264    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 265        Ok(proto::Ack {})
 266    }
 267
 268    async fn register_project(
 269        mut self: Arc<Server>,
 270        request: TypedEnvelope<proto::RegisterProject>,
 271    ) -> tide::Result<proto::RegisterProjectResponse> {
 272        let project_id = {
 273            let mut state = self.state_mut();
 274            let user_id = state.user_id_for_connection(request.sender_id)?;
 275            state.register_project(request.sender_id, user_id)
 276        };
 277        Ok(proto::RegisterProjectResponse { project_id })
 278    }
 279
 280    async fn unregister_project(
 281        mut self: Arc<Server>,
 282        request: TypedEnvelope<proto::UnregisterProject>,
 283    ) -> tide::Result<()> {
 284        let project = self
 285            .state_mut()
 286            .unregister_project(request.payload.project_id, request.sender_id)?;
 287        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 288        Ok(())
 289    }
 290
 291    async fn share_project(
 292        mut self: Arc<Server>,
 293        request: TypedEnvelope<proto::ShareProject>,
 294    ) -> tide::Result<proto::Ack> {
 295        self.state_mut()
 296            .share_project(request.payload.project_id, request.sender_id);
 297        Ok(proto::Ack {})
 298    }
 299
 300    async fn unshare_project(
 301        mut self: Arc<Server>,
 302        request: TypedEnvelope<proto::UnshareProject>,
 303    ) -> tide::Result<()> {
 304        let project_id = request.payload.project_id;
 305        let project = self
 306            .state_mut()
 307            .unshare_project(project_id, request.sender_id)?;
 308
 309        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 310            self.peer
 311                .send(conn_id, proto::UnshareProject { project_id })
 312        })?;
 313        self.update_contacts_for_users(&project.authorized_user_ids)?;
 314        Ok(())
 315    }
 316
 317    async fn join_project(
 318        mut self: Arc<Server>,
 319        request: TypedEnvelope<proto::JoinProject>,
 320    ) -> tide::Result<proto::JoinProjectResponse> {
 321        let project_id = request.payload.project_id;
 322
 323        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 324        let (response, connection_ids, contact_user_ids) = self
 325            .state_mut()
 326            .join_project(request.sender_id, user_id, project_id)
 327            .and_then(|joined| {
 328                let share = joined.project.share()?;
 329                let peer_count = share.guests.len();
 330                let mut collaborators = Vec::with_capacity(peer_count);
 331                collaborators.push(proto::Collaborator {
 332                    peer_id: joined.project.host_connection_id.0,
 333                    replica_id: 0,
 334                    user_id: joined.project.host_user_id.to_proto(),
 335                });
 336                let worktrees = joined
 337                    .project
 338                    .worktrees
 339                    .iter()
 340                    .filter_map(|(id, worktree)| {
 341                        worktree.share.as_ref().map(|share| proto::Worktree {
 342                            id: *id,
 343                            root_name: worktree.root_name.clone(),
 344                            entries: share.entries.values().cloned().collect(),
 345                            diagnostic_summaries: share
 346                                .diagnostic_summaries
 347                                .values()
 348                                .cloned()
 349                                .collect(),
 350                            weak: worktree.weak,
 351                            next_update_id: share.next_update_id as u64,
 352                        })
 353                    })
 354                    .collect();
 355                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 356                    if *peer_conn_id != request.sender_id {
 357                        collaborators.push(proto::Collaborator {
 358                            peer_id: peer_conn_id.0,
 359                            replica_id: *peer_replica_id as u32,
 360                            user_id: peer_user_id.to_proto(),
 361                        });
 362                    }
 363                }
 364                let response = proto::JoinProjectResponse {
 365                    worktrees,
 366                    replica_id: joined.replica_id as u32,
 367                    collaborators,
 368                };
 369                let connection_ids = joined.project.connection_ids();
 370                let contact_user_ids = joined.project.authorized_user_ids();
 371                Ok((response, connection_ids, contact_user_ids))
 372            })?;
 373
 374        broadcast(request.sender_id, connection_ids, |conn_id| {
 375            self.peer.send(
 376                conn_id,
 377                proto::AddProjectCollaborator {
 378                    project_id,
 379                    collaborator: Some(proto::Collaborator {
 380                        peer_id: request.sender_id.0,
 381                        replica_id: response.replica_id,
 382                        user_id: user_id.to_proto(),
 383                    }),
 384                },
 385            )
 386        })?;
 387        self.update_contacts_for_users(&contact_user_ids)?;
 388        Ok(response)
 389    }
 390
 391    async fn leave_project(
 392        mut self: Arc<Server>,
 393        request: TypedEnvelope<proto::LeaveProject>,
 394    ) -> tide::Result<()> {
 395        let sender_id = request.sender_id;
 396        let project_id = request.payload.project_id;
 397        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 398
 399        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 400            self.peer.send(
 401                conn_id,
 402                proto::RemoveProjectCollaborator {
 403                    project_id,
 404                    peer_id: sender_id.0,
 405                },
 406            )
 407        })?;
 408        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 409
 410        Ok(())
 411    }
 412
 413    async fn register_worktree(
 414        mut self: Arc<Server>,
 415        request: TypedEnvelope<proto::RegisterWorktree>,
 416    ) -> tide::Result<proto::Ack> {
 417        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 418
 419        let mut contact_user_ids = HashSet::default();
 420        contact_user_ids.insert(host_user_id);
 421        for github_login in request.payload.authorized_logins {
 422            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 423            contact_user_ids.insert(contact_user_id);
 424        }
 425
 426        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 427        self.state_mut().register_worktree(
 428            request.payload.project_id,
 429            request.payload.worktree_id,
 430            request.sender_id,
 431            Worktree {
 432                authorized_user_ids: contact_user_ids.clone(),
 433                root_name: request.payload.root_name,
 434                share: None,
 435                weak: false,
 436            },
 437        )?;
 438        self.update_contacts_for_users(&contact_user_ids)?;
 439        Ok(proto::Ack {})
 440    }
 441
 442    async fn unregister_worktree(
 443        mut self: Arc<Server>,
 444        request: TypedEnvelope<proto::UnregisterWorktree>,
 445    ) -> tide::Result<()> {
 446        let project_id = request.payload.project_id;
 447        let worktree_id = request.payload.worktree_id;
 448        let (worktree, guest_connection_ids) =
 449            self.state_mut()
 450                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 451        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 452            self.peer.send(
 453                conn_id,
 454                proto::UnregisterWorktree {
 455                    project_id,
 456                    worktree_id,
 457                },
 458            )
 459        })?;
 460        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 461        Ok(())
 462    }
 463
 464    async fn share_worktree(
 465        mut self: Arc<Server>,
 466        mut request: TypedEnvelope<proto::ShareWorktree>,
 467    ) -> tide::Result<proto::Ack> {
 468        let worktree = request
 469            .payload
 470            .worktree
 471            .as_mut()
 472            .ok_or_else(|| anyhow!("missing worktree"))?;
 473        let entries = worktree
 474            .entries
 475            .iter()
 476            .map(|entry| (entry.id, entry.clone()))
 477            .collect();
 478        let diagnostic_summaries = worktree
 479            .diagnostic_summaries
 480            .iter()
 481            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 482            .collect();
 483
 484        let shared_worktree = self.state_mut().share_worktree(
 485            request.payload.project_id,
 486            worktree.id,
 487            request.sender_id,
 488            entries,
 489            diagnostic_summaries,
 490            worktree.next_update_id,
 491        )?;
 492
 493        broadcast(
 494            request.sender_id,
 495            shared_worktree.connection_ids,
 496            |connection_id| {
 497                self.peer
 498                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 499            },
 500        )?;
 501        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 502
 503        Ok(proto::Ack {})
 504    }
 505
 506    async fn update_worktree(
 507        mut self: Arc<Server>,
 508        request: TypedEnvelope<proto::UpdateWorktree>,
 509    ) -> tide::Result<proto::Ack> {
 510        let connection_ids = self.state_mut().update_worktree(
 511            request.sender_id,
 512            request.payload.project_id,
 513            request.payload.worktree_id,
 514            request.payload.id,
 515            &request.payload.removed_entries,
 516            &request.payload.updated_entries,
 517        )?;
 518
 519        broadcast(request.sender_id, connection_ids, |connection_id| {
 520            self.peer
 521                .forward_send(request.sender_id, connection_id, request.payload.clone())
 522        })?;
 523
 524        Ok(proto::Ack {})
 525    }
 526
 527    async fn update_diagnostic_summary(
 528        mut self: Arc<Server>,
 529        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 530    ) -> tide::Result<()> {
 531        let summary = request
 532            .payload
 533            .summary
 534            .clone()
 535            .ok_or_else(|| anyhow!("invalid summary"))?;
 536        let receiver_ids = self.state_mut().update_diagnostic_summary(
 537            request.payload.project_id,
 538            request.payload.worktree_id,
 539            request.sender_id,
 540            summary,
 541        )?;
 542
 543        broadcast(request.sender_id, receiver_ids, |connection_id| {
 544            self.peer
 545                .forward_send(request.sender_id, connection_id, request.payload.clone())
 546        })?;
 547        Ok(())
 548    }
 549
 550    async fn disk_based_diagnostics_updating(
 551        self: Arc<Server>,
 552        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 553    ) -> tide::Result<()> {
 554        let receiver_ids = self
 555            .state()
 556            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 557        broadcast(request.sender_id, receiver_ids, |connection_id| {
 558            self.peer
 559                .forward_send(request.sender_id, connection_id, request.payload.clone())
 560        })?;
 561        Ok(())
 562    }
 563
 564    async fn disk_based_diagnostics_updated(
 565        self: Arc<Server>,
 566        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 567    ) -> tide::Result<()> {
 568        let receiver_ids = self
 569            .state()
 570            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 571        broadcast(request.sender_id, receiver_ids, |connection_id| {
 572            self.peer
 573                .forward_send(request.sender_id, connection_id, request.payload.clone())
 574        })?;
 575        Ok(())
 576    }
 577
 578    async fn get_definition(
 579        self: Arc<Server>,
 580        request: TypedEnvelope<proto::GetDefinition>,
 581    ) -> tide::Result<proto::GetDefinitionResponse> {
 582        let host_connection_id = self
 583            .state()
 584            .read_project(request.payload.project_id, request.sender_id)?
 585            .host_connection_id;
 586        Ok(self
 587            .peer
 588            .forward_request(request.sender_id, host_connection_id, request.payload)
 589            .await?)
 590    }
 591
 592    async fn get_project_symbols(
 593        self: Arc<Server>,
 594        request: TypedEnvelope<proto::GetProjectSymbols>,
 595    ) -> tide::Result<proto::GetProjectSymbolsResponse> {
 596        let host_connection_id = self
 597            .state()
 598            .read_project(request.payload.project_id, request.sender_id)?
 599            .host_connection_id;
 600        Ok(self
 601            .peer
 602            .forward_request(request.sender_id, host_connection_id, request.payload)
 603            .await?)
 604    }
 605
 606    async fn open_buffer_for_symbol(
 607        self: Arc<Server>,
 608        request: TypedEnvelope<proto::OpenBufferForSymbol>,
 609    ) -> tide::Result<proto::OpenBufferForSymbolResponse> {
 610        let host_connection_id = self
 611            .state()
 612            .read_project(request.payload.project_id, request.sender_id)?
 613            .host_connection_id;
 614        Ok(self
 615            .peer
 616            .forward_request(request.sender_id, host_connection_id, request.payload)
 617            .await?)
 618    }
 619
 620    async fn open_buffer(
 621        self: Arc<Server>,
 622        request: TypedEnvelope<proto::OpenBuffer>,
 623    ) -> tide::Result<proto::OpenBufferResponse> {
 624        let host_connection_id = self
 625            .state()
 626            .read_project(request.payload.project_id, request.sender_id)?
 627            .host_connection_id;
 628        Ok(self
 629            .peer
 630            .forward_request(request.sender_id, host_connection_id, request.payload)
 631            .await?)
 632    }
 633
 634    async fn close_buffer(
 635        self: Arc<Server>,
 636        request: TypedEnvelope<proto::CloseBuffer>,
 637    ) -> tide::Result<()> {
 638        let host_connection_id = self
 639            .state()
 640            .read_project(request.payload.project_id, request.sender_id)?
 641            .host_connection_id;
 642        self.peer
 643            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 644        Ok(())
 645    }
 646
 647    async fn save_buffer(
 648        self: Arc<Server>,
 649        request: TypedEnvelope<proto::SaveBuffer>,
 650    ) -> tide::Result<proto::BufferSaved> {
 651        let host;
 652        let mut guests;
 653        {
 654            let state = self.state();
 655            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 656            host = project.host_connection_id;
 657            guests = project.guest_connection_ids()
 658        }
 659
 660        let response = self
 661            .peer
 662            .forward_request(request.sender_id, host, request.payload.clone())
 663            .await?;
 664
 665        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 666        broadcast(host, guests, |conn_id| {
 667            self.peer.forward_send(host, conn_id, response.clone())
 668        })?;
 669
 670        Ok(response)
 671    }
 672
 673    async fn format_buffers(
 674        self: Arc<Server>,
 675        request: TypedEnvelope<proto::FormatBuffers>,
 676    ) -> tide::Result<proto::FormatBuffersResponse> {
 677        let host = self
 678            .state()
 679            .read_project(request.payload.project_id, request.sender_id)?
 680            .host_connection_id;
 681        Ok(self
 682            .peer
 683            .forward_request(request.sender_id, host, request.payload.clone())
 684            .await?)
 685    }
 686
 687    async fn get_completions(
 688        self: Arc<Server>,
 689        request: TypedEnvelope<proto::GetCompletions>,
 690    ) -> tide::Result<proto::GetCompletionsResponse> {
 691        let host = self
 692            .state()
 693            .read_project(request.payload.project_id, request.sender_id)?
 694            .host_connection_id;
 695        Ok(self
 696            .peer
 697            .forward_request(request.sender_id, host, request.payload.clone())
 698            .await?)
 699    }
 700
 701    async fn apply_additional_edits_for_completion(
 702        self: Arc<Server>,
 703        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 704    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 705        let host = self
 706            .state()
 707            .read_project(request.payload.project_id, request.sender_id)?
 708            .host_connection_id;
 709        Ok(self
 710            .peer
 711            .forward_request(request.sender_id, host, request.payload.clone())
 712            .await?)
 713    }
 714
 715    async fn get_code_actions(
 716        self: Arc<Server>,
 717        request: TypedEnvelope<proto::GetCodeActions>,
 718    ) -> tide::Result<proto::GetCodeActionsResponse> {
 719        let host = self
 720            .state()
 721            .read_project(request.payload.project_id, request.sender_id)?
 722            .host_connection_id;
 723        Ok(self
 724            .peer
 725            .forward_request(request.sender_id, host, request.payload.clone())
 726            .await?)
 727    }
 728
 729    async fn apply_code_action(
 730        self: Arc<Server>,
 731        request: TypedEnvelope<proto::ApplyCodeAction>,
 732    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 733        let host = self
 734            .state()
 735            .read_project(request.payload.project_id, request.sender_id)?
 736            .host_connection_id;
 737        Ok(self
 738            .peer
 739            .forward_request(request.sender_id, host, request.payload.clone())
 740            .await?)
 741    }
 742
 743    async fn prepare_rename(
 744        self: Arc<Server>,
 745        request: TypedEnvelope<proto::PrepareRename>,
 746    ) -> tide::Result<proto::PrepareRenameResponse> {
 747        let host = self
 748            .state()
 749            .read_project(request.payload.project_id, request.sender_id)?
 750            .host_connection_id;
 751        Ok(self
 752            .peer
 753            .forward_request(request.sender_id, host, request.payload.clone())
 754            .await?)
 755    }
 756
 757    async fn perform_rename(
 758        self: Arc<Server>,
 759        request: TypedEnvelope<proto::PerformRename>,
 760    ) -> tide::Result<proto::PerformRenameResponse> {
 761        let host = self
 762            .state()
 763            .read_project(request.payload.project_id, request.sender_id)?
 764            .host_connection_id;
 765        Ok(self
 766            .peer
 767            .forward_request(request.sender_id, host, request.payload.clone())
 768            .await?)
 769    }
 770
 771    async fn update_buffer(
 772        self: Arc<Server>,
 773        request: TypedEnvelope<proto::UpdateBuffer>,
 774    ) -> tide::Result<proto::Ack> {
 775        let receiver_ids = self
 776            .state()
 777            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 778        broadcast(request.sender_id, receiver_ids, |connection_id| {
 779            self.peer
 780                .forward_send(request.sender_id, connection_id, request.payload.clone())
 781        })?;
 782        Ok(proto::Ack {})
 783    }
 784
 785    async fn update_buffer_file(
 786        self: Arc<Server>,
 787        request: TypedEnvelope<proto::UpdateBufferFile>,
 788    ) -> tide::Result<()> {
 789        let receiver_ids = self
 790            .state()
 791            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 792        broadcast(request.sender_id, receiver_ids, |connection_id| {
 793            self.peer
 794                .forward_send(request.sender_id, connection_id, request.payload.clone())
 795        })?;
 796        Ok(())
 797    }
 798
 799    async fn buffer_reloaded(
 800        self: Arc<Server>,
 801        request: TypedEnvelope<proto::BufferReloaded>,
 802    ) -> tide::Result<()> {
 803        let receiver_ids = self
 804            .state()
 805            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 806        broadcast(request.sender_id, receiver_ids, |connection_id| {
 807            self.peer
 808                .forward_send(request.sender_id, connection_id, request.payload.clone())
 809        })?;
 810        Ok(())
 811    }
 812
 813    async fn buffer_saved(
 814        self: Arc<Server>,
 815        request: TypedEnvelope<proto::BufferSaved>,
 816    ) -> tide::Result<()> {
 817        let receiver_ids = self
 818            .state()
 819            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 820        broadcast(request.sender_id, receiver_ids, |connection_id| {
 821            self.peer
 822                .forward_send(request.sender_id, connection_id, request.payload.clone())
 823        })?;
 824        Ok(())
 825    }
 826
 827    async fn get_channels(
 828        self: Arc<Server>,
 829        request: TypedEnvelope<proto::GetChannels>,
 830    ) -> tide::Result<proto::GetChannelsResponse> {
 831        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 832        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 833        Ok(proto::GetChannelsResponse {
 834            channels: channels
 835                .into_iter()
 836                .map(|chan| proto::Channel {
 837                    id: chan.id.to_proto(),
 838                    name: chan.name,
 839                })
 840                .collect(),
 841        })
 842    }
 843
 844    async fn get_users(
 845        self: Arc<Server>,
 846        request: TypedEnvelope<proto::GetUsers>,
 847    ) -> tide::Result<proto::GetUsersResponse> {
 848        let user_ids = request
 849            .payload
 850            .user_ids
 851            .into_iter()
 852            .map(UserId::from_proto)
 853            .collect();
 854        let users = self
 855            .app_state
 856            .db
 857            .get_users_by_ids(user_ids)
 858            .await?
 859            .into_iter()
 860            .map(|user| proto::User {
 861                id: user.id.to_proto(),
 862                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 863                github_login: user.github_login,
 864            })
 865            .collect();
 866        Ok(proto::GetUsersResponse { users })
 867    }
 868
 869    fn update_contacts_for_users<'a>(
 870        self: &Arc<Server>,
 871        user_ids: impl IntoIterator<Item = &'a UserId>,
 872    ) -> anyhow::Result<()> {
 873        let mut result = Ok(());
 874        let state = self.state();
 875        for user_id in user_ids {
 876            let contacts = state.contacts_for_user(*user_id);
 877            for connection_id in state.connection_ids_for_user(*user_id) {
 878                if let Err(error) = self.peer.send(
 879                    connection_id,
 880                    proto::UpdateContacts {
 881                        contacts: contacts.clone(),
 882                    },
 883                ) {
 884                    result = Err(error);
 885                }
 886            }
 887        }
 888        result
 889    }
 890
 891    async fn join_channel(
 892        mut self: Arc<Self>,
 893        request: TypedEnvelope<proto::JoinChannel>,
 894    ) -> tide::Result<proto::JoinChannelResponse> {
 895        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 896        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 897        if !self
 898            .app_state
 899            .db
 900            .can_user_access_channel(user_id, channel_id)
 901            .await?
 902        {
 903            Err(anyhow!("access denied"))?;
 904        }
 905
 906        self.state_mut().join_channel(request.sender_id, channel_id);
 907        let messages = self
 908            .app_state
 909            .db
 910            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 911            .await?
 912            .into_iter()
 913            .map(|msg| proto::ChannelMessage {
 914                id: msg.id.to_proto(),
 915                body: msg.body,
 916                timestamp: msg.sent_at.unix_timestamp() as u64,
 917                sender_id: msg.sender_id.to_proto(),
 918                nonce: Some(msg.nonce.as_u128().into()),
 919            })
 920            .collect::<Vec<_>>();
 921        Ok(proto::JoinChannelResponse {
 922            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 923            messages,
 924        })
 925    }
 926
 927    async fn leave_channel(
 928        mut self: Arc<Self>,
 929        request: TypedEnvelope<proto::LeaveChannel>,
 930    ) -> tide::Result<()> {
 931        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 932        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 933        if !self
 934            .app_state
 935            .db
 936            .can_user_access_channel(user_id, channel_id)
 937            .await?
 938        {
 939            Err(anyhow!("access denied"))?;
 940        }
 941
 942        self.state_mut()
 943            .leave_channel(request.sender_id, channel_id);
 944
 945        Ok(())
 946    }
 947
 948    async fn send_channel_message(
 949        self: Arc<Self>,
 950        request: TypedEnvelope<proto::SendChannelMessage>,
 951    ) -> tide::Result<proto::SendChannelMessageResponse> {
 952        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 953        let user_id;
 954        let connection_ids;
 955        {
 956            let state = self.state();
 957            user_id = state.user_id_for_connection(request.sender_id)?;
 958            connection_ids = state.channel_connection_ids(channel_id)?;
 959        }
 960
 961        // Validate the message body.
 962        let body = request.payload.body.trim().to_string();
 963        if body.len() > MAX_MESSAGE_LEN {
 964            return Err(anyhow!("message is too long"))?;
 965        }
 966        if body.is_empty() {
 967            return Err(anyhow!("message can't be blank"))?;
 968        }
 969
 970        let timestamp = OffsetDateTime::now_utc();
 971        let nonce = request
 972            .payload
 973            .nonce
 974            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 975
 976        let message_id = self
 977            .app_state
 978            .db
 979            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 980            .await?
 981            .to_proto();
 982        let message = proto::ChannelMessage {
 983            sender_id: user_id.to_proto(),
 984            id: message_id,
 985            body,
 986            timestamp: timestamp.unix_timestamp() as u64,
 987            nonce: Some(nonce),
 988        };
 989        broadcast(request.sender_id, connection_ids, |conn_id| {
 990            self.peer.send(
 991                conn_id,
 992                proto::ChannelMessageSent {
 993                    channel_id: channel_id.to_proto(),
 994                    message: Some(message.clone()),
 995                },
 996            )
 997        })?;
 998        Ok(proto::SendChannelMessageResponse {
 999            message: Some(message),
1000        })
1001    }
1002
1003    async fn get_channel_messages(
1004        self: Arc<Self>,
1005        request: TypedEnvelope<proto::GetChannelMessages>,
1006    ) -> tide::Result<proto::GetChannelMessagesResponse> {
1007        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1008        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1009        if !self
1010            .app_state
1011            .db
1012            .can_user_access_channel(user_id, channel_id)
1013            .await?
1014        {
1015            Err(anyhow!("access denied"))?;
1016        }
1017
1018        let messages = self
1019            .app_state
1020            .db
1021            .get_channel_messages(
1022                channel_id,
1023                MESSAGE_COUNT_PER_PAGE,
1024                Some(MessageId::from_proto(request.payload.before_message_id)),
1025            )
1026            .await?
1027            .into_iter()
1028            .map(|msg| proto::ChannelMessage {
1029                id: msg.id.to_proto(),
1030                body: msg.body,
1031                timestamp: msg.sent_at.unix_timestamp() as u64,
1032                sender_id: msg.sender_id.to_proto(),
1033                nonce: Some(msg.nonce.as_u128().into()),
1034            })
1035            .collect::<Vec<_>>();
1036
1037        Ok(proto::GetChannelMessagesResponse {
1038            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1039            messages,
1040        })
1041    }
1042
1043    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1044        self.store.read()
1045    }
1046
1047    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1048        self.store.write()
1049    }
1050}
1051
1052impl Executor for RealExecutor {
1053    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1054        task::spawn(future);
1055    }
1056}
1057
1058fn broadcast<F>(
1059    sender_id: ConnectionId,
1060    receiver_ids: Vec<ConnectionId>,
1061    mut f: F,
1062) -> anyhow::Result<()>
1063where
1064    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1065{
1066    let mut result = Ok(());
1067    for receiver_id in receiver_ids {
1068        if receiver_id != sender_id {
1069            if let Err(error) = f(receiver_id) {
1070                if result.is_ok() {
1071                    result = Err(error);
1072                }
1073            }
1074        }
1075    }
1076    result
1077}
1078
/// Registers the `GET /rpc` endpoint, which upgrades an HTTP request to a WebSocket
/// connection served by a new `Server`.
///
/// The handler responds with `UpgradeRequired` unless the request asks for a WebSocket
/// upgrade and its `X-Zed-Protocol-Version` header equals `rpc::PROTOCOL_VERSION`. It then
/// requires a `Sec-Websocket-Key` header, authenticates the request through
/// `process_auth_header`, and answers with `SwitchingProtocols`. Once the upgraded stream
/// becomes available, a spawned task hands it to `Server::handle_connection`.
pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
    let server = Server::new(app.state().clone(), rpc.clone(), None);
    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
        let server = server.clone();
        async move {
            // Fixed GUID that is appended to the client's key when computing the
            // `Sec-Websocket-Accept` value below.
            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
            // A missing or unparsable version header yields `None`, which never matches.
            let client_protocol_version: Option<u32> = request
                .header("X-Zed-Protocol-Version")
                .and_then(|v| v.as_str().parse().ok());

            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
                return Ok(Response::new(StatusCode::UpgradeRequired));
            }

            let header = match request.header("Sec-Websocket-Key") {
                Some(h) => h.as_str(),
                None => return Err(anyhow!("expected sec-websocket-key"))?,
            };

            let user_id = process_auth_header(&request).await?;

            let mut response = Response::new(StatusCode::SwitchingProtocols);
            response.insert_header(UPGRADE, "websocket");
            response.insert_header(CONNECTION, "Upgrade");
            // Accept value = base64(SHA-1(client key followed by the GUID)).
            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
            response.insert_header("Sec-Websocket-Version", "13");

            // Obtain the upgrade receiver before the response is returned; the spawned
            // task below awaits it to get the raw stream.
            let http_res: &mut tide::http::Response = response.as_mut();
            let upgrade_receiver = http_res.recv_upgrade().await;
            let addr = request.remote().unwrap_or("unknown").to_string();
            task::spawn(async move {
                if let Some(stream) = upgrade_receiver.await {
                    server
                        .handle_connection(
                            Connection::new(
                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
                            ),
                            addr,
                            user_id,
                            None,
                            RealExecutor,
                        )
                        .await;
                }
            });

            Ok(response)
        }
    });
}
1134
1135fn header_contains_ignore_case<T>(
1136    request: &tide::Request<T>,
1137    header_name: HeaderName,
1138    value: &str,
1139) -> bool {
1140    request
1141        .header(header_name)
1142        .map(|h| {
1143            h.as_str()
1144                .split(',')
1145                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1146        })
1147        .unwrap_or(false)
1148}
1149
1150#[cfg(test)]
1151mod tests {
1152    use super::*;
1153    use crate::{
1154        auth,
1155        db::{tests::TestDb, UserId},
1156        github, AppState, Config,
1157    };
1158    use ::rpc::Peer;
1159    use collections::BTreeMap;
1160    use gpui::{executor, ModelHandle, TestAppContext};
1161    use parking_lot::Mutex;
1162    use postage::{sink::Sink, watch};
1163    use rand::prelude::*;
1164    use rpc::PeerId;
1165    use serde_json::json;
1166    use sqlx::types::time::OffsetDateTime;
1167    use std::{
1168        cell::Cell,
1169        env,
1170        ops::Deref,
1171        path::Path,
1172        rc::Rc,
1173        sync::{
1174            atomic::{AtomicBool, Ordering::SeqCst},
1175            Arc,
1176        },
1177        time::Duration,
1178    };
1179    use zed::{
1180        client::{
1181            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1182            EstablishConnectionError, UserStore,
1183        },
1184        editor::{
1185            self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, EditorSettings,
1186            Input, MultiBuffer, Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1187        },
1188        fs::{FakeFs, Fs as _},
1189        language::{
1190            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1191            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1192        },
1193        lsp,
1194        project::{DiagnosticSummary, Project, ProjectPath},
1195        workspace::{Workspace, WorkspaceParams},
1196    };
1197
1198    #[cfg(test)]
1199    #[ctor::ctor]
1200    fn init_logger() {
1201        if std::env::var("RUST_LOG").is_ok() {
1202            env_logger::init();
1203        }
1204    }
1205
    /// Shares a project from client A, joins it from client B, and checks that:
    /// collaborators appear on both sides, a guest can open and edit a buffer with the
    /// edit reaching the host, the host's buffer closes once its handle is dropped, and
    /// dropping the guest's project removes the guest from the host's collaborators.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // The guest sees the host as a collaborator; remember the guest's replica id so
        // the host's view of the guest can be checked against it.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // The guest opening the buffer must have opened it on the host as well.
        project_a.read_with(&cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        project_a
            .condition(&cx_a, |project, cx| {
                !project.has_open_buffer((worktree_id, "b.txt"), cx)
            })
            .await;

        // Dropping client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1344
    /// Shares a project, lets a guest join and open a buffer, then unshares it and checks
    /// that the guest's project becomes read-only and the host's worktree is no longer
    /// shared. Finally re-shares the project and verifies a guest can join and open a
    /// buffer again.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A; the guest's copy should turn read-only.
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        drop(project_b);

        // Share the project again and ensure guests can still join.
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Client B rejoins under the same project id.
        let project_c = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_c
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1441
    /// With one host (A) and two guests (B, C) on a shared worktree, checks that:
    /// concurrent edits converge to the same text on all three replicas, a save issued
    /// by a guest writes the host's latest text to disk and clears the dirty flag
    /// everywhere, and file-system renames/additions on the host show up in the guests'
    /// worktrees and in the open buffers' file paths.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open that buffer as the host, wait for both guest edits to arrive, then
        // append an edit of the host's own.
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The file on disk is expected to include the host's concurrent edit.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();

        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into()).await;

        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1621
1622    #[gpui::test(iterations = 10)]
1623    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1624        cx_a.foreground().forbid_parking();
1625        let lang_registry = Arc::new(LanguageRegistry::new());
1626        let fs = FakeFs::new(cx_a.background());
1627
1628        // Connect to a server as 2 clients.
1629        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1630        let client_a = server.create_client(&mut cx_a, "user_a").await;
1631        let client_b = server.create_client(&mut cx_b, "user_b").await;
1632
1633        // Share a project as client A
1634        fs.insert_tree(
1635            "/dir",
1636            json!({
1637                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1638                "a.txt": "a-contents",
1639            }),
1640        )
1641        .await;
1642
1643        let project_a = cx_a.update(|cx| {
1644            Project::local(
1645                client_a.clone(),
1646                client_a.user_store.clone(),
1647                lang_registry.clone(),
1648                fs.clone(),
1649                cx,
1650            )
1651        });
1652        let (worktree_a, _) = project_a
1653            .update(&mut cx_a, |p, cx| {
1654                p.find_or_create_local_worktree("/dir", false, cx)
1655            })
1656            .await
1657            .unwrap();
1658        worktree_a
1659            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1660            .await;
1661        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1662        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1663        project_a
1664            .update(&mut cx_a, |p, cx| p.share(cx))
1665            .await
1666            .unwrap();
1667
1668        // Join that project as client B
1669        let project_b = Project::remote(
1670            project_id,
1671            client_b.clone(),
1672            client_b.user_store.clone(),
1673            lang_registry.clone(),
1674            fs.clone(),
1675            &mut cx_b.to_async(),
1676        )
1677        .await
1678        .unwrap();
1679
1680        // Open a buffer as client B
1681        let buffer_b = project_b
1682            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1683            .await
1684            .unwrap();
1685
1686        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1687        buffer_b.read_with(&cx_b, |buf, _| {
1688            assert!(buf.is_dirty());
1689            assert!(!buf.has_conflict());
1690        });
1691
1692        buffer_b
1693            .update(&mut cx_b, |buf, cx| buf.save(cx))
1694            .await
1695            .unwrap();
1696        buffer_b
1697            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1698            .await;
1699        buffer_b.read_with(&cx_b, |buf, _| {
1700            assert!(!buf.has_conflict());
1701        });
1702
1703        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1704        buffer_b.read_with(&cx_b, |buf, _| {
1705            assert!(buf.is_dirty());
1706            assert!(!buf.has_conflict());
1707        });
1708    }
1709
1710    #[gpui::test(iterations = 10)]
1711    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1712        cx_a.foreground().forbid_parking();
1713        let lang_registry = Arc::new(LanguageRegistry::new());
1714        let fs = FakeFs::new(cx_a.background());
1715
1716        // Connect to a server as 2 clients.
1717        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1718        let client_a = server.create_client(&mut cx_a, "user_a").await;
1719        let client_b = server.create_client(&mut cx_b, "user_b").await;
1720
1721        // Share a project as client A
1722        fs.insert_tree(
1723            "/dir",
1724            json!({
1725                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1726                "a.txt": "a-contents",
1727            }),
1728        )
1729        .await;
1730
1731        let project_a = cx_a.update(|cx| {
1732            Project::local(
1733                client_a.clone(),
1734                client_a.user_store.clone(),
1735                lang_registry.clone(),
1736                fs.clone(),
1737                cx,
1738            )
1739        });
1740        let (worktree_a, _) = project_a
1741            .update(&mut cx_a, |p, cx| {
1742                p.find_or_create_local_worktree("/dir", false, cx)
1743            })
1744            .await
1745            .unwrap();
1746        worktree_a
1747            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1748            .await;
1749        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1750        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1751        project_a
1752            .update(&mut cx_a, |p, cx| p.share(cx))
1753            .await
1754            .unwrap();
1755
1756        // Join that project as client B
1757        let project_b = Project::remote(
1758            project_id,
1759            client_b.clone(),
1760            client_b.user_store.clone(),
1761            lang_registry.clone(),
1762            fs.clone(),
1763            &mut cx_b.to_async(),
1764        )
1765        .await
1766        .unwrap();
1767        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1768
1769        // Open a buffer as client B
1770        let buffer_b = project_b
1771            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1772            .await
1773            .unwrap();
1774        buffer_b.read_with(&cx_b, |buf, _| {
1775            assert!(!buf.is_dirty());
1776            assert!(!buf.has_conflict());
1777        });
1778
1779        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1780            .await
1781            .unwrap();
1782        buffer_b
1783            .condition(&cx_b, |buf, _| {
1784                buf.text() == "new contents" && !buf.is_dirty()
1785            })
1786            .await;
1787        buffer_b.read_with(&cx_b, |buf, _| {
1788            assert!(!buf.has_conflict());
1789        });
1790    }
1791
1792    #[gpui::test(iterations = 10)]
1793    async fn test_editing_while_guest_opens_buffer(
1794        mut cx_a: TestAppContext,
1795        mut cx_b: TestAppContext,
1796    ) {
1797        cx_a.foreground().forbid_parking();
1798        let lang_registry = Arc::new(LanguageRegistry::new());
1799        let fs = FakeFs::new(cx_a.background());
1800
1801        // Connect to a server as 2 clients.
1802        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1803        let client_a = server.create_client(&mut cx_a, "user_a").await;
1804        let client_b = server.create_client(&mut cx_b, "user_b").await;
1805
1806        // Share a project as client A
1807        fs.insert_tree(
1808            "/dir",
1809            json!({
1810                ".zed.toml": r#"collaborators = ["user_b"]"#,
1811                "a.txt": "a-contents",
1812            }),
1813        )
1814        .await;
1815        let project_a = cx_a.update(|cx| {
1816            Project::local(
1817                client_a.clone(),
1818                client_a.user_store.clone(),
1819                lang_registry.clone(),
1820                fs.clone(),
1821                cx,
1822            )
1823        });
1824        let (worktree_a, _) = project_a
1825            .update(&mut cx_a, |p, cx| {
1826                p.find_or_create_local_worktree("/dir", false, cx)
1827            })
1828            .await
1829            .unwrap();
1830        worktree_a
1831            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1832            .await;
1833        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1834        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1835        project_a
1836            .update(&mut cx_a, |p, cx| p.share(cx))
1837            .await
1838            .unwrap();
1839
1840        // Join that project as client B
1841        let project_b = Project::remote(
1842            project_id,
1843            client_b.clone(),
1844            client_b.user_store.clone(),
1845            lang_registry.clone(),
1846            fs.clone(),
1847            &mut cx_b.to_async(),
1848        )
1849        .await
1850        .unwrap();
1851
1852        // Open a buffer as client A
1853        let buffer_a = project_a
1854            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1855            .await
1856            .unwrap();
1857
1858        // Start opening the same buffer as client B
1859        let buffer_b = cx_b
1860            .background()
1861            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1862
1863        // Edit the buffer as client A while client B is still opening it.
1864        cx_b.background().simulate_random_delay().await;
1865        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1866        cx_b.background().simulate_random_delay().await;
1867        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1868
1869        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1870        let buffer_b = buffer_b.await.unwrap();
1871        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1872    }
1873
1874    #[gpui::test(iterations = 10)]
1875    async fn test_leaving_worktree_while_opening_buffer(
1876        mut cx_a: TestAppContext,
1877        mut cx_b: TestAppContext,
1878    ) {
1879        cx_a.foreground().forbid_parking();
1880        let lang_registry = Arc::new(LanguageRegistry::new());
1881        let fs = FakeFs::new(cx_a.background());
1882
1883        // Connect to a server as 2 clients.
1884        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1885        let client_a = server.create_client(&mut cx_a, "user_a").await;
1886        let client_b = server.create_client(&mut cx_b, "user_b").await;
1887
1888        // Share a project as client A
1889        fs.insert_tree(
1890            "/dir",
1891            json!({
1892                ".zed.toml": r#"collaborators = ["user_b"]"#,
1893                "a.txt": "a-contents",
1894            }),
1895        )
1896        .await;
1897        let project_a = cx_a.update(|cx| {
1898            Project::local(
1899                client_a.clone(),
1900                client_a.user_store.clone(),
1901                lang_registry.clone(),
1902                fs.clone(),
1903                cx,
1904            )
1905        });
1906        let (worktree_a, _) = project_a
1907            .update(&mut cx_a, |p, cx| {
1908                p.find_or_create_local_worktree("/dir", false, cx)
1909            })
1910            .await
1911            .unwrap();
1912        worktree_a
1913            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1914            .await;
1915        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1916        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1917        project_a
1918            .update(&mut cx_a, |p, cx| p.share(cx))
1919            .await
1920            .unwrap();
1921
1922        // Join that project as client B
1923        let project_b = Project::remote(
1924            project_id,
1925            client_b.clone(),
1926            client_b.user_store.clone(),
1927            lang_registry.clone(),
1928            fs.clone(),
1929            &mut cx_b.to_async(),
1930        )
1931        .await
1932        .unwrap();
1933
1934        // See that a guest has joined as client A.
1935        project_a
1936            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1937            .await;
1938
1939        // Begin opening a buffer as client B, but leave the project before the open completes.
1940        let buffer_b = cx_b
1941            .background()
1942            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1943        cx_b.update(|_| drop(project_b));
1944        drop(buffer_b);
1945
1946        // See that the guest has left.
1947        project_a
1948            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1949            .await;
1950    }
1951
1952    #[gpui::test(iterations = 10)]
1953    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1954        cx_a.foreground().forbid_parking();
1955        let lang_registry = Arc::new(LanguageRegistry::new());
1956        let fs = FakeFs::new(cx_a.background());
1957
1958        // Connect to a server as 2 clients.
1959        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1960        let client_a = server.create_client(&mut cx_a, "user_a").await;
1961        let client_b = server.create_client(&mut cx_b, "user_b").await;
1962
1963        // Share a project as client A
1964        fs.insert_tree(
1965            "/a",
1966            json!({
1967                ".zed.toml": r#"collaborators = ["user_b"]"#,
1968                "a.txt": "a-contents",
1969                "b.txt": "b-contents",
1970            }),
1971        )
1972        .await;
1973        let project_a = cx_a.update(|cx| {
1974            Project::local(
1975                client_a.clone(),
1976                client_a.user_store.clone(),
1977                lang_registry.clone(),
1978                fs.clone(),
1979                cx,
1980            )
1981        });
1982        let (worktree_a, _) = project_a
1983            .update(&mut cx_a, |p, cx| {
1984                p.find_or_create_local_worktree("/a", false, cx)
1985            })
1986            .await
1987            .unwrap();
1988        worktree_a
1989            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1990            .await;
1991        let project_id = project_a
1992            .update(&mut cx_a, |project, _| project.next_remote_id())
1993            .await;
1994        project_a
1995            .update(&mut cx_a, |project, cx| project.share(cx))
1996            .await
1997            .unwrap();
1998
1999        // Join that project as client B
2000        let _project_b = Project::remote(
2001            project_id,
2002            client_b.clone(),
2003            client_b.user_store.clone(),
2004            lang_registry.clone(),
2005            fs.clone(),
2006            &mut cx_b.to_async(),
2007        )
2008        .await
2009        .unwrap();
2010
2011        // See that a guest has joined as client A.
2012        project_a
2013            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2014            .await;
2015
2016        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2017        client_b.disconnect(&cx_b.to_async()).unwrap();
2018        project_a
2019            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2020            .await;
2021    }
2022
2023    #[gpui::test(iterations = 10)]
2024    async fn test_collaborating_with_diagnostics(
2025        mut cx_a: TestAppContext,
2026        mut cx_b: TestAppContext,
2027    ) {
2028        cx_a.foreground().forbid_parking();
2029        let mut lang_registry = Arc::new(LanguageRegistry::new());
2030        let fs = FakeFs::new(cx_a.background());
2031
2032        // Set up a fake language server.
2033        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2034        Arc::get_mut(&mut lang_registry)
2035            .unwrap()
2036            .add(Arc::new(Language::new(
2037                LanguageConfig {
2038                    name: "Rust".into(),
2039                    path_suffixes: vec!["rs".to_string()],
2040                    language_server: Some(language_server_config),
2041                    ..Default::default()
2042                },
2043                Some(tree_sitter_rust::language()),
2044            )));
2045
2046        // Connect to a server as 2 clients.
2047        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2048        let client_a = server.create_client(&mut cx_a, "user_a").await;
2049        let client_b = server.create_client(&mut cx_b, "user_b").await;
2050
2051        // Share a project as client A
2052        fs.insert_tree(
2053            "/a",
2054            json!({
2055                ".zed.toml": r#"collaborators = ["user_b"]"#,
2056                "a.rs": "let one = two",
2057                "other.rs": "",
2058            }),
2059        )
2060        .await;
2061        let project_a = cx_a.update(|cx| {
2062            Project::local(
2063                client_a.clone(),
2064                client_a.user_store.clone(),
2065                lang_registry.clone(),
2066                fs.clone(),
2067                cx,
2068            )
2069        });
2070        let (worktree_a, _) = project_a
2071            .update(&mut cx_a, |p, cx| {
2072                p.find_or_create_local_worktree("/a", false, cx)
2073            })
2074            .await
2075            .unwrap();
2076        worktree_a
2077            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2078            .await;
2079        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2080        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2081        project_a
2082            .update(&mut cx_a, |p, cx| p.share(cx))
2083            .await
2084            .unwrap();
2085
2086        // Cause the language server to start.
2087        let _ = cx_a
2088            .background()
2089            .spawn(project_a.update(&mut cx_a, |project, cx| {
2090                project.open_buffer(
2091                    ProjectPath {
2092                        worktree_id,
2093                        path: Path::new("other.rs").into(),
2094                    },
2095                    cx,
2096                )
2097            }))
2098            .await
2099            .unwrap();
2100
2101        // Simulate a language server reporting errors for a file.
2102        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2103        fake_language_server
2104            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2105                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2106                version: None,
2107                diagnostics: vec![lsp::Diagnostic {
2108                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2109                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2110                    message: "message 1".to_string(),
2111                    ..Default::default()
2112                }],
2113            })
2114            .await;
2115
2116        // Wait for server to see the diagnostics update.
2117        server
2118            .condition(|store| {
2119                let worktree = store
2120                    .project(project_id)
2121                    .unwrap()
2122                    .worktrees
2123                    .get(&worktree_id.to_proto())
2124                    .unwrap();
2125
2126                !worktree
2127                    .share
2128                    .as_ref()
2129                    .unwrap()
2130                    .diagnostic_summaries
2131                    .is_empty()
2132            })
2133            .await;
2134
2135        // Join the worktree as client B.
2136        let project_b = Project::remote(
2137            project_id,
2138            client_b.clone(),
2139            client_b.user_store.clone(),
2140            lang_registry.clone(),
2141            fs.clone(),
2142            &mut cx_b.to_async(),
2143        )
2144        .await
2145        .unwrap();
2146
2147        project_b.read_with(&cx_b, |project, cx| {
2148            assert_eq!(
2149                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2150                &[(
2151                    ProjectPath {
2152                        worktree_id,
2153                        path: Arc::from(Path::new("a.rs")),
2154                    },
2155                    DiagnosticSummary {
2156                        error_count: 1,
2157                        warning_count: 0,
2158                        ..Default::default()
2159                    },
2160                )]
2161            )
2162        });
2163
2164        // Simulate a language server reporting more errors for a file.
2165        fake_language_server
2166            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2167                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2168                version: None,
2169                diagnostics: vec![
2170                    lsp::Diagnostic {
2171                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2172                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2173                        message: "message 1".to_string(),
2174                        ..Default::default()
2175                    },
2176                    lsp::Diagnostic {
2177                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2178                        range: lsp::Range::new(
2179                            lsp::Position::new(0, 10),
2180                            lsp::Position::new(0, 13),
2181                        ),
2182                        message: "message 2".to_string(),
2183                        ..Default::default()
2184                    },
2185                ],
2186            })
2187            .await;
2188
2189        // Client b gets the updated summaries
2190        project_b
2191            .condition(&cx_b, |project, cx| {
2192                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2193                    == &[(
2194                        ProjectPath {
2195                            worktree_id,
2196                            path: Arc::from(Path::new("a.rs")),
2197                        },
2198                        DiagnosticSummary {
2199                            error_count: 1,
2200                            warning_count: 1,
2201                            ..Default::default()
2202                        },
2203                    )]
2204            })
2205            .await;
2206
2207        // Open the file with the errors on client B. They should be present.
2208        let buffer_b = cx_b
2209            .background()
2210            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2211            .await
2212            .unwrap();
2213
2214        buffer_b.read_with(&cx_b, |buffer, _| {
2215            assert_eq!(
2216                buffer
2217                    .snapshot()
2218                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2219                    .map(|entry| entry)
2220                    .collect::<Vec<_>>(),
2221                &[
2222                    DiagnosticEntry {
2223                        range: Point::new(0, 4)..Point::new(0, 7),
2224                        diagnostic: Diagnostic {
2225                            group_id: 0,
2226                            message: "message 1".to_string(),
2227                            severity: lsp::DiagnosticSeverity::ERROR,
2228                            is_primary: true,
2229                            ..Default::default()
2230                        }
2231                    },
2232                    DiagnosticEntry {
2233                        range: Point::new(0, 10)..Point::new(0, 13),
2234                        diagnostic: Diagnostic {
2235                            group_id: 1,
2236                            severity: lsp::DiagnosticSeverity::WARNING,
2237                            message: "message 2".to_string(),
2238                            is_primary: true,
2239                            ..Default::default()
2240                        }
2241                    }
2242                ]
2243            );
2244        });
2245    }
2246
2247    #[gpui::test(iterations = 10)]
2248    async fn test_collaborating_with_completion(
2249        mut cx_a: TestAppContext,
2250        mut cx_b: TestAppContext,
2251    ) {
2252        cx_a.foreground().forbid_parking();
2253        let mut lang_registry = Arc::new(LanguageRegistry::new());
2254        let fs = FakeFs::new(cx_a.background());
2255
2256        // Set up a fake language server.
2257        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2258        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2259            completion_provider: Some(lsp::CompletionOptions {
2260                trigger_characters: Some(vec![".".to_string()]),
2261                ..Default::default()
2262            }),
2263            ..Default::default()
2264        });
2265        Arc::get_mut(&mut lang_registry)
2266            .unwrap()
2267            .add(Arc::new(Language::new(
2268                LanguageConfig {
2269                    name: "Rust".into(),
2270                    path_suffixes: vec!["rs".to_string()],
2271                    language_server: Some(language_server_config),
2272                    ..Default::default()
2273                },
2274                Some(tree_sitter_rust::language()),
2275            )));
2276
2277        // Connect to a server as 2 clients.
2278        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2279        let client_a = server.create_client(&mut cx_a, "user_a").await;
2280        let client_b = server.create_client(&mut cx_b, "user_b").await;
2281
2282        // Share a project as client A
2283        fs.insert_tree(
2284            "/a",
2285            json!({
2286                ".zed.toml": r#"collaborators = ["user_b"]"#,
2287                "main.rs": "fn main() { a }",
2288                "other.rs": "",
2289            }),
2290        )
2291        .await;
2292        let project_a = cx_a.update(|cx| {
2293            Project::local(
2294                client_a.clone(),
2295                client_a.user_store.clone(),
2296                lang_registry.clone(),
2297                fs.clone(),
2298                cx,
2299            )
2300        });
2301        let (worktree_a, _) = project_a
2302            .update(&mut cx_a, |p, cx| {
2303                p.find_or_create_local_worktree("/a", false, cx)
2304            })
2305            .await
2306            .unwrap();
2307        worktree_a
2308            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2309            .await;
2310        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2311        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2312        project_a
2313            .update(&mut cx_a, |p, cx| p.share(cx))
2314            .await
2315            .unwrap();
2316
2317        // Join the worktree as client B.
2318        let project_b = Project::remote(
2319            project_id,
2320            client_b.clone(),
2321            client_b.user_store.clone(),
2322            lang_registry.clone(),
2323            fs.clone(),
2324            &mut cx_b.to_async(),
2325        )
2326        .await
2327        .unwrap();
2328
2329        // Open a file in an editor as the guest.
2330        let buffer_b = project_b
2331            .update(&mut cx_b, |p, cx| {
2332                p.open_buffer((worktree_id, "main.rs"), cx)
2333            })
2334            .await
2335            .unwrap();
2336        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2337        let editor_b = cx_b.add_view(window_b, |cx| {
2338            Editor::for_buffer(
2339                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2340                Arc::new(|cx| EditorSettings::test(cx)),
2341                Some(project_b.clone()),
2342                cx,
2343            )
2344        });
2345
2346        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2347        buffer_b
2348            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2349            .await;
2350
2351        // Type a completion trigger character as the guest.
2352        editor_b.update(&mut cx_b, |editor, cx| {
2353            editor.select_ranges([13..13], None, cx);
2354            editor.handle_input(&Input(".".into()), cx);
2355            cx.focus(&editor_b);
2356        });
2357
2358        // Receive a completion request as the host's language server.
2359        // Return some completions from the host's language server.
2360        cx_a.foreground().start_waiting();
2361        fake_language_server
2362            .handle_request::<lsp::request::Completion, _>(|params| {
2363                assert_eq!(
2364                    params.text_document_position.text_document.uri,
2365                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2366                );
2367                assert_eq!(
2368                    params.text_document_position.position,
2369                    lsp::Position::new(0, 14),
2370                );
2371
2372                Some(lsp::CompletionResponse::Array(vec![
2373                    lsp::CompletionItem {
2374                        label: "first_method(…)".into(),
2375                        detail: Some("fn(&mut self, B) -> C".into()),
2376                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2377                            new_text: "first_method($1)".to_string(),
2378                            range: lsp::Range::new(
2379                                lsp::Position::new(0, 14),
2380                                lsp::Position::new(0, 14),
2381                            ),
2382                        })),
2383                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2384                        ..Default::default()
2385                    },
2386                    lsp::CompletionItem {
2387                        label: "second_method(…)".into(),
2388                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2389                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2390                            new_text: "second_method()".to_string(),
2391                            range: lsp::Range::new(
2392                                lsp::Position::new(0, 14),
2393                                lsp::Position::new(0, 14),
2394                            ),
2395                        })),
2396                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2397                        ..Default::default()
2398                    },
2399                ]))
2400            })
2401            .next()
2402            .await
2403            .unwrap();
2404        cx_a.foreground().finish_waiting();
2405
2406        // Open the buffer on the host.
2407        let buffer_a = project_a
2408            .update(&mut cx_a, |p, cx| {
2409                p.open_buffer((worktree_id, "main.rs"), cx)
2410            })
2411            .await
2412            .unwrap();
2413        buffer_a
2414            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2415            .await;
2416
2417        // Confirm a completion on the guest.
2418        editor_b
2419            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2420            .await;
2421        editor_b.update(&mut cx_b, |editor, cx| {
2422            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2423            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2424        });
2425
2426        // Return a resolved completion from the host's language server.
2427        // The resolved completion has an additional text edit.
2428        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2429            assert_eq!(params.label, "first_method(…)");
2430            lsp::CompletionItem {
2431                label: "first_method(…)".into(),
2432                detail: Some("fn(&mut self, B) -> C".into()),
2433                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2434                    new_text: "first_method($1)".to_string(),
2435                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2436                })),
2437                additional_text_edits: Some(vec![lsp::TextEdit {
2438                    new_text: "use d::SomeTrait;\n".to_string(),
2439                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2440                }]),
2441                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2442                ..Default::default()
2443            }
2444        });
2445
2446        // The additional edit is applied.
2447        buffer_a
2448            .condition(&cx_a, |buffer, _| {
2449                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2450            })
2451            .await;
2452        buffer_b
2453            .condition(&cx_b, |buffer, _| {
2454                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2455            })
2456            .await;
2457    }
2458
2459    #[gpui::test(iterations = 10)]
2460    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2461        cx_a.foreground().forbid_parking();
2462        let mut lang_registry = Arc::new(LanguageRegistry::new());
2463        let fs = FakeFs::new(cx_a.background());
2464
2465        // Set up a fake language server.
2466        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2467        Arc::get_mut(&mut lang_registry)
2468            .unwrap()
2469            .add(Arc::new(Language::new(
2470                LanguageConfig {
2471                    name: "Rust".into(),
2472                    path_suffixes: vec!["rs".to_string()],
2473                    language_server: Some(language_server_config),
2474                    ..Default::default()
2475                },
2476                Some(tree_sitter_rust::language()),
2477            )));
2478
2479        // Connect to a server as 2 clients.
2480        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2481        let client_a = server.create_client(&mut cx_a, "user_a").await;
2482        let client_b = server.create_client(&mut cx_b, "user_b").await;
2483
2484        // Share a project as client A
2485        fs.insert_tree(
2486            "/a",
2487            json!({
2488                ".zed.toml": r#"collaborators = ["user_b"]"#,
2489                "a.rs": "let one = two",
2490            }),
2491        )
2492        .await;
2493        let project_a = cx_a.update(|cx| {
2494            Project::local(
2495                client_a.clone(),
2496                client_a.user_store.clone(),
2497                lang_registry.clone(),
2498                fs.clone(),
2499                cx,
2500            )
2501        });
2502        let (worktree_a, _) = project_a
2503            .update(&mut cx_a, |p, cx| {
2504                p.find_or_create_local_worktree("/a", false, cx)
2505            })
2506            .await
2507            .unwrap();
2508        worktree_a
2509            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2510            .await;
2511        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2512        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2513        project_a
2514            .update(&mut cx_a, |p, cx| p.share(cx))
2515            .await
2516            .unwrap();
2517
2518        // Join the worktree as client B.
2519        let project_b = Project::remote(
2520            project_id,
2521            client_b.clone(),
2522            client_b.user_store.clone(),
2523            lang_registry.clone(),
2524            fs.clone(),
2525            &mut cx_b.to_async(),
2526        )
2527        .await
2528        .unwrap();
2529
2530        let buffer_b = cx_b
2531            .background()
2532            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2533            .await
2534            .unwrap();
2535
2536        let format = project_b.update(&mut cx_b, |project, cx| {
2537            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2538        });
2539
2540        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2541        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2542            Some(vec![
2543                lsp::TextEdit {
2544                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2545                    new_text: "h".to_string(),
2546                },
2547                lsp::TextEdit {
2548                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2549                    new_text: "y".to_string(),
2550                },
2551            ])
2552        });
2553
2554        format.await.unwrap();
2555        assert_eq!(
2556            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2557            "let honey = two"
2558        );
2559    }
2560
    // Verifies go-to-definition for a guest in a shared project when the definition lives in
    // a file outside the shared worktree. The test checks that:
    //   * each request yields exactly one definition pointing into `/root-2/b.rs`,
    //   * the guest's project reports two worktrees while the definitions are held,
    //   * two requests that resolve to the same file share a single target buffer, and
    //   * the worktree count goes back to one once the definitions are dropped.
    #[gpui::test(iterations = 10)]
    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let mut lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());
        // `/root-1` is the directory the host shares below; its `.zed.toml` lists `user_b`
        // as a collaborator.
        fs.insert_tree(
            "/root-1",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
            }),
        )
        .await;
        // `/root-2` holds the definition targets. The host never adds it as a worktree
        // explicitly in this test.
        fs.insert_tree(
            "/root-2",
            json!({
                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
            }),
        )
        .await;

        // Set up a fake language server.
        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
        Arc::get_mut(&mut lang_registry)
            .unwrap()
            .add(Arc::new(Language::new(
                LanguageConfig {
                    name: "Rust".into(),
                    path_suffixes: vec!["rs".to_string()],
                    language_server: Some(language_server_config),
                    ..Default::default()
                },
                Some(tree_sitter_rust::language()),
            )));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/root-1", false, cx)
            })
            .await
            .unwrap();
        // Wait for the worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join the worktree as client B.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open the file on client B.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
            .await
            .unwrap();

        // Request the definition of a symbol as the guest.
        // NOTE(review): 23 is presumably a buffer offset; if so it lands inside `TWO` in a.rs.
        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));

        // The request above is issued before the handler is installed; the fake server
        // answers with a location in `/root-2/b.rs` covering `TWO` (line 0, columns 6..9).
        let mut fake_language_server = fake_language_servers.next().await.unwrap();
        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
            )))
        });

        let definitions_1 = definitions_1.await.unwrap();
        cx_b.read(|cx| {
            assert_eq!(definitions_1.len(), 1);
            // The guest's project is expected to report a second worktree at this point.
            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
            let target_buffer = definitions_1[0].target_buffer.read(cx);
            assert_eq!(
                target_buffer.text(),
                "const TWO: usize = 2;\nconst THREE: usize = 3;"
            );
            assert_eq!(
                definitions_1[0].target_range.to_point(target_buffer),
                Point::new(0, 6)..Point::new(0, 9)
            );
        });

        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
        // the previous call to `definition`.
        // NOTE(review): 33 presumably lands inside `THREE` in a.rs, matching the response below.
        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
        // Install a new GotoDefinition handler that answers with `THREE`
        // (line 1, columns 6..11 of b.rs).
        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
            )))
        });

        let definitions_2 = definitions_2.await.unwrap();
        cx_b.read(|cx| {
            assert_eq!(definitions_2.len(), 1);
            // Still two worktrees: the second request must not have added another one.
            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
            let target_buffer = definitions_2[0].target_buffer.read(cx);
            assert_eq!(
                target_buffer.text(),
                "const TWO: usize = 2;\nconst THREE: usize = 3;"
            );
            assert_eq!(
                definitions_2[0].target_range.to_point(target_buffer),
                Point::new(1, 6)..Point::new(1, 11)
            );
        });
        // Both results must refer to the same buffer handle.
        assert_eq!(
            definitions_1[0].target_buffer,
            definitions_2[0].target_buffer
        );

        // Drop both definition lists inside an `update` call, then wait until the guest's
        // project is back to a single worktree.
        // NOTE(review): presumably the drop happens inside `update` so the released handles
        // are processed by the app context -- confirm.
        cx_b.update(|_| {
            drop(definitions_1);
            drop(definitions_2);
        });
        project_b
            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
            .await;
    }
2709
2710    #[gpui::test(iterations = 10)]
2711    async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2712        cx_a.foreground().forbid_parking();
2713        let mut lang_registry = Arc::new(LanguageRegistry::new());
2714        let fs = FakeFs::new(cx_a.background());
2715        fs.insert_tree(
2716            "/code",
2717            json!({
2718                "crate-1": {
2719                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2720                    "one.rs": "const ONE: usize = 1;",
2721                },
2722                "crate-2": {
2723                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2724                },
2725                "private": {
2726                    "passwords.txt": "the-password",
2727                }
2728            }),
2729        )
2730        .await;
2731
2732        // Set up a fake language server.
2733        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2734        Arc::get_mut(&mut lang_registry)
2735            .unwrap()
2736            .add(Arc::new(Language::new(
2737                LanguageConfig {
2738                    name: "Rust".into(),
2739                    path_suffixes: vec!["rs".to_string()],
2740                    language_server: Some(language_server_config),
2741                    ..Default::default()
2742                },
2743                Some(tree_sitter_rust::language()),
2744            )));
2745
2746        // Connect to a server as 2 clients.
2747        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2748        let client_a = server.create_client(&mut cx_a, "user_a").await;
2749        let client_b = server.create_client(&mut cx_b, "user_b").await;
2750
2751        // Share a project as client A
2752        let project_a = cx_a.update(|cx| {
2753            Project::local(
2754                client_a.clone(),
2755                client_a.user_store.clone(),
2756                lang_registry.clone(),
2757                fs.clone(),
2758                cx,
2759            )
2760        });
2761        let (worktree_a, _) = project_a
2762            .update(&mut cx_a, |p, cx| {
2763                p.find_or_create_local_worktree("/code/crate-1", false, cx)
2764            })
2765            .await
2766            .unwrap();
2767        worktree_a
2768            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2769            .await;
2770        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2771        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2772        project_a
2773            .update(&mut cx_a, |p, cx| p.share(cx))
2774            .await
2775            .unwrap();
2776
2777        // Join the worktree as client B.
2778        let project_b = Project::remote(
2779            project_id,
2780            client_b.clone(),
2781            client_b.user_store.clone(),
2782            lang_registry.clone(),
2783            fs.clone(),
2784            &mut cx_b.to_async(),
2785        )
2786        .await
2787        .unwrap();
2788
2789        // Cause the language server to start.
2790        let _buffer = cx_b
2791            .background()
2792            .spawn(project_b.update(&mut cx_b, |p, cx| {
2793                p.open_buffer((worktree_id, "one.rs"), cx)
2794            }))
2795            .await
2796            .unwrap();
2797
2798        // Request the definition of a symbol as the guest.
2799        let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
2800        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2801        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_| {
2802            #[allow(deprecated)]
2803            Some(vec![lsp::SymbolInformation {
2804                name: "TWO".into(),
2805                location: lsp::Location {
2806                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
2807                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2808                },
2809                kind: lsp::SymbolKind::CONSTANT,
2810                tags: None,
2811                container_name: None,
2812                deprecated: None,
2813            }])
2814        });
2815
2816        let symbols = symbols.await.unwrap();
2817        assert_eq!(symbols.len(), 1);
2818        assert_eq!(symbols[0].name, "TWO");
2819
2820        // Open one of the returned symbols.
2821        let buffer_b_2 = project_b
2822            .update(&mut cx_b, |project, cx| {
2823                project.open_buffer_for_symbol(&symbols[0], cx)
2824            })
2825            .await
2826            .unwrap();
2827        buffer_b_2.read_with(&cx_b, |buffer, _| {
2828            assert_eq!(
2829                buffer.file().unwrap().path().as_ref(),
2830                Path::new("../crate-2/two.rs")
2831            );
2832        });
2833
2834        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
2835        let mut fake_symbol = symbols[0].clone();
2836        fake_symbol.path = Path::new("/code/secrets").into();
2837        let error = project_b
2838            .update(&mut cx_b, |project, cx| {
2839                project.open_buffer_for_symbol(&fake_symbol, cx)
2840            })
2841            .await
2842            .unwrap_err();
2843        assert!(error.to_string().contains("invalid symbol signature"));
2844    }
2845
2846    #[gpui::test(iterations = 10)]
2847    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2848        mut cx_a: TestAppContext,
2849        mut cx_b: TestAppContext,
2850        mut rng: StdRng,
2851    ) {
2852        cx_a.foreground().forbid_parking();
2853        let mut lang_registry = Arc::new(LanguageRegistry::new());
2854        let fs = FakeFs::new(cx_a.background());
2855        fs.insert_tree(
2856            "/root",
2857            json!({
2858                ".zed.toml": r#"collaborators = ["user_b"]"#,
2859                "a.rs": "const ONE: usize = b::TWO;",
2860                "b.rs": "const TWO: usize = 2",
2861            }),
2862        )
2863        .await;
2864
2865        // Set up a fake language server.
2866        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2867
2868        Arc::get_mut(&mut lang_registry)
2869            .unwrap()
2870            .add(Arc::new(Language::new(
2871                LanguageConfig {
2872                    name: "Rust".into(),
2873                    path_suffixes: vec!["rs".to_string()],
2874                    language_server: Some(language_server_config),
2875                    ..Default::default()
2876                },
2877                Some(tree_sitter_rust::language()),
2878            )));
2879
2880        // Connect to a server as 2 clients.
2881        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2882        let client_a = server.create_client(&mut cx_a, "user_a").await;
2883        let client_b = server.create_client(&mut cx_b, "user_b").await;
2884
2885        // Share a project as client A
2886        let project_a = cx_a.update(|cx| {
2887            Project::local(
2888                client_a.clone(),
2889                client_a.user_store.clone(),
2890                lang_registry.clone(),
2891                fs.clone(),
2892                cx,
2893            )
2894        });
2895
2896        let (worktree_a, _) = project_a
2897            .update(&mut cx_a, |p, cx| {
2898                p.find_or_create_local_worktree("/root", false, cx)
2899            })
2900            .await
2901            .unwrap();
2902        worktree_a
2903            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2904            .await;
2905        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2906        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2907        project_a
2908            .update(&mut cx_a, |p, cx| p.share(cx))
2909            .await
2910            .unwrap();
2911
2912        // Join the worktree as client B.
2913        let project_b = Project::remote(
2914            project_id,
2915            client_b.clone(),
2916            client_b.user_store.clone(),
2917            lang_registry.clone(),
2918            fs.clone(),
2919            &mut cx_b.to_async(),
2920        )
2921        .await
2922        .unwrap();
2923
2924        let buffer_b1 = cx_b
2925            .background()
2926            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2927            .await
2928            .unwrap();
2929
2930        let definitions;
2931        let buffer_b2;
2932        if rng.gen() {
2933            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2934            buffer_b2 =
2935                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2936        } else {
2937            buffer_b2 =
2938                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2939            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2940        }
2941
2942        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2943        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2944            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2945                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2946                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2947            )))
2948        });
2949
2950        let buffer_b2 = buffer_b2.await.unwrap();
2951        let definitions = definitions.await.unwrap();
2952        assert_eq!(definitions.len(), 1);
2953        assert_eq!(definitions[0].target_buffer, buffer_b2);
2954    }
2955
    // Verifies that a guest can use code actions in a shared project: the guest's editor
    // requests code actions for the cursor position, shows them in a context menu, and on
    // confirmation the resolved workspace edit (touching both `main.rs` and `other.rs`) is
    // opened in an editor where it can be undone and redone.
    #[gpui::test(iterations = 10)]
    async fn test_collaborating_with_code_actions(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let mut lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());
        // Initialize the editor for client B; the path openers it registers are handed to the
        // workspace parameters further down.
        let mut path_openers_b = Vec::new();
        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));

        // Set up a fake language server.
        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
        Arc::get_mut(&mut lang_registry)
            .unwrap()
            .add(Arc::new(Language::new(
                LanguageConfig {
                    name: "Rust".into(),
                    path_suffixes: vec!["rs".to_string()],
                    language_server: Some(language_server_config),
                    ..Default::default()
                },
                Some(tree_sitter_rust::language()),
            )));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
                "other.rs": "pub fn foo() -> usize { 4 }",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join the worktree as client B.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Build a workspace for client B around the remote project, starting from the test
        // defaults and overriding languages, client, user store, project and path openers.
        let mut params = cx_b.update(WorkspaceParams::test);
        params.languages = lang_registry.clone();
        params.client = client_b.client.clone();
        params.user_store = client_b.user_store.clone();
        params.project = project_b;
        params.path_openers = path_openers_b.into();

        // Open `main.rs` in an editor inside client B's workspace.
        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
        let editor_b = workspace_b
            .update(&mut cx_b, |workspace, cx| {
                workspace.open_path((worktree_id, "main.rs").into(), cx)
            })
            .await
            .unwrap()
            .downcast::<Editor>()
            .unwrap();

        // The first code action request is expected for the position (0, 0); the handler
        // returns no actions for it.
        // NOTE(review): `.next().await` presumably waits until the handler has served one
        // request -- confirm against the fake language server implementation.
        let mut fake_language_server = fake_language_servers.next().await.unwrap();
        fake_language_server
            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
                assert_eq!(
                    params.text_document.uri,
                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
                );
                assert_eq!(params.range.start, lsp::Position::new(0, 0));
                assert_eq!(params.range.end, lsp::Position::new(0, 0));
                None
            })
            .next()
            .await;

        // Move cursor to a location that contains code actions.
        // (Row 1, column 31 lies inside the `foo` of `other::foo()`.)
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
            cx.focus(&editor_b);
        });

        // The request for the new cursor position is answered with a single action whose
        // workspace edit replaces `other::foo()` with `4` in main.rs and deletes the whole
        // function from other.rs.
        fake_language_server
            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
                assert_eq!(
                    params.text_document.uri,
                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
                );
                assert_eq!(params.range.start, lsp::Position::new(1, 31));
                assert_eq!(params.range.end, lsp::Position::new(1, 31));

                Some(vec![lsp::CodeActionOrCommand::CodeAction(
                    lsp::CodeAction {
                        title: "Inline into all callers".to_string(),
                        edit: Some(lsp::WorkspaceEdit {
                            changes: Some(
                                [
                                    (
                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
                                        vec![lsp::TextEdit::new(
                                            lsp::Range::new(
                                                lsp::Position::new(1, 22),
                                                lsp::Position::new(1, 34),
                                            ),
                                            "4".to_string(),
                                        )],
                                    ),
                                    (
                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
                                        vec![lsp::TextEdit::new(
                                            lsp::Range::new(
                                                lsp::Position::new(0, 0),
                                                lsp::Position::new(0, 27),
                                            ),
                                            "".to_string(),
                                        )],
                                    ),
                                ]
                                .into_iter()
                                .collect(),
                            ),
                            ..Default::default()
                        }),
                        // Extra payload attached to the action. Nothing in this test asserts
                        // on its contents.
                        data: Some(json!({
                            "codeActionParams": {
                                "range": {
                                    "start": {"line": 1, "column": 31},
                                    "end": {"line": 1, "column": 31},
                                }
                            }
                        })),
                        ..Default::default()
                    },
                )])
            })
            .next()
            .await;

        // Toggle code actions and wait for them to display.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
        });
        editor_b
            .condition(&cx_b, |editor, _| editor.context_menu_visible())
            .await;

        // Remove the code action handler before confirming, so the assertions in it (which
        // pin the requested range to (1, 31)) are not run against any later request.
        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();

        // Confirming the code action will trigger a resolve request.
        let confirm_action = workspace_b
            .update(&mut cx_b, |workspace, cx| {
                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
            })
            .unwrap();
        // The resolved action carries the same workspace edit as the unresolved one above.
        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
            lsp::CodeAction {
                title: "Inline into all callers".to_string(),
                edit: Some(lsp::WorkspaceEdit {
                    changes: Some(
                        [
                            (
                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
                                vec![lsp::TextEdit::new(
                                    lsp::Range::new(
                                        lsp::Position::new(1, 22),
                                        lsp::Position::new(1, 34),
                                    ),
                                    "4".to_string(),
                                )],
                            ),
                            (
                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
                                vec![lsp::TextEdit::new(
                                    lsp::Range::new(
                                        lsp::Position::new(0, 0),
                                        lsp::Position::new(0, 27),
                                    ),
                                    "".to_string(),
                                )],
                            ),
                        ]
                        .into_iter()
                        .collect(),
                    ),
                    ..Default::default()
                }),
                ..Default::default()
            }
        });

        // After the action is confirmed, an editor containing both modified files is opened.
        confirm_action.await.unwrap();
        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
            workspace
                .active_item(cx)
                .unwrap()
                .downcast::<Editor>()
                .unwrap()
        });
        // The expected text starts with the (now empty) contents of other.rs, followed by the
        // edited main.rs. Undo restores both files' original text and redo re-applies the edit.
        code_action_editor.update(&mut cx_b, |editor, cx| {
            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
            editor.undo(&Undo, cx);
            assert_eq!(
                editor.text(cx),
                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
            );
            editor.redo(&Redo, cx);
            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
        });
    }
3197
3198    #[gpui::test(iterations = 10)]
        // Exercises a language-server rename issued by a guest. Client A shares a local project
        // rooted at "/dir"; client B joins it remotely, opens "one.rs" and renames `ONE` to
        // `THREE`. A fake language server answers the prepare-rename and rename requests, and
        // the test then checks the resulting text and the undo/redo behavior of B's editors.
3199    async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3200        cx_a.foreground().forbid_parking();
3201        let mut lang_registry = Arc::new(LanguageRegistry::new());
3202        let fs = FakeFs::new(cx_a.background());
3203        let mut path_openers_b = Vec::new();
3204        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3205
3206        // Set up a fake language server.
3207        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
            // `Arc::get_mut` only succeeds while this is the sole handle to the registry, so the
            // language has to be registered before `lang_registry` is cloned below.
3208        Arc::get_mut(&mut lang_registry)
3209            .unwrap()
3210            .add(Arc::new(Language::new(
3211                LanguageConfig {
3212                    name: "Rust".into(),
3213                    path_suffixes: vec!["rs".to_string()],
3214                    language_server: Some(language_server_config),
3215                    ..Default::default()
3216                },
3217                Some(tree_sitter_rust::language()),
3218            )));
3219
3220        // Connect to a server as 2 clients.
3221        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3222        let client_a = server.create_client(&mut cx_a, "user_a").await;
3223        let client_b = server.create_client(&mut cx_b, "user_b").await;
3224
3225        // Share a project as client A
3226        fs.insert_tree(
3227            "/dir",
3228            json!({
3229                ".zed.toml": r#"collaborators = ["user_b"]"#,
3230                "one.rs": "const ONE: usize = 1;",
3231                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3232            }),
3233        )
3234        .await;
3235        let project_a = cx_a.update(|cx| {
3236            Project::local(
3237                client_a.clone(),
3238                client_a.user_store.clone(),
3239                lang_registry.clone(),
3240                fs.clone(),
3241                cx,
3242            )
3243        });
3244        let (worktree_a, _) = project_a
3245            .update(&mut cx_a, |p, cx| {
3246                p.find_or_create_local_worktree("/dir", false, cx)
3247            })
3248            .await
3249            .unwrap();
3250        worktree_a
3251            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3252            .await;
3253        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3254        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3255        project_a
3256            .update(&mut cx_a, |p, cx| p.share(cx))
3257            .await
3258            .unwrap();
3259
3260        // Join the worktree as client B.
3261        let project_b = Project::remote(
3262            project_id,
3263            client_b.clone(),
3264            client_b.user_store.clone(),
3265            lang_registry.clone(),
3266            fs.clone(),
3267            &mut cx_b.to_async(),
3268        )
3269        .await
3270        .unwrap();
            // Build a workspace for client B around the remote project, starting from the test
            // defaults and overriding the pieces that must match this test's setup.
3271        let mut params = cx_b.update(WorkspaceParams::test);
3272        params.languages = lang_registry.clone();
3273        params.client = client_b.client.clone();
3274        params.user_store = client_b.user_store.clone();
3275        params.project = project_b;
3276        params.path_openers = path_openers_b.into();
3277
3278        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3279        let editor_b = workspace_b
3280            .update(&mut cx_b, |workspace, cx| {
3281                workspace.open_path((worktree_id, "one.rs").into(), cx)
3282            })
3283            .await
3284            .unwrap()
3285            .downcast::<Editor>()
3286            .unwrap();
3287        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3288
3289        // Move cursor to a location that can be renamed.
3290        let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
                // Offset 7 lies inside `ONE`, which spans offsets 6..9 of `const ONE: usize = 1;`.
3291            editor.select_ranges([7..7], None, cx);
3292            editor.rename(&Rename, cx).unwrap()
3293        });
3294
            // The fake server validates the prepare-rename request and reports columns 6..9 of
            // line 0 as the renameable range.
            // NOTE(review): `.next().await` presumably resolves once the handler has served one
            // request — confirm against the fake language server helper.
3295        fake_language_server
3296            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
3297                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3298                assert_eq!(params.position, lsp::Position::new(0, 7));
3299                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3300                    lsp::Position::new(0, 6),
3301                    lsp::Position::new(0, 9),
3302                )))
3303            })
3304            .next()
3305            .await
3306            .unwrap();
3307        prepare_rename.await.unwrap();
3308        editor_b.update(&mut cx_b, |editor, cx| {
3309            let rename = editor.pending_rename().unwrap();
3310            let buffer = editor.buffer().read(cx).snapshot(cx);
                // The pending rename must cover exactly the range the server reported.
3311            assert_eq!(
3312                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3313                6..9
3314            );
                // Type the new name into the rename editor by replacing its first three
                // characters (presumably the old name `ONE` — verify) with "THREE".
3315            rename.editor.update(cx, |rename_editor, cx| {
3316                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3317                    rename_buffer.edit([0..3], "THREE", cx);
3318                });
3319            });
3320        });
3321
3322        let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3323            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3324        });
            // The rename request must target the start of the symbol (0, 6) and carry the new
            // name. The response edits both files: the definition in one.rs and the two uses in
            // two.rs (columns 24..27 and 35..38 of `const TWO: usize = one::ONE + one::ONE;`).
3325        fake_language_server
3326            .handle_request::<lsp::request::Rename, _>(|params| {
3327                assert_eq!(
3328                    params.text_document_position.text_document.uri.as_str(),
3329                    "file:///dir/one.rs"
3330                );
3331                assert_eq!(
3332                    params.text_document_position.position,
3333                    lsp::Position::new(0, 6)
3334                );
3335                assert_eq!(params.new_name, "THREE");
3336                Some(lsp::WorkspaceEdit {
3337                    changes: Some(
3338                        [
3339                            (
3340                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3341                                vec![lsp::TextEdit::new(
3342                                    lsp::Range::new(
3343                                        lsp::Position::new(0, 6),
3344                                        lsp::Position::new(0, 9),
3345                                    ),
3346                                    "THREE".to_string(),
3347                                )],
3348                            ),
3349                            (
3350                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3351                                vec![
3352                                    lsp::TextEdit::new(
3353                                        lsp::Range::new(
3354                                            lsp::Position::new(0, 24),
3355                                            lsp::Position::new(0, 27),
3356                                        ),
3357                                        "THREE".to_string(),
3358                                    ),
3359                                    lsp::TextEdit::new(
3360                                        lsp::Range::new(
3361                                            lsp::Position::new(0, 35),
3362                                            lsp::Position::new(0, 38),
3363                                        ),
3364                                        "THREE".to_string(),
3365                                    ),
3366                                ],
3367                            ),
3368                        ]
3369                        .into_iter()
3370                        .collect(),
3371                    ),
3372                    ..Default::default()
3373                })
3374            })
3375            .next()
3376            .await
3377            .unwrap();
3378        confirm_rename.await.unwrap();
3379
            // After the rename is confirmed, the workspace's active item is an editor whose text
            // contains the edited contents of both files (two.rs first, then one.rs).
3380        let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3381            workspace
3382                .active_item(cx)
3383                .unwrap()
3384                .downcast::<Editor>()
3385                .unwrap()
3386        });
            // A single undo reverts the rename in both files, and a single redo reapplies it.
3387        rename_editor.update(&mut cx_b, |editor, cx| {
3388            assert_eq!(
3389                editor.text(cx),
3390                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3391            );
3392            editor.undo(&Undo, cx);
3393            assert_eq!(
3394                editor.text(cx),
3395                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3396            );
3397            editor.redo(&Redo, cx);
3398            assert_eq!(
3399                editor.text(cx),
3400                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3401            );
3402        });
3403
3404        // Ensure temporary rename edits cannot be undone/redone.
            // In the original one.rs editor, the first undo reverts the applied rename, a second
            // undo changes nothing further, and redo brings the renamed text back.
3405        editor_b.update(&mut cx_b, |editor, cx| {
3406            editor.undo(&Undo, cx);
3407            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3408            editor.undo(&Undo, cx);
3409            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3410            editor.redo(&Redo, cx);
3411            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3412        })
3413    }
3414
3415    #[gpui::test(iterations = 10)]
        // End-to-end chat test with two clients that are members of the same org channel.
        // It checks that both clients load the channel list and a pre-existing message, that
        // messages sent by A reach B, and that the server stops tracking the channel once both
        // clients have dropped their channel handles.
3416    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3417        cx_a.foreground().forbid_parking();
3418
3419        // Connect to a server as 2 clients.
3420        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3421        let client_a = server.create_client(&mut cx_a, "user_a").await;
3422        let client_b = server.create_client(&mut cx_b, "user_b").await;
3423
3424        // Create an org that includes these 2 users.
3425        let db = &server.app_state.db;
3426        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3427        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3428            .await
3429            .unwrap();
3430        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3431            .await
3432            .unwrap();
3433
3434        // Create a channel that includes all the users.
3435        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3436        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3437            .await
3438            .unwrap();
3439        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3440            .await
3441            .unwrap();
            // Seed the channel with one message from B, written straight to the database.
            // NOTE(review): the trailing `1` is presumably a nonce/deduplication value — confirm
            // against the db API.
3442        db.create_channel_message(
3443            channel_id,
3444            client_b.current_user_id(&cx_b),
3445            "hello A, it's B.",
3446            OffsetDateTime::now_utc(),
3447            1,
3448        )
3449        .await
3450        .unwrap();
3451
            // Client A: wait for the channel list to load, then open the channel.
3452        let channels_a = cx_a
3453            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3454        channels_a
3455            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3456            .await;
3457        channels_a.read_with(&cx_a, |list, _| {
3458            assert_eq!(
3459                list.available_channels().unwrap(),
3460                &[ChannelDetails {
3461                    id: channel_id.to_proto(),
3462                    name: "test-channel".to_string()
3463                }]
3464            )
3465        });
3466        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3467            this.get_channel(channel_id.to_proto(), cx).unwrap()
3468        });
            // The message list starts out empty and is filled in asynchronously.
            // NOTE(review): the boolean in each `channel_messages` tuple appears to be a
            // "pending" flag (`true` right after a local send, `false` otherwise) — confirm
            // against the `channel_messages` helper.
3469        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3470        channel_a
3471            .condition(&cx_a, |channel, _| {
3472                channel_messages(channel)
3473                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3474            })
3475            .await;
3476
            // Client B: same sequence as client A above.
3477        let channels_b = cx_b
3478            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3479        channels_b
3480            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3481            .await;
3482        channels_b.read_with(&cx_b, |list, _| {
3483            assert_eq!(
3484                list.available_channels().unwrap(),
3485                &[ChannelDetails {
3486                    id: channel_id.to_proto(),
3487                    name: "test-channel".to_string()
3488                }]
3489            )
3490        });
3491
3492        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3493            this.get_channel(channel_id.to_proto(), cx).unwrap()
3494        });
3495        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3496        channel_b
3497            .condition(&cx_b, |channel, _| {
3498                channel_messages(channel)
3499                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3500            })
3501            .await;
3502
            // A sends two messages back to back. Both appear in A's local list immediately
            // (flagged `true`), before either send has been awaited; only the second send's
            // task is awaited here, the first is detached.
3503        channel_a
3504            .update(&mut cx_a, |channel, cx| {
3505                channel
3506                    .send_message("oh, hi B.".to_string(), cx)
3507                    .unwrap()
3508                    .detach();
3509                let task = channel.send_message("sup".to_string(), cx).unwrap();
3510                assert_eq!(
3511                    channel_messages(channel),
3512                    &[
3513                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3514                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3515                        ("user_a".to_string(), "sup".to_string(), true)
3516                    ]
3517                );
3518                task
3519            })
3520            .await
3521            .unwrap();
3522
            // B eventually observes both of A's messages, in send order.
3523        channel_b
3524            .condition(&cx_b, |channel, _| {
3525                channel_messages(channel)
3526                    == [
3527                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3528                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3529                        ("user_a".to_string(), "sup".to_string(), false),
3530                    ]
3531            })
3532            .await;
3533
            // While both clients hold the channel, the server tracks two connection ids for it.
            // Dropping B's handle leaves one, and dropping A's removes the channel entry.
3534        assert_eq!(
3535            server
3536                .state()
3537                .await
3538                .channel(channel_id)
3539                .unwrap()
3540                .connection_ids
3541                .len(),
3542            2
3543        );
3544        cx_b.update(|_| drop(channel_b));
3545        server
3546            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3547            .await;
3548
3549        cx_a.update(|_| drop(channel_a));
3550        server
3551            .condition(|state| state.channel(channel_id).is_none())
3552            .await;
3553    }
3554
3555    #[gpui::test(iterations = 10)]
        // Checks chat message validation with a single client: over-long messages are rejected,
        // blank messages are rejected, and surrounding whitespace is trimmed before a message
        // is stored.
3556    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3557        cx_a.foreground().forbid_parking();
3558
3559        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3560        let client_a = server.create_client(&mut cx_a, "user_a").await;
3561
            // Create an org with one channel and make user A a member of both.
3562        let db = &server.app_state.db;
3563        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3564        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3565        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3566            .await
3567            .unwrap();
3568        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3569            .await
3570            .unwrap();
3571
3572        let channels_a = cx_a
3573            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3574        channels_a
3575            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3576            .await;
3577        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3578            this.get_channel(channel_id.to_proto(), cx).unwrap()
3579        });
3580
3581        // Messages aren't allowed to be too long.
            // The body is 14 bytes repeated 1024 times. `send_message` itself succeeds here; the
            // failure only surfaces when the returned task is awaited.
            // NOTE(review): presumably the limit being hit is `MAX_MESSAGE_LEN` — confirm.
3582        channel_a
3583            .update(&mut cx_a, |channel, cx| {
3584                let long_body = "this is long.\n".repeat(1024);
3585                channel.send_message(long_body, cx).unwrap()
3586            })
3587            .await
3588            .unwrap_err();
3589
3590        // Messages aren't allowed to be blank.
            // Unlike the over-long case, an empty body is rejected synchronously: `send_message`
            // returns an error and no task is created.
3591        channel_a.update(&mut cx_a, |channel, cx| {
3592            channel.send_message(String::new(), cx).unwrap_err()
3593        });
3594
3595        // Leading and trailing whitespace are trimmed.
3596        channel_a
3597            .update(&mut cx_a, |channel, cx| {
3598                channel
3599                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3600                    .unwrap()
3601            })
3602            .await
3603            .unwrap();
            // The database holds exactly one message, the trimmed one, so neither rejected
            // message above was stored.
3604        assert_eq!(
3605            db.get_channel_messages(channel_id, 10, None)
3606                .await
3607                .unwrap()
3608                .iter()
3609                .map(|m| &m.body)
3610                .collect::<Vec<_>>(),
3611            &["surrounded by whitespace"]
3612        );
3613    }
3614
3615    #[gpui::test(iterations = 10)]
        // Checks chat behavior across a dropped connection. Client B is disconnected while
        // connections are forbidden; B's cached channel data must stay readable, a message B
        // sends while offline must fail but later show up, and after reconnecting B must catch
        // up on the messages A sent in the meantime. Finally both clients exchange messages
        // normally again.
3616    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3617        cx_a.foreground().forbid_parking();
3618
3619        // Connect to a server as 2 clients.
3620        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3621        let client_a = server.create_client(&mut cx_a, "user_a").await;
3622        let client_b = server.create_client(&mut cx_b, "user_b").await;
            // Stream of B's connection status updates, used below to detect the failed reconnect.
3623        let mut status_b = client_b.status();
3624
3625        // Create an org that includes these 2 users.
3626        let db = &server.app_state.db;
3627        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3628        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3629            .await
3630            .unwrap();
3631        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3632            .await
3633            .unwrap();
3634
3635        // Create a channel that includes all the users.
3636        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3637        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3638            .await
3639            .unwrap();
3640        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3641            .await
3642            .unwrap();
            // Seed the channel with one message from B, written straight to the database.
            // NOTE(review): the trailing `2` is presumably a nonce/deduplication value — confirm
            // against the db API.
3643        db.create_channel_message(
3644            channel_id,
3645            client_b.current_user_id(&cx_b),
3646            "hello A, it's B.",
3647            OffsetDateTime::now_utc(),
3648            2,
3649        )
3650        .await
3651        .unwrap();
3652
            // Client A: wait for the channel list to load, then open the channel.
3653        let channels_a = cx_a
3654            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3655        channels_a
3656            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3657            .await;
3658
3659        channels_a.read_with(&cx_a, |list, _| {
3660            assert_eq!(
3661                list.available_channels().unwrap(),
3662                &[ChannelDetails {
3663                    id: channel_id.to_proto(),
3664                    name: "test-channel".to_string()
3665                }]
3666            )
3667        });
3668        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3669            this.get_channel(channel_id.to_proto(), cx).unwrap()
3670        });
            // The message list starts out empty and is filled in asynchronously.
            // NOTE(review): the boolean in each `channel_messages` tuple appears to be a
            // "pending" flag — confirm against the `channel_messages` helper.
3671        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3672        channel_a
3673            .condition(&cx_a, |channel, _| {
3674                channel_messages(channel)
3675                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3676            })
3677            .await;
3678
            // Client B: same sequence as client A above.
3679        let channels_b = cx_b
3680            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3681        channels_b
3682            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3683            .await;
3684        channels_b.read_with(&cx_b, |list, _| {
3685            assert_eq!(
3686                list.available_channels().unwrap(),
3687                &[ChannelDetails {
3688                    id: channel_id.to_proto(),
3689                    name: "test-channel".to_string()
3690                }]
3691            )
3692        });
3693
3694        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3695            this.get_channel(channel_id.to_proto(), cx).unwrap()
3696        });
3697        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3698        channel_b
3699            .condition(&cx_b, |channel, _| {
3700                channel_messages(channel)
3701                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3702            })
3703            .await;
3704
3705        // Disconnect client B, ensuring we can still access its cached channel data.
            // Connections are forbidden first so that B's reconnect attempt fails; the loop then
            // drains status updates until B reports a reconnection error.
            // NOTE(review): if the status stream ever yielded `None`, this loop would not exit
            // via the `matches!` check — confirm the stream cannot end here.
3706        server.forbid_connections();
3707        server.disconnect_client(client_b.current_user_id(&cx_b));
3708        while !matches!(
3709            status_b.next().await,
3710            Some(client::Status::ReconnectionError { .. })
3711        ) {}
3712
3713        channels_b.read_with(&cx_b, |channels, _| {
3714            assert_eq!(
3715                channels.available_channels().unwrap(),
3716                [ChannelDetails {
3717                    id: channel_id.to_proto(),
3718                    name: "test-channel".to_string()
3719                }]
3720            )
3721        });
3722        channel_b.read_with(&cx_b, |channel, _| {
3723            assert_eq!(
3724                channel_messages(channel),
3725                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3726            )
3727        });
3728
3729        // Send a message from client B while it is disconnected.
            // The message is added to B's local list right away (flagged `true`), but the send
            // task itself resolves to an error.
3730        channel_b
3731            .update(&mut cx_b, |channel, cx| {
3732                let task = channel
3733                    .send_message("can you see this?".to_string(), cx)
3734                    .unwrap();
3735                assert_eq!(
3736                    channel_messages(channel),
3737                    &[
3738                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3739                        ("user_b".to_string(), "can you see this?".to_string(), true)
3740                    ]
3741                );
3742                task
3743            })
3744            .await
3745            .unwrap_err();
3746
3747        // Send a message from client A while B is disconnected.
            // A is still connected, so the awaited send succeeds.
3748        channel_a
3749            .update(&mut cx_a, |channel, cx| {
3750                channel
3751                    .send_message("oh, hi B.".to_string(), cx)
3752                    .unwrap()
3753                    .detach();
3754                let task = channel.send_message("sup".to_string(), cx).unwrap();
3755                assert_eq!(
3756                    channel_messages(channel),
3757                    &[
3758                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3759                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3760                        ("user_a".to_string(), "sup".to_string(), true)
3761                    ]
3762                );
3763                task
3764            })
3765            .await
3766            .unwrap();
3767
3768        // Give client B a chance to reconnect.
            // NOTE(review): advancing the fake clock by 10 seconds presumably lets B's reconnect
            // timer fire — confirm against the client's reconnection logic.
3769        server.allow_connections();
3770        cx_b.foreground().advance_clock(Duration::from_secs(10));
3771
3772        // Verify that B sees the new messages upon reconnection, as well as the message client B
3773        // sent while offline.
            // B's offline message ends up after A's two messages and is no longer flagged `true`.
3774        channel_b
3775            .condition(&cx_b, |channel, _| {
3776                channel_messages(channel)
3777                    == [
3778                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3779                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3780                        ("user_a".to_string(), "sup".to_string(), false),
3781                        ("user_b".to_string(), "can you see this?".to_string(), false),
3782                    ]
3783            })
3784            .await;
3785
3786        // Ensure client A and B can communicate normally after reconnection.
3787        channel_a
3788            .update(&mut cx_a, |channel, cx| {
3789                channel.send_message("you online?".to_string(), cx).unwrap()
3790            })
3791            .await
3792            .unwrap();
3793        channel_b
3794            .condition(&cx_b, |channel, _| {
3795                channel_messages(channel)
3796                    == [
3797                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3798                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3799                        ("user_a".to_string(), "sup".to_string(), false),
3800                        ("user_b".to_string(), "can you see this?".to_string(), false),
3801                        ("user_a".to_string(), "you online?".to_string(), false),
3802                    ]
3803            })
3804            .await;
3805
            // And in the other direction: a message from B reaches A, whose list also contains
            // the message B originally sent while offline.
3806        channel_b
3807            .update(&mut cx_b, |channel, cx| {
3808                channel.send_message("yep".to_string(), cx).unwrap()
3809            })
3810            .await
3811            .unwrap();
3812        channel_a
3813            .condition(&cx_a, |channel, _| {
3814                channel_messages(channel)
3815                    == [
3816                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3817                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3818                        ("user_a".to_string(), "sup".to_string(), false),
3819                        ("user_b".to_string(), "can you see this?".to_string(), false),
3820                        ("user_a".to_string(), "you online?".to_string(), false),
3821                        ("user_b".to_string(), "yep".to_string(), false),
3822                    ]
3823            })
3824            .await;
3825    }
3826
3827    #[gpui::test(iterations = 10)]
        // Checks that contact lists stay in sync across three clients. Once client A has a
        // project with the "/a" worktree, all three clients see `user_a` with project "a" and
        // no guests. After A shares the project and B joins, all three see `user_b` as a guest.
        // After A drops the project, every contact list becomes empty.
3828    async fn test_contacts(
3829        mut cx_a: TestAppContext,
3830        mut cx_b: TestAppContext,
3831        mut cx_c: TestAppContext,
3832    ) {
3833        cx_a.foreground().forbid_parking();
3834        let lang_registry = Arc::new(LanguageRegistry::new());
3835        let fs = FakeFs::new(cx_a.background());
3836
3837        // Connect to a server as 3 clients.
3838        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3839        let client_a = server.create_client(&mut cx_a, "user_a").await;
3840        let client_b = server.create_client(&mut cx_b, "user_b").await;
3841        let client_c = server.create_client(&mut cx_c, "user_c").await;
3842
3843        // Share a worktree as client A.
            // The ".zed.toml" file lists both other users as collaborators.
3844        fs.insert_tree(
3845            "/a",
3846            json!({
3847                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3848            }),
3849        )
3850        .await;
3851
3852        let project_a = cx_a.update(|cx| {
3853            Project::local(
3854                client_a.clone(),
3855                client_a.user_store.clone(),
3856                lang_registry.clone(),
3857                fs.clone(),
3858                cx,
3859            )
3860        });
3861        let (worktree_a, _) = project_a
3862            .update(&mut cx_a, |p, cx| {
3863                p.find_or_create_local_worktree("/a", false, cx)
3864            })
3865            .await
3866            .unwrap();
3867        worktree_a
3868            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3869            .await;
3870
            // The project shows up in every client's contacts, with no guests, before
            // `project.share` is called further down.
3871        client_a
3872            .user_store
3873            .condition(&cx_a, |user_store, _| {
3874                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3875            })
3876            .await;
3877        client_b
3878            .user_store
3879            .condition(&cx_b, |user_store, _| {
3880                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3881            })
3882            .await;
3883        client_c
3884            .user_store
3885            .condition(&cx_c, |user_store, _| {
3886                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3887            })
3888            .await;
3889
            // Share the project as A and join it as B.
3890        let project_id = project_a
3891            .update(&mut cx_a, |project, _| project.next_remote_id())
3892            .await;
3893        project_a
3894            .update(&mut cx_a, |project, cx| project.share(cx))
3895            .await
3896            .unwrap();
3897
3898        let _project_b = Project::remote(
3899            project_id,
3900            client_b.clone(),
3901            client_b.user_store.clone(),
3902            lang_registry.clone(),
3903            fs.clone(),
3904            &mut cx_b.to_async(),
3905        )
3906        .await
3907        .unwrap();
3908
            // All three clients, including C who did not join, now see `user_b` as a guest.
3909        client_a
3910            .user_store
3911            .condition(&cx_a, |user_store, _| {
3912                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3913            })
3914            .await;
3915        client_b
3916            .user_store
3917            .condition(&cx_b, |user_store, _| {
3918                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3919            })
3920            .await;
3921        client_c
3922            .user_store
3923            .condition(&cx_c, |user_store, _| {
3924                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3925            })
3926            .await;
3927
            // Wait until A's project lists B as a collaborator before dropping the project.
3928        project_a
3929            .condition(&cx_a, |project, _| {
3930                project.collaborators().contains_key(&client_b.peer_id)
3931            })
3932            .await;
3933
            // Dropping A's project empties the contact list on every client.
3934        cx_a.update(move |_| drop(project_a));
3935        client_a
3936            .user_store
3937            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3938            .await;
3939        client_b
3940            .user_store
3941            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3942            .await;
3943        client_c
3944            .user_store
3945            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3946            .await;
3947
            // Flattens the user store's contacts into tuples of
            // (contact's github login, [(first worktree root name, [guest github logins])])
            // so they can be compared against literals above.
            // Indexing `worktree_root_names[0]` panics for a project with no root names.
3948        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3949            user_store
3950                .contacts()
3951                .iter()
3952                .map(|contact| {
3953                    let worktrees = contact
3954                        .projects
3955                        .iter()
3956                        .map(|p| {
3957                            (
3958                                p.worktree_root_names[0].as_str(),
3959                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3960                            )
3961                        })
3962                        .collect();
3963                    (contact.user.github_login.as_str(), worktrees)
3964                })
3965                .collect()
3966        }
3967    }
3968
3969    #[gpui::test(iterations = 100)]
3970    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
3971        cx.foreground().forbid_parking();
3972        let max_peers = env::var("MAX_PEERS")
3973            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3974            .unwrap_or(5);
3975        let max_operations = env::var("OPERATIONS")
3976            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3977            .unwrap_or(10);
3978
3979        let rng = Arc::new(Mutex::new(rng));
3980
3981        let guest_lang_registry = Arc::new(LanguageRegistry::new());
3982        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
3983
3984        let fs = FakeFs::new(cx.background());
3985        fs.insert_tree(
3986            "/_collab",
3987            json!({
3988                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3989            }),
3990        )
3991        .await;
3992
3993        let operations = Rc::new(Cell::new(0));
3994        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
3995        let mut clients = Vec::new();
3996
3997        let mut next_entity_id = 100000;
3998        let mut host_cx = TestAppContext::new(
3999            cx.foreground_platform(),
4000            cx.platform(),
4001            cx.foreground(),
4002            cx.background(),
4003            cx.font_cache(),
4004            next_entity_id,
4005        );
4006        let host = server.create_client(&mut host_cx, "host").await;
4007        let host_project = host_cx.update(|cx| {
4008            Project::local(
4009                host.client.clone(),
4010                host.user_store.clone(),
4011                Arc::new(LanguageRegistry::new()),
4012                fs.clone(),
4013                cx,
4014            )
4015        });
4016        let host_project_id = host_project
4017            .update(&mut host_cx, |p, _| p.next_remote_id())
4018            .await;
4019
4020        let (collab_worktree, _) = host_project
4021            .update(&mut host_cx, |project, cx| {
4022                project.find_or_create_local_worktree("/_collab", false, cx)
4023            })
4024            .await
4025            .unwrap();
4026        collab_worktree
4027            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4028            .await;
4029        host_project
4030            .update(&mut host_cx, |project, cx| project.share(cx))
4031            .await
4032            .unwrap();
4033
4034        clients.push(cx.foreground().spawn(host.simulate_host(
4035            host_project.clone(),
4036            language_server_config,
4037            operations.clone(),
4038            max_operations,
4039            rng.clone(),
4040            host_cx.clone(),
4041        )));
4042
4043        while operations.get() < max_operations {
4044            cx.background().simulate_random_delay().await;
4045            if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4046                operations.set(operations.get() + 1);
4047
4048                let guest_id = clients.len();
4049                log::info!("Adding guest {}", guest_id);
4050                next_entity_id += 100000;
4051                let mut guest_cx = TestAppContext::new(
4052                    cx.foreground_platform(),
4053                    cx.platform(),
4054                    cx.foreground(),
4055                    cx.background(),
4056                    cx.font_cache(),
4057                    next_entity_id,
4058                );
4059                let guest = server
4060                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4061                    .await;
4062                let guest_project = Project::remote(
4063                    host_project_id,
4064                    guest.client.clone(),
4065                    guest.user_store.clone(),
4066                    guest_lang_registry.clone(),
4067                    fs.clone(),
4068                    &mut guest_cx.to_async(),
4069                )
4070                .await
4071                .unwrap();
4072                clients.push(cx.foreground().spawn(guest.simulate_guest(
4073                    guest_id,
4074                    guest_project,
4075                    operations.clone(),
4076                    max_operations,
4077                    rng.clone(),
4078                    guest_cx,
4079                )));
4080
4081                log::info!("Guest {} added", guest_id);
4082            }
4083        }
4084
4085        let clients = futures::future::join_all(clients).await;
4086        cx.foreground().run_until_parked();
4087
4088        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4089            project
4090                .worktrees(cx)
4091                .map(|worktree| {
4092                    let snapshot = worktree.read(cx).snapshot();
4093                    (snapshot.id(), snapshot)
4094                })
4095                .collect::<BTreeMap<_, _>>()
4096        });
4097
4098        for (guest_client, guest_cx) in clients.iter().skip(1) {
4099            let guest_id = guest_client.client.id();
4100            let worktree_snapshots =
4101                guest_client
4102                    .project
4103                    .as_ref()
4104                    .unwrap()
4105                    .read_with(guest_cx, |project, cx| {
4106                        project
4107                            .worktrees(cx)
4108                            .map(|worktree| {
4109                                let worktree = worktree.read(cx);
4110                                assert!(
4111                                    !worktree.as_remote().unwrap().has_pending_updates(),
4112                                    "Guest {} worktree {:?} contains deferred updates",
4113                                    guest_id,
4114                                    worktree.id()
4115                                );
4116                                (worktree.id(), worktree.snapshot())
4117                            })
4118                            .collect::<BTreeMap<_, _>>()
4119                    });
4120
4121            assert_eq!(
4122                worktree_snapshots.keys().collect::<Vec<_>>(),
4123                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4124                "guest {} has different worktrees than the host",
4125                guest_id
4126            );
4127            for (id, host_snapshot) in &host_worktree_snapshots {
4128                let guest_snapshot = &worktree_snapshots[id];
4129                assert_eq!(
4130                    guest_snapshot.root_name(),
4131                    host_snapshot.root_name(),
4132                    "guest {} has different root name than the host for worktree {}",
4133                    guest_id,
4134                    id
4135                );
4136                assert_eq!(
4137                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4138                    host_snapshot.entries(false).collect::<Vec<_>>(),
4139                    "guest {} has different snapshot than the host for worktree {}",
4140                    guest_id,
4141                    id
4142                );
4143            }
4144
4145            guest_client
4146                .project
4147                .as_ref()
4148                .unwrap()
4149                .read_with(guest_cx, |project, _| {
4150                    assert!(
4151                        !project.has_buffered_operations(),
4152                        "guest {} has buffered operations ",
4153                        guest_id,
4154                    );
4155                });
4156
4157            for guest_buffer in &guest_client.buffers {
4158                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4159                let host_buffer = host_project.read_with(&host_cx, |project, _| {
4160                    project
4161                        .shared_buffer(guest_client.peer_id, buffer_id)
4162                        .expect(&format!(
4163                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
4164                            guest_id, guest_client.peer_id, buffer_id
4165                        ))
4166                });
4167                assert_eq!(
4168                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4169                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4170                    "guest {} buffer {} differs from the host's buffer",
4171                    guest_id,
4172                    buffer_id,
4173                );
4174            }
4175        }
4176    }
4177
    /// In-process server harness used by the tests in this module.
    struct TestServer {
        /// Peer handed to `Server::new`; reset when the harness is dropped.
        peer: Arc<Peer>,
        /// Application state produced by `build_app_state`.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor, used by `condition` to bracket its waits.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is given to `Server::new`.
        notifications: mpsc::UnboundedReceiver<()>,
        /// Per-user kill switches for the in-memory connections, registered when a
        /// client connects and consumed by `disconnect_client`.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// When `true`, new connection attempts are rejected with an error.
        forbid_connections: Arc<AtomicBool>,
        /// Presumably held only so the test database outlives the server - confirm.
        _test_db: TestDb,
    }
4188
4189    impl TestServer {
4190        async fn start(
4191            foreground: Rc<executor::Foreground>,
4192            background: Arc<executor::Background>,
4193        ) -> Self {
4194            let test_db = TestDb::fake(background);
4195            let app_state = Self::build_app_state(&test_db).await;
4196            let peer = Peer::new();
4197            let notifications = mpsc::unbounded();
4198            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4199            Self {
4200                peer,
4201                app_state,
4202                server,
4203                foreground,
4204                notifications: notifications.1,
4205                connection_killers: Default::default(),
4206                forbid_connections: Default::default(),
4207                _test_db: test_db,
4208            }
4209        }
4210
4211        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4212            let http = FakeHttpClient::with_404_response();
4213            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4214            let client_name = name.to_string();
4215            let mut client = Client::new(http.clone());
4216            let server = self.server.clone();
4217            let connection_killers = self.connection_killers.clone();
4218            let forbid_connections = self.forbid_connections.clone();
4219            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4220
4221            Arc::get_mut(&mut client)
4222                .unwrap()
4223                .override_authenticate(move |cx| {
4224                    cx.spawn(|_| async move {
4225                        let access_token = "the-token".to_string();
4226                        Ok(Credentials {
4227                            user_id: user_id.0 as u64,
4228                            access_token,
4229                        })
4230                    })
4231                })
4232                .override_establish_connection(move |credentials, cx| {
4233                    assert_eq!(credentials.user_id, user_id.0 as u64);
4234                    assert_eq!(credentials.access_token, "the-token");
4235
4236                    let server = server.clone();
4237                    let connection_killers = connection_killers.clone();
4238                    let forbid_connections = forbid_connections.clone();
4239                    let client_name = client_name.clone();
4240                    let connection_id_tx = connection_id_tx.clone();
4241                    cx.spawn(move |cx| async move {
4242                        if forbid_connections.load(SeqCst) {
4243                            Err(EstablishConnectionError::other(anyhow!(
4244                                "server is forbidding connections"
4245                            )))
4246                        } else {
4247                            let (client_conn, server_conn, kill_conn) =
4248                                Connection::in_memory(cx.background());
4249                            connection_killers.lock().insert(user_id, kill_conn);
4250                            cx.background()
4251                                .spawn(server.handle_connection(
4252                                    server_conn,
4253                                    client_name,
4254                                    user_id,
4255                                    Some(connection_id_tx),
4256                                    cx.background(),
4257                                ))
4258                                .detach();
4259                            Ok(client_conn)
4260                        }
4261                    })
4262                });
4263
4264            client
4265                .authenticate_and_connect(&cx.to_async())
4266                .await
4267                .unwrap();
4268
4269            Channel::init(&client);
4270            Project::init(&client);
4271
4272            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4273            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4274            let mut authed_user =
4275                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4276            while authed_user.next().await.unwrap().is_none() {}
4277
4278            TestClient {
4279                client,
4280                peer_id,
4281                user_store,
4282                project: Default::default(),
4283                buffers: Default::default(),
4284            }
4285        }
4286
4287        fn disconnect_client(&self, user_id: UserId) {
4288            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4289                let _ = kill_conn.try_send(Some(()));
4290            }
4291        }
4292
4293        fn forbid_connections(&self) {
4294            self.forbid_connections.store(true, SeqCst);
4295        }
4296
4297        fn allow_connections(&self) {
4298            self.forbid_connections.store(false, SeqCst);
4299        }
4300
4301        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4302            let mut config = Config::default();
4303            config.session_secret = "a".repeat(32);
4304            config.database_url = test_db.url.clone();
4305            let github_client = github::AppClient::test();
4306            Arc::new(AppState {
4307                db: test_db.db().clone(),
4308                handlebars: Default::default(),
4309                auth_client: auth::build_client("", ""),
4310                repo_client: github::RepoClient::test(&github_client),
4311                github_client,
4312                config,
4313            })
4314        }
4315
4316        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4317            self.server.store.read()
4318        }
4319
4320        async fn condition<F>(&mut self, mut predicate: F)
4321        where
4322            F: FnMut(&Store) -> bool,
4323        {
4324            async_std::future::timeout(Duration::from_millis(500), async {
4325                while !(predicate)(&*self.server.store.read()) {
4326                    self.foreground.start_waiting();
4327                    self.notifications.next().await;
4328                    self.foreground.finish_waiting();
4329                }
4330            })
4331            .await
4332            .expect("condition timed out");
4333        }
4334    }
4335
    /// Resets the shared peer when the test server goes away.
    impl Drop for TestServer {
        fn drop(&mut self) {
            self.peer.reset();
        }
    }
4341
    /// A connected client together with the state accumulated while simulating it.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        /// User store created for this client.
        pub user_store: ModelHandle<UserStore>,
        /// Project the client simulated against; `None` until a simulation stores it.
        project: Option<ModelHandle<Project>>,
        /// Buffers the client currently keeps open during a simulation.
        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
    }
4349
    /// Lets a `TestClient` be used directly wherever the wrapped `Arc<Client>` is
    /// expected.
    impl Deref for TestClient {
        type Target = Arc<Client>;

        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
4357
4358    impl TestClient {
4359        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4360            UserId::from_proto(
4361                self.user_store
4362                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4363            )
4364        }
4365
4366        fn simulate_host(
4367            mut self,
4368            project: ModelHandle<Project>,
4369            mut language_server_config: LanguageServerConfig,
4370            operations: Rc<Cell<usize>>,
4371            max_operations: usize,
4372            rng: Arc<Mutex<StdRng>>,
4373            mut cx: TestAppContext,
4374        ) -> impl Future<Output = (Self, TestAppContext)> {
4375            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4376
4377            // Set up a fake language server.
4378            language_server_config.set_fake_initializer({
4379                let rng = rng.clone();
4380                let files = files.clone();
4381                move |fake_server| {
4382                    fake_server.handle_request::<lsp::request::Completion, _>(|_| {
4383                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4384                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4385                                range: lsp::Range::new(
4386                                    lsp::Position::new(0, 0),
4387                                    lsp::Position::new(0, 0),
4388                                ),
4389                                new_text: "the-new-text".to_string(),
4390                            })),
4391                            ..Default::default()
4392                        }]))
4393                    });
4394
4395                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_| {
4396                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4397                            lsp::CodeAction {
4398                                title: "the-code-action".to_string(),
4399                                ..Default::default()
4400                            },
4401                        )])
4402                    });
4403
4404                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
4405                        Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4406                            params.position,
4407                            params.position,
4408                        )))
4409                    });
4410
4411                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4412                        let files = files.clone();
4413                        let rng = rng.clone();
4414                        move |_| {
4415                            let files = files.lock();
4416                            let mut rng = rng.lock();
4417                            let count = rng.gen_range::<usize, _>(1..3);
4418                            Some(lsp::GotoDefinitionResponse::Array(
4419                                (0..count)
4420                                    .map(|_| {
4421                                        let file = files.choose(&mut *rng).unwrap().as_path();
4422                                        lsp::Location {
4423                                            uri: lsp::Url::from_file_path(file).unwrap(),
4424                                            range: Default::default(),
4425                                        }
4426                                    })
4427                                    .collect(),
4428                            ))
4429                        }
4430                    });
4431                }
4432            });
4433
4434            project.update(&mut cx, |project, _| {
4435                project.languages().add(Arc::new(Language::new(
4436                    LanguageConfig {
4437                        name: "Rust".into(),
4438                        path_suffixes: vec!["rs".to_string()],
4439                        language_server: Some(language_server_config),
4440                        ..Default::default()
4441                    },
4442                    None,
4443                )));
4444            });
4445
4446            async move {
4447                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4448                while operations.get() < max_operations {
4449                    operations.set(operations.get() + 1);
4450
4451                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4452                    match distribution {
4453                        0..=20 if !files.lock().is_empty() => {
4454                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4455                            let mut path = path.as_path();
4456                            while let Some(parent_path) = path.parent() {
4457                                path = parent_path;
4458                                if rng.lock().gen() {
4459                                    break;
4460                                }
4461                            }
4462
4463                            log::info!("Host: find/create local worktree {:?}", path);
4464                            project
4465                                .update(&mut cx, |project, cx| {
4466                                    project.find_or_create_local_worktree(path, false, cx)
4467                                })
4468                                .await
4469                                .unwrap();
4470                        }
4471                        10..=80 if !files.lock().is_empty() => {
4472                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4473                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4474                                let (worktree, path) = project
4475                                    .update(&mut cx, |project, cx| {
4476                                        project.find_or_create_local_worktree(file, false, cx)
4477                                    })
4478                                    .await
4479                                    .unwrap();
4480                                let project_path =
4481                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4482                                log::info!("Host: opening path {:?}", project_path);
4483                                let buffer = project
4484                                    .update(&mut cx, |project, cx| {
4485                                        project.open_buffer(project_path, cx)
4486                                    })
4487                                    .await
4488                                    .unwrap();
4489                                self.buffers.insert(buffer.clone());
4490                                buffer
4491                            } else {
4492                                self.buffers
4493                                    .iter()
4494                                    .choose(&mut *rng.lock())
4495                                    .unwrap()
4496                                    .clone()
4497                            };
4498
4499                            if rng.lock().gen_bool(0.1) {
4500                                cx.update(|cx| {
4501                                    log::info!(
4502                                        "Host: dropping buffer {:?}",
4503                                        buffer.read(cx).file().unwrap().full_path(cx)
4504                                    );
4505                                    self.buffers.remove(&buffer);
4506                                    drop(buffer);
4507                                });
4508                            } else {
4509                                buffer.update(&mut cx, |buffer, cx| {
4510                                    log::info!(
4511                                        "Host: updating buffer {:?}",
4512                                        buffer.file().unwrap().full_path(cx)
4513                                    );
4514                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4515                                });
4516                            }
4517                        }
4518                        _ => loop {
4519                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4520                            let mut path = PathBuf::new();
4521                            path.push("/");
4522                            for _ in 0..path_component_count {
4523                                let letter = rng.lock().gen_range(b'a'..=b'z');
4524                                path.push(std::str::from_utf8(&[letter]).unwrap());
4525                            }
4526                            path.set_extension("rs");
4527                            let parent_path = path.parent().unwrap();
4528
4529                            log::info!("Host: creating file {:?}", path,);
4530
4531                            if fs.create_dir(&parent_path).await.is_ok()
4532                                && fs.create_file(&path, Default::default()).await.is_ok()
4533                            {
4534                                files.lock().push(path);
4535                                break;
4536                            } else {
4537                                log::info!("Host: cannot create file");
4538                            }
4539                        },
4540                    }
4541
4542                    cx.background().simulate_random_delay().await;
4543                }
4544
4545                self.project = Some(project);
4546                (self, cx)
4547            }
4548        }
4549
4550        pub async fn simulate_guest(
4551            mut self,
4552            guest_id: usize,
4553            project: ModelHandle<Project>,
4554            operations: Rc<Cell<usize>>,
4555            max_operations: usize,
4556            rng: Arc<Mutex<StdRng>>,
4557            mut cx: TestAppContext,
4558        ) -> (Self, TestAppContext) {
4559            while operations.get() < max_operations {
4560                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4561                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4562                        project
4563                            .worktrees(&cx)
4564                            .filter(|worktree| {
4565                                worktree.read(cx).entries(false).any(|e| e.is_file())
4566                            })
4567                            .choose(&mut *rng.lock())
4568                    }) {
4569                        worktree
4570                    } else {
4571                        cx.background().simulate_random_delay().await;
4572                        continue;
4573                    };
4574
4575                    operations.set(operations.get() + 1);
4576                    let project_path = worktree.read_with(&cx, |worktree, _| {
4577                        let entry = worktree
4578                            .entries(false)
4579                            .filter(|e| e.is_file())
4580                            .choose(&mut *rng.lock())
4581                            .unwrap();
4582                        (worktree.id(), entry.path.clone())
4583                    });
4584                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4585                    let buffer = project
4586                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4587                        .await
4588                        .unwrap();
4589                    self.buffers.insert(buffer.clone());
4590                    buffer
4591                } else {
4592                    operations.set(operations.get() + 1);
4593
4594                    self.buffers
4595                        .iter()
4596                        .choose(&mut *rng.lock())
4597                        .unwrap()
4598                        .clone()
4599                };
4600
4601                let choice = rng.lock().gen_range(0..100);
4602                match choice {
4603                    0..=9 => {
4604                        cx.update(|cx| {
4605                            log::info!(
4606                                "Guest {}: dropping buffer {:?}",
4607                                guest_id,
4608                                buffer.read(cx).file().unwrap().full_path(cx)
4609                            );
4610                            self.buffers.remove(&buffer);
4611                            drop(buffer);
4612                        });
4613                    }
4614                    10..=19 => {
4615                        let completions = project.update(&mut cx, |project, cx| {
4616                            log::info!(
4617                                "Guest {}: requesting completions for buffer {:?}",
4618                                guest_id,
4619                                buffer.read(cx).file().unwrap().full_path(cx)
4620                            );
4621                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4622                            project.completions(&buffer, offset, cx)
4623                        });
4624                        let completions = cx.background().spawn(async move {
4625                            completions.await.expect("completions request failed");
4626                        });
4627                        if rng.lock().gen_bool(0.3) {
4628                            log::info!("Guest {}: detaching completions request", guest_id);
4629                            completions.detach();
4630                        } else {
4631                            completions.await;
4632                        }
4633                    }
4634                    20..=29 => {
4635                        let code_actions = project.update(&mut cx, |project, cx| {
4636                            log::info!(
4637                                "Guest {}: requesting code actions for buffer {:?}",
4638                                guest_id,
4639                                buffer.read(cx).file().unwrap().full_path(cx)
4640                            );
4641                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4642                            project.code_actions(&buffer, range, cx)
4643                        });
4644                        let code_actions = cx.background().spawn(async move {
4645                            code_actions.await.expect("code actions request failed");
4646                        });
4647                        if rng.lock().gen_bool(0.3) {
4648                            log::info!("Guest {}: detaching code actions request", guest_id);
4649                            code_actions.detach();
4650                        } else {
4651                            code_actions.await;
4652                        }
4653                    }
4654                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4655                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4656                            log::info!(
4657                                "Guest {}: saving buffer {:?}",
4658                                guest_id,
4659                                buffer.file().unwrap().full_path(cx)
4660                            );
4661                            (buffer.version(), buffer.save(cx))
4662                        });
4663                        let save = cx.spawn(|cx| async move {
4664                            let (saved_version, _) = save.await.expect("save request failed");
4665                            buffer.read_with(&cx, |buffer, _| {
4666                                assert!(buffer.version().observed_all(&saved_version));
4667                                assert!(saved_version.observed_all(&requested_version));
4668                            });
4669                        });
4670                        if rng.lock().gen_bool(0.3) {
4671                            log::info!("Guest {}: detaching save request", guest_id);
4672                            save.detach();
4673                        } else {
4674                            save.await;
4675                        }
4676                    }
4677                    40..=45 => {
4678                        let prepare_rename = project.update(&mut cx, |project, cx| {
4679                            log::info!(
4680                                "Guest {}: preparing rename for buffer {:?}",
4681                                guest_id,
4682                                buffer.read(cx).file().unwrap().full_path(cx)
4683                            );
4684                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4685                            project.prepare_rename(buffer, offset, cx)
4686                        });
4687                        let prepare_rename = cx.background().spawn(async move {
4688                            prepare_rename.await.expect("prepare rename request failed");
4689                        });
4690                        if rng.lock().gen_bool(0.3) {
4691                            log::info!("Guest {}: detaching prepare rename request", guest_id);
4692                            prepare_rename.detach();
4693                        } else {
4694                            prepare_rename.await;
4695                        }
4696                    }
4697                    46..=49 => {
4698                        let definitions = project.update(&mut cx, |project, cx| {
4699                            log::info!(
4700                                "Guest {}: requesting defintions for buffer {:?}",
4701                                guest_id,
4702                                buffer.read(cx).file().unwrap().full_path(cx)
4703                            );
4704                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4705                            project.definition(&buffer, offset, cx)
4706                        });
4707                        let definitions = cx.background().spawn(async move {
4708                            definitions.await.expect("definitions request failed");
4709                        });
4710                        if rng.lock().gen_bool(0.3) {
4711                            log::info!("Guest {}: detaching definitions request", guest_id);
4712                            definitions.detach();
4713                        } else {
4714                            definitions.await;
4715                        }
4716                    }
4717                    _ => {
4718                        buffer.update(&mut cx, |buffer, cx| {
4719                            log::info!(
4720                                "Guest {}: updating buffer {:?}",
4721                                guest_id,
4722                                buffer.file().unwrap().full_path(cx)
4723                            );
4724                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4725                        });
4726                    }
4727                }
4728                cx.background().simulate_random_delay().await;
4729            }
4730
4731            self.project = Some(project);
4732            (self, cx)
4733        }
4734    }
4735
4736    impl Executor for Arc<gpui::executor::Background> {
4737        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4738            self.spawn(future).detach();
4739        }
4740    }
4741
4742    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4743        channel
4744            .messages()
4745            .cursor::<()>()
4746            .map(|m| {
4747                (
4748                    m.sender.github_login.clone(),
4749                    m.body.clone(),
4750                    m.is_pending(),
4751                )
4752            })
4753            .collect()
4754    }
4755
4756    struct EmptyView;
4757
4758    impl gpui::Entity for EmptyView {
4759        type Event = ();
4760    }
4761
4762    impl gpui::View for EmptyView {
4763        fn ui_name() -> &'static str {
4764            "empty view"
4765        }
4766
4767        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4768            gpui::Element::boxed(gpui::elements::Empty)
4769        }
4770    }
4771}