rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
  35pub struct Server {
  36    peer: Arc<Peer>,
  37    store: RwLock<Store>,
  38    app_state: Arc<AppState>,
  39    handlers: HashMap<TypeId, MessageHandler>,
  40    notifications: Option<mpsc::UnboundedSender<()>>,
  41}
  42
  43pub trait Executor {
  44    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  45}
  46
  47pub struct RealExecutor;
  48
  49const MESSAGE_COUNT_PER_PAGE: usize = 100;
  50const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::share_worktree)
  77            .add_request_handler(Server::update_worktree)
  78            .add_message_handler(Server::update_diagnostic_summary)
  79            .add_message_handler(Server::disk_based_diagnostics_updating)
  80            .add_message_handler(Server::disk_based_diagnostics_updated)
  81            .add_request_handler(Server::get_definition)
  82            .add_request_handler(Server::get_project_symbols)
  83            .add_request_handler(Server::open_buffer_for_symbol)
  84            .add_request_handler(Server::open_buffer)
  85            .add_message_handler(Server::close_buffer)
  86            .add_request_handler(Server::update_buffer)
  87            .add_message_handler(Server::update_buffer_file)
  88            .add_message_handler(Server::buffer_reloaded)
  89            .add_message_handler(Server::buffer_saved)
  90            .add_request_handler(Server::save_buffer)
  91            .add_request_handler(Server::format_buffers)
  92            .add_request_handler(Server::get_completions)
  93            .add_request_handler(Server::apply_additional_edits_for_completion)
  94            .add_request_handler(Server::get_code_actions)
  95            .add_request_handler(Server::apply_code_action)
  96            .add_request_handler(Server::prepare_rename)
  97            .add_request_handler(Server::perform_rename)
  98            .add_request_handler(Server::get_channels)
  99            .add_request_handler(Server::get_users)
 100            .add_request_handler(Server::join_channel)
 101            .add_message_handler(Server::leave_channel)
 102            .add_request_handler(Server::send_channel_message)
 103            .add_request_handler(Server::get_channel_messages);
 104
 105        Arc::new(server)
 106    }
 107
 108    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 109    where
 110        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 111        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 112        M: EnvelopedMessage,
 113    {
 114        let prev_handler = self.handlers.insert(
 115            TypeId::of::<M>(),
 116            Box::new(move |server, envelope| {
 117                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 118                (handler)(server, *envelope).boxed()
 119            }),
 120        );
 121        if prev_handler.is_some() {
 122            panic!("registered a handler for the same message twice");
 123        }
 124        self
 125    }
 126
 127    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 128    where
 129        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 130        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 131        M: RequestMessage,
 132    {
 133        self.add_message_handler(move |server, envelope| {
 134            let receipt = envelope.receipt();
 135            let response = (handler)(server.clone(), envelope);
 136            async move {
 137                match response.await {
 138                    Ok(response) => {
 139                        server.peer.respond(receipt, response)?;
 140                        Ok(())
 141                    }
 142                    Err(error) => {
 143                        server.peer.respond_with_error(
 144                            receipt,
 145                            proto::Error {
 146                                message: error.to_string(),
 147                            },
 148                        )?;
 149                        Err(error)
 150                    }
 151                }
 152            }
 153        })
 154    }
 155
 156    pub fn handle_connection<E: Executor>(
 157        self: &Arc<Self>,
 158        connection: Connection,
 159        addr: String,
 160        user_id: UserId,
 161        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 162        executor: E,
 163    ) -> impl Future<Output = ()> {
 164        let mut this = self.clone();
 165        async move {
 166            let (connection_id, handle_io, mut incoming_rx) =
 167                this.peer.add_connection(connection).await;
 168
 169            if let Some(send_connection_id) = send_connection_id.as_mut() {
 170                let _ = send_connection_id.send(connection_id).await;
 171            }
 172
 173            this.state_mut().add_connection(connection_id, user_id);
 174            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 175                log::error!("error updating contacts for {:?}: {}", user_id, err);
 176            }
 177
 178            let handle_io = handle_io.fuse();
 179            futures::pin_mut!(handle_io);
 180            loop {
 181                let next_message = incoming_rx.next().fuse();
 182                futures::pin_mut!(next_message);
 183                futures::select_biased! {
 184                    result = handle_io => {
 185                        if let Err(err) = result {
 186                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 187                        }
 188                        break;
 189                    }
 190                    message = next_message => {
 191                        if let Some(message) = message {
 192                            let start_time = Instant::now();
 193                            let type_name = message.payload_type_name();
 194                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 195                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 196                                let notifications = this.notifications.clone();
 197                                let is_background = message.is_background();
 198                                let handle_message = (handler)(this.clone(), message);
 199                                let handle_message = async move {
 200                                    if let Err(err) = handle_message.await {
 201                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 202                                    } else {
 203                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 204                                    }
 205                                    if let Some(mut notifications) = notifications {
 206                                        let _ = notifications.send(()).await;
 207                                    }
 208                                };
 209                                if is_background {
 210                                    executor.spawn_detached(handle_message);
 211                                } else {
 212                                    handle_message.await;
 213                                }
 214                            } else {
 215                                log::warn!("unhandled message: {}", type_name);
 216                            }
 217                        } else {
 218                            log::info!("rpc connection closed {:?}", addr);
 219                            break;
 220                        }
 221                    }
 222                }
 223            }
 224
 225            if let Err(err) = this.sign_out(connection_id).await {
 226                log::error!("error signing out connection {:?} - {:?}", addr, err);
 227            }
 228        }
 229    }
 230
 231    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 232        self.peer.disconnect(connection_id);
 233        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 234
 235        for (project_id, project) in removed_connection.hosted_projects {
 236            if let Some(share) = project.share {
 237                broadcast(
 238                    connection_id,
 239                    share.guests.keys().copied().collect(),
 240                    |conn_id| {
 241                        self.peer
 242                            .send(conn_id, proto::UnshareProject { project_id })
 243                    },
 244                )?;
 245            }
 246        }
 247
 248        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 249            broadcast(connection_id, peer_ids, |conn_id| {
 250                self.peer.send(
 251                    conn_id,
 252                    proto::RemoveProjectCollaborator {
 253                        project_id,
 254                        peer_id: connection_id.0,
 255                    },
 256                )
 257            })?;
 258        }
 259
 260        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 261        Ok(())
 262    }
 263
 264    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 265        Ok(proto::Ack {})
 266    }
 267
 268    async fn register_project(
 269        mut self: Arc<Server>,
 270        request: TypedEnvelope<proto::RegisterProject>,
 271    ) -> tide::Result<proto::RegisterProjectResponse> {
 272        let project_id = {
 273            let mut state = self.state_mut();
 274            let user_id = state.user_id_for_connection(request.sender_id)?;
 275            state.register_project(request.sender_id, user_id)
 276        };
 277        Ok(proto::RegisterProjectResponse { project_id })
 278    }
 279
 280    async fn unregister_project(
 281        mut self: Arc<Server>,
 282        request: TypedEnvelope<proto::UnregisterProject>,
 283    ) -> tide::Result<()> {
 284        let project = self
 285            .state_mut()
 286            .unregister_project(request.payload.project_id, request.sender_id)?;
 287        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 288        Ok(())
 289    }
 290
 291    async fn share_project(
 292        mut self: Arc<Server>,
 293        request: TypedEnvelope<proto::ShareProject>,
 294    ) -> tide::Result<proto::Ack> {
 295        self.state_mut()
 296            .share_project(request.payload.project_id, request.sender_id);
 297        Ok(proto::Ack {})
 298    }
 299
 300    async fn unshare_project(
 301        mut self: Arc<Server>,
 302        request: TypedEnvelope<proto::UnshareProject>,
 303    ) -> tide::Result<()> {
 304        let project_id = request.payload.project_id;
 305        let project = self
 306            .state_mut()
 307            .unshare_project(project_id, request.sender_id)?;
 308
 309        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 310            self.peer
 311                .send(conn_id, proto::UnshareProject { project_id })
 312        })?;
 313        self.update_contacts_for_users(&project.authorized_user_ids)?;
 314        Ok(())
 315    }
 316
 317    async fn join_project(
 318        mut self: Arc<Server>,
 319        request: TypedEnvelope<proto::JoinProject>,
 320    ) -> tide::Result<proto::JoinProjectResponse> {
 321        let project_id = request.payload.project_id;
 322
 323        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 324        let (response, connection_ids, contact_user_ids) = self
 325            .state_mut()
 326            .join_project(request.sender_id, user_id, project_id)
 327            .and_then(|joined| {
 328                let share = joined.project.share()?;
 329                let peer_count = share.guests.len();
 330                let mut collaborators = Vec::with_capacity(peer_count);
 331                collaborators.push(proto::Collaborator {
 332                    peer_id: joined.project.host_connection_id.0,
 333                    replica_id: 0,
 334                    user_id: joined.project.host_user_id.to_proto(),
 335                });
 336                let worktrees = joined
 337                    .project
 338                    .worktrees
 339                    .iter()
 340                    .filter_map(|(id, worktree)| {
 341                        worktree.share.as_ref().map(|share| proto::Worktree {
 342                            id: *id,
 343                            root_name: worktree.root_name.clone(),
 344                            entries: share.entries.values().cloned().collect(),
 345                            diagnostic_summaries: share
 346                                .diagnostic_summaries
 347                                .values()
 348                                .cloned()
 349                                .collect(),
 350                            weak: worktree.weak,
 351                            next_update_id: share.next_update_id as u64,
 352                        })
 353                    })
 354                    .collect();
 355                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 356                    if *peer_conn_id != request.sender_id {
 357                        collaborators.push(proto::Collaborator {
 358                            peer_id: peer_conn_id.0,
 359                            replica_id: *peer_replica_id as u32,
 360                            user_id: peer_user_id.to_proto(),
 361                        });
 362                    }
 363                }
 364                let response = proto::JoinProjectResponse {
 365                    worktrees,
 366                    replica_id: joined.replica_id as u32,
 367                    collaborators,
 368                };
 369                let connection_ids = joined.project.connection_ids();
 370                let contact_user_ids = joined.project.authorized_user_ids();
 371                Ok((response, connection_ids, contact_user_ids))
 372            })?;
 373
 374        broadcast(request.sender_id, connection_ids, |conn_id| {
 375            self.peer.send(
 376                conn_id,
 377                proto::AddProjectCollaborator {
 378                    project_id,
 379                    collaborator: Some(proto::Collaborator {
 380                        peer_id: request.sender_id.0,
 381                        replica_id: response.replica_id,
 382                        user_id: user_id.to_proto(),
 383                    }),
 384                },
 385            )
 386        })?;
 387        self.update_contacts_for_users(&contact_user_ids)?;
 388        Ok(response)
 389    }
 390
 391    async fn leave_project(
 392        mut self: Arc<Server>,
 393        request: TypedEnvelope<proto::LeaveProject>,
 394    ) -> tide::Result<()> {
 395        let sender_id = request.sender_id;
 396        let project_id = request.payload.project_id;
 397        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 398
 399        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 400            self.peer.send(
 401                conn_id,
 402                proto::RemoveProjectCollaborator {
 403                    project_id,
 404                    peer_id: sender_id.0,
 405                },
 406            )
 407        })?;
 408        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 409
 410        Ok(())
 411    }
 412
 413    async fn register_worktree(
 414        mut self: Arc<Server>,
 415        request: TypedEnvelope<proto::RegisterWorktree>,
 416    ) -> tide::Result<proto::Ack> {
 417        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 418
 419        let mut contact_user_ids = HashSet::default();
 420        contact_user_ids.insert(host_user_id);
 421        for github_login in request.payload.authorized_logins {
 422            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 423            contact_user_ids.insert(contact_user_id);
 424        }
 425
 426        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 427        self.state_mut().register_worktree(
 428            request.payload.project_id,
 429            request.payload.worktree_id,
 430            request.sender_id,
 431            Worktree {
 432                authorized_user_ids: contact_user_ids.clone(),
 433                root_name: request.payload.root_name,
 434                share: None,
 435                weak: false,
 436            },
 437        )?;
 438        self.update_contacts_for_users(&contact_user_ids)?;
 439        Ok(proto::Ack {})
 440    }
 441
 442    async fn unregister_worktree(
 443        mut self: Arc<Server>,
 444        request: TypedEnvelope<proto::UnregisterWorktree>,
 445    ) -> tide::Result<()> {
 446        let project_id = request.payload.project_id;
 447        let worktree_id = request.payload.worktree_id;
 448        let (worktree, guest_connection_ids) =
 449            self.state_mut()
 450                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 451        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 452            self.peer.send(
 453                conn_id,
 454                proto::UnregisterWorktree {
 455                    project_id,
 456                    worktree_id,
 457                },
 458            )
 459        })?;
 460        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 461        Ok(())
 462    }
 463
 464    async fn share_worktree(
 465        mut self: Arc<Server>,
 466        mut request: TypedEnvelope<proto::ShareWorktree>,
 467    ) -> tide::Result<proto::Ack> {
 468        let worktree = request
 469            .payload
 470            .worktree
 471            .as_mut()
 472            .ok_or_else(|| anyhow!("missing worktree"))?;
 473        let entries = worktree
 474            .entries
 475            .iter()
 476            .map(|entry| (entry.id, entry.clone()))
 477            .collect();
 478        let diagnostic_summaries = worktree
 479            .diagnostic_summaries
 480            .iter()
 481            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 482            .collect();
 483
 484        let shared_worktree = self.state_mut().share_worktree(
 485            request.payload.project_id,
 486            worktree.id,
 487            request.sender_id,
 488            entries,
 489            diagnostic_summaries,
 490            worktree.next_update_id,
 491        )?;
 492
 493        broadcast(
 494            request.sender_id,
 495            shared_worktree.connection_ids,
 496            |connection_id| {
 497                self.peer
 498                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 499            },
 500        )?;
 501        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 502
 503        Ok(proto::Ack {})
 504    }
 505
 506    async fn update_worktree(
 507        mut self: Arc<Server>,
 508        request: TypedEnvelope<proto::UpdateWorktree>,
 509    ) -> tide::Result<proto::Ack> {
 510        let connection_ids = self.state_mut().update_worktree(
 511            request.sender_id,
 512            request.payload.project_id,
 513            request.payload.worktree_id,
 514            request.payload.id,
 515            &request.payload.removed_entries,
 516            &request.payload.updated_entries,
 517        )?;
 518
 519        broadcast(request.sender_id, connection_ids, |connection_id| {
 520            self.peer
 521                .forward_send(request.sender_id, connection_id, request.payload.clone())
 522        })?;
 523
 524        Ok(proto::Ack {})
 525    }
 526
 527    async fn update_diagnostic_summary(
 528        mut self: Arc<Server>,
 529        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 530    ) -> tide::Result<()> {
 531        let summary = request
 532            .payload
 533            .summary
 534            .clone()
 535            .ok_or_else(|| anyhow!("invalid summary"))?;
 536        let receiver_ids = self.state_mut().update_diagnostic_summary(
 537            request.payload.project_id,
 538            request.payload.worktree_id,
 539            request.sender_id,
 540            summary,
 541        )?;
 542
 543        broadcast(request.sender_id, receiver_ids, |connection_id| {
 544            self.peer
 545                .forward_send(request.sender_id, connection_id, request.payload.clone())
 546        })?;
 547        Ok(())
 548    }
 549
 550    async fn disk_based_diagnostics_updating(
 551        self: Arc<Server>,
 552        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 553    ) -> tide::Result<()> {
 554        let receiver_ids = self
 555            .state()
 556            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 557        broadcast(request.sender_id, receiver_ids, |connection_id| {
 558            self.peer
 559                .forward_send(request.sender_id, connection_id, request.payload.clone())
 560        })?;
 561        Ok(())
 562    }
 563
 564    async fn disk_based_diagnostics_updated(
 565        self: Arc<Server>,
 566        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 567    ) -> tide::Result<()> {
 568        let receiver_ids = self
 569            .state()
 570            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 571        broadcast(request.sender_id, receiver_ids, |connection_id| {
 572            self.peer
 573                .forward_send(request.sender_id, connection_id, request.payload.clone())
 574        })?;
 575        Ok(())
 576    }
 577
 578    async fn get_definition(
 579        self: Arc<Server>,
 580        request: TypedEnvelope<proto::GetDefinition>,
 581    ) -> tide::Result<proto::GetDefinitionResponse> {
 582        let host_connection_id = self
 583            .state()
 584            .read_project(request.payload.project_id, request.sender_id)?
 585            .host_connection_id;
 586        Ok(self
 587            .peer
 588            .forward_request(request.sender_id, host_connection_id, request.payload)
 589            .await?)
 590    }
 591
 592    async fn get_project_symbols(
 593        self: Arc<Server>,
 594        request: TypedEnvelope<proto::GetProjectSymbols>,
 595    ) -> tide::Result<proto::GetProjectSymbolsResponse> {
 596        let host_connection_id = self
 597            .state()
 598            .read_project(request.payload.project_id, request.sender_id)?
 599            .host_connection_id;
 600        Ok(self
 601            .peer
 602            .forward_request(request.sender_id, host_connection_id, request.payload)
 603            .await?)
 604    }
 605
 606    async fn open_buffer_for_symbol(
 607        self: Arc<Server>,
 608        request: TypedEnvelope<proto::OpenBufferForSymbol>,
 609    ) -> tide::Result<proto::OpenBufferForSymbolResponse> {
 610        let host_connection_id = self
 611            .state()
 612            .read_project(request.payload.project_id, request.sender_id)?
 613            .host_connection_id;
 614        Ok(self
 615            .peer
 616            .forward_request(request.sender_id, host_connection_id, request.payload)
 617            .await?)
 618    }
 619
 620    async fn open_buffer(
 621        self: Arc<Server>,
 622        request: TypedEnvelope<proto::OpenBuffer>,
 623    ) -> tide::Result<proto::OpenBufferResponse> {
 624        let host_connection_id = self
 625            .state()
 626            .read_project(request.payload.project_id, request.sender_id)?
 627            .host_connection_id;
 628        Ok(self
 629            .peer
 630            .forward_request(request.sender_id, host_connection_id, request.payload)
 631            .await?)
 632    }
 633
 634    async fn close_buffer(
 635        self: Arc<Server>,
 636        request: TypedEnvelope<proto::CloseBuffer>,
 637    ) -> tide::Result<()> {
 638        let host_connection_id = self
 639            .state()
 640            .read_project(request.payload.project_id, request.sender_id)?
 641            .host_connection_id;
 642        self.peer
 643            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 644        Ok(())
 645    }
 646
 647    async fn save_buffer(
 648        self: Arc<Server>,
 649        request: TypedEnvelope<proto::SaveBuffer>,
 650    ) -> tide::Result<proto::BufferSaved> {
 651        let host;
 652        let mut guests;
 653        {
 654            let state = self.state();
 655            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 656            host = project.host_connection_id;
 657            guests = project.guest_connection_ids()
 658        }
 659
 660        let response = self
 661            .peer
 662            .forward_request(request.sender_id, host, request.payload.clone())
 663            .await?;
 664
 665        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 666        broadcast(host, guests, |conn_id| {
 667            self.peer.forward_send(host, conn_id, response.clone())
 668        })?;
 669
 670        Ok(response)
 671    }
 672
 673    async fn format_buffers(
 674        self: Arc<Server>,
 675        request: TypedEnvelope<proto::FormatBuffers>,
 676    ) -> tide::Result<proto::FormatBuffersResponse> {
 677        let host = self
 678            .state()
 679            .read_project(request.payload.project_id, request.sender_id)?
 680            .host_connection_id;
 681        Ok(self
 682            .peer
 683            .forward_request(request.sender_id, host, request.payload.clone())
 684            .await?)
 685    }
 686
 687    async fn get_completions(
 688        self: Arc<Server>,
 689        request: TypedEnvelope<proto::GetCompletions>,
 690    ) -> tide::Result<proto::GetCompletionsResponse> {
 691        let host = self
 692            .state()
 693            .read_project(request.payload.project_id, request.sender_id)?
 694            .host_connection_id;
 695        Ok(self
 696            .peer
 697            .forward_request(request.sender_id, host, request.payload.clone())
 698            .await?)
 699    }
 700
 701    async fn apply_additional_edits_for_completion(
 702        self: Arc<Server>,
 703        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 704    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 705        let host = self
 706            .state()
 707            .read_project(request.payload.project_id, request.sender_id)?
 708            .host_connection_id;
 709        Ok(self
 710            .peer
 711            .forward_request(request.sender_id, host, request.payload.clone())
 712            .await?)
 713    }
 714
 715    async fn get_code_actions(
 716        self: Arc<Server>,
 717        request: TypedEnvelope<proto::GetCodeActions>,
 718    ) -> tide::Result<proto::GetCodeActionsResponse> {
 719        let host = self
 720            .state()
 721            .read_project(request.payload.project_id, request.sender_id)?
 722            .host_connection_id;
 723        Ok(self
 724            .peer
 725            .forward_request(request.sender_id, host, request.payload.clone())
 726            .await?)
 727    }
 728
 729    async fn apply_code_action(
 730        self: Arc<Server>,
 731        request: TypedEnvelope<proto::ApplyCodeAction>,
 732    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 733        let host = self
 734            .state()
 735            .read_project(request.payload.project_id, request.sender_id)?
 736            .host_connection_id;
 737        Ok(self
 738            .peer
 739            .forward_request(request.sender_id, host, request.payload.clone())
 740            .await?)
 741    }
 742
 743    async fn prepare_rename(
 744        self: Arc<Server>,
 745        request: TypedEnvelope<proto::PrepareRename>,
 746    ) -> tide::Result<proto::PrepareRenameResponse> {
 747        let host = self
 748            .state()
 749            .read_project(request.payload.project_id, request.sender_id)?
 750            .host_connection_id;
 751        Ok(self
 752            .peer
 753            .forward_request(request.sender_id, host, request.payload.clone())
 754            .await?)
 755    }
 756
 757    async fn perform_rename(
 758        self: Arc<Server>,
 759        request: TypedEnvelope<proto::PerformRename>,
 760    ) -> tide::Result<proto::PerformRenameResponse> {
 761        let host = self
 762            .state()
 763            .read_project(request.payload.project_id, request.sender_id)?
 764            .host_connection_id;
 765        Ok(self
 766            .peer
 767            .forward_request(request.sender_id, host, request.payload.clone())
 768            .await?)
 769    }
 770
 771    async fn update_buffer(
 772        self: Arc<Server>,
 773        request: TypedEnvelope<proto::UpdateBuffer>,
 774    ) -> tide::Result<proto::Ack> {
 775        let receiver_ids = self
 776            .state()
 777            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 778        broadcast(request.sender_id, receiver_ids, |connection_id| {
 779            self.peer
 780                .forward_send(request.sender_id, connection_id, request.payload.clone())
 781        })?;
 782        Ok(proto::Ack {})
 783    }
 784
 785    async fn update_buffer_file(
 786        self: Arc<Server>,
 787        request: TypedEnvelope<proto::UpdateBufferFile>,
 788    ) -> tide::Result<()> {
 789        let receiver_ids = self
 790            .state()
 791            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 792        broadcast(request.sender_id, receiver_ids, |connection_id| {
 793            self.peer
 794                .forward_send(request.sender_id, connection_id, request.payload.clone())
 795        })?;
 796        Ok(())
 797    }
 798
 799    async fn buffer_reloaded(
 800        self: Arc<Server>,
 801        request: TypedEnvelope<proto::BufferReloaded>,
 802    ) -> tide::Result<()> {
 803        let receiver_ids = self
 804            .state()
 805            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 806        broadcast(request.sender_id, receiver_ids, |connection_id| {
 807            self.peer
 808                .forward_send(request.sender_id, connection_id, request.payload.clone())
 809        })?;
 810        Ok(())
 811    }
 812
 813    async fn buffer_saved(
 814        self: Arc<Server>,
 815        request: TypedEnvelope<proto::BufferSaved>,
 816    ) -> tide::Result<()> {
 817        let receiver_ids = self
 818            .state()
 819            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 820        broadcast(request.sender_id, receiver_ids, |connection_id| {
 821            self.peer
 822                .forward_send(request.sender_id, connection_id, request.payload.clone())
 823        })?;
 824        Ok(())
 825    }
 826
 827    async fn get_channels(
 828        self: Arc<Server>,
 829        request: TypedEnvelope<proto::GetChannels>,
 830    ) -> tide::Result<proto::GetChannelsResponse> {
 831        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 832        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 833        Ok(proto::GetChannelsResponse {
 834            channels: channels
 835                .into_iter()
 836                .map(|chan| proto::Channel {
 837                    id: chan.id.to_proto(),
 838                    name: chan.name,
 839                })
 840                .collect(),
 841        })
 842    }
 843
 844    async fn get_users(
 845        self: Arc<Server>,
 846        request: TypedEnvelope<proto::GetUsers>,
 847    ) -> tide::Result<proto::GetUsersResponse> {
 848        let user_ids = request
 849            .payload
 850            .user_ids
 851            .into_iter()
 852            .map(UserId::from_proto)
 853            .collect();
 854        let users = self
 855            .app_state
 856            .db
 857            .get_users_by_ids(user_ids)
 858            .await?
 859            .into_iter()
 860            .map(|user| proto::User {
 861                id: user.id.to_proto(),
 862                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 863                github_login: user.github_login,
 864            })
 865            .collect();
 866        Ok(proto::GetUsersResponse { users })
 867    }
 868
 869    fn update_contacts_for_users<'a>(
 870        self: &Arc<Server>,
 871        user_ids: impl IntoIterator<Item = &'a UserId>,
 872    ) -> anyhow::Result<()> {
 873        let mut result = Ok(());
 874        let state = self.state();
 875        for user_id in user_ids {
 876            let contacts = state.contacts_for_user(*user_id);
 877            for connection_id in state.connection_ids_for_user(*user_id) {
 878                if let Err(error) = self.peer.send(
 879                    connection_id,
 880                    proto::UpdateContacts {
 881                        contacts: contacts.clone(),
 882                    },
 883                ) {
 884                    result = Err(error);
 885                }
 886            }
 887        }
 888        result
 889    }
 890
 891    async fn join_channel(
 892        mut self: Arc<Self>,
 893        request: TypedEnvelope<proto::JoinChannel>,
 894    ) -> tide::Result<proto::JoinChannelResponse> {
 895        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 896        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 897        if !self
 898            .app_state
 899            .db
 900            .can_user_access_channel(user_id, channel_id)
 901            .await?
 902        {
 903            Err(anyhow!("access denied"))?;
 904        }
 905
 906        self.state_mut().join_channel(request.sender_id, channel_id);
 907        let messages = self
 908            .app_state
 909            .db
 910            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 911            .await?
 912            .into_iter()
 913            .map(|msg| proto::ChannelMessage {
 914                id: msg.id.to_proto(),
 915                body: msg.body,
 916                timestamp: msg.sent_at.unix_timestamp() as u64,
 917                sender_id: msg.sender_id.to_proto(),
 918                nonce: Some(msg.nonce.as_u128().into()),
 919            })
 920            .collect::<Vec<_>>();
 921        Ok(proto::JoinChannelResponse {
 922            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 923            messages,
 924        })
 925    }
 926
 927    async fn leave_channel(
 928        mut self: Arc<Self>,
 929        request: TypedEnvelope<proto::LeaveChannel>,
 930    ) -> tide::Result<()> {
 931        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 932        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 933        if !self
 934            .app_state
 935            .db
 936            .can_user_access_channel(user_id, channel_id)
 937            .await?
 938        {
 939            Err(anyhow!("access denied"))?;
 940        }
 941
 942        self.state_mut()
 943            .leave_channel(request.sender_id, channel_id);
 944
 945        Ok(())
 946    }
 947
 948    async fn send_channel_message(
 949        self: Arc<Self>,
 950        request: TypedEnvelope<proto::SendChannelMessage>,
 951    ) -> tide::Result<proto::SendChannelMessageResponse> {
 952        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 953        let user_id;
 954        let connection_ids;
 955        {
 956            let state = self.state();
 957            user_id = state.user_id_for_connection(request.sender_id)?;
 958            connection_ids = state.channel_connection_ids(channel_id)?;
 959        }
 960
 961        // Validate the message body.
 962        let body = request.payload.body.trim().to_string();
 963        if body.len() > MAX_MESSAGE_LEN {
 964            return Err(anyhow!("message is too long"))?;
 965        }
 966        if body.is_empty() {
 967            return Err(anyhow!("message can't be blank"))?;
 968        }
 969
 970        let timestamp = OffsetDateTime::now_utc();
 971        let nonce = request
 972            .payload
 973            .nonce
 974            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 975
 976        let message_id = self
 977            .app_state
 978            .db
 979            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 980            .await?
 981            .to_proto();
 982        let message = proto::ChannelMessage {
 983            sender_id: user_id.to_proto(),
 984            id: message_id,
 985            body,
 986            timestamp: timestamp.unix_timestamp() as u64,
 987            nonce: Some(nonce),
 988        };
 989        broadcast(request.sender_id, connection_ids, |conn_id| {
 990            self.peer.send(
 991                conn_id,
 992                proto::ChannelMessageSent {
 993                    channel_id: channel_id.to_proto(),
 994                    message: Some(message.clone()),
 995                },
 996            )
 997        })?;
 998        Ok(proto::SendChannelMessageResponse {
 999            message: Some(message),
1000        })
1001    }
1002
1003    async fn get_channel_messages(
1004        self: Arc<Self>,
1005        request: TypedEnvelope<proto::GetChannelMessages>,
1006    ) -> tide::Result<proto::GetChannelMessagesResponse> {
1007        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1008        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1009        if !self
1010            .app_state
1011            .db
1012            .can_user_access_channel(user_id, channel_id)
1013            .await?
1014        {
1015            Err(anyhow!("access denied"))?;
1016        }
1017
1018        let messages = self
1019            .app_state
1020            .db
1021            .get_channel_messages(
1022                channel_id,
1023                MESSAGE_COUNT_PER_PAGE,
1024                Some(MessageId::from_proto(request.payload.before_message_id)),
1025            )
1026            .await?
1027            .into_iter()
1028            .map(|msg| proto::ChannelMessage {
1029                id: msg.id.to_proto(),
1030                body: msg.body,
1031                timestamp: msg.sent_at.unix_timestamp() as u64,
1032                sender_id: msg.sender_id.to_proto(),
1033                nonce: Some(msg.nonce.as_u128().into()),
1034            })
1035            .collect::<Vec<_>>();
1036
1037        Ok(proto::GetChannelMessagesResponse {
1038            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1039            messages,
1040        })
1041    }
1042
1043    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1044        self.store.read()
1045    }
1046
1047    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1048        self.store.write()
1049    }
1050}
1051
1052impl Executor for RealExecutor {
1053    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1054        task::spawn(future);
1055    }
1056}
1057
1058fn broadcast<F>(
1059    sender_id: ConnectionId,
1060    receiver_ids: Vec<ConnectionId>,
1061    mut f: F,
1062) -> anyhow::Result<()>
1063where
1064    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1065{
1066    let mut result = Ok(());
1067    for receiver_id in receiver_ids {
1068        if receiver_id != sender_id {
1069            if let Err(error) = f(receiver_id) {
1070                if result.is_ok() {
1071                    result = Err(error);
1072                }
1073            }
1074        }
1075    }
1076    result
1077}
1078
1079pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1080    let server = Server::new(app.state().clone(), rpc.clone(), None);
1081    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1082        let server = server.clone();
1083        async move {
1084            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1085
1086            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1087            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1088            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1089            let client_protocol_version: Option<u32> = request
1090                .header("X-Zed-Protocol-Version")
1091                .and_then(|v| v.as_str().parse().ok());
1092
1093            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1094                return Ok(Response::new(StatusCode::UpgradeRequired));
1095            }
1096
1097            let header = match request.header("Sec-Websocket-Key") {
1098                Some(h) => h.as_str(),
1099                None => return Err(anyhow!("expected sec-websocket-key"))?,
1100            };
1101
1102            let user_id = process_auth_header(&request).await?;
1103
1104            let mut response = Response::new(StatusCode::SwitchingProtocols);
1105            response.insert_header(UPGRADE, "websocket");
1106            response.insert_header(CONNECTION, "Upgrade");
1107            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1108            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1109            response.insert_header("Sec-Websocket-Version", "13");
1110
1111            let http_res: &mut tide::http::Response = response.as_mut();
1112            let upgrade_receiver = http_res.recv_upgrade().await;
1113            let addr = request.remote().unwrap_or("unknown").to_string();
1114            task::spawn(async move {
1115                if let Some(stream) = upgrade_receiver.await {
1116                    server
1117                        .handle_connection(
1118                            Connection::new(
1119                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1120                            ),
1121                            addr,
1122                            user_id,
1123                            None,
1124                            RealExecutor,
1125                        )
1126                        .await;
1127                }
1128            });
1129
1130            Ok(response)
1131        }
1132    });
1133}
1134
1135fn header_contains_ignore_case<T>(
1136    request: &tide::Request<T>,
1137    header_name: HeaderName,
1138    value: &str,
1139) -> bool {
1140    request
1141        .header(header_name)
1142        .map(|h| {
1143            h.as_str()
1144                .split(',')
1145                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1146        })
1147        .unwrap_or(false)
1148}
1149
1150#[cfg(test)]
1151mod tests {
1152    use super::*;
1153    use crate::{
1154        auth,
1155        db::{tests::TestDb, UserId},
1156        github, AppState, Config,
1157    };
1158    use ::rpc::Peer;
1159    use collections::BTreeMap;
1160    use gpui::{executor, ModelHandle, TestAppContext};
1161    use parking_lot::Mutex;
1162    use postage::{sink::Sink, watch};
1163    use rand::prelude::*;
1164    use rpc::PeerId;
1165    use serde_json::json;
1166    use sqlx::types::time::OffsetDateTime;
1167    use std::{
1168        cell::Cell,
1169        env,
1170        ops::Deref,
1171        path::Path,
1172        rc::Rc,
1173        sync::{
1174            atomic::{AtomicBool, Ordering::SeqCst},
1175            Arc,
1176        },
1177        time::Duration,
1178    };
1179    use zed::{
1180        client::{
1181            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1182            EstablishConnectionError, UserStore,
1183        },
1184        editor::{
1185            self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, EditorSettings,
1186            Input, MultiBuffer, Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1187        },
1188        fs::{FakeFs, Fs as _},
1189        language::{
1190            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1191            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1192        },
1193        lsp,
1194        project::{DiagnosticSummary, Project, ProjectPath},
1195        workspace::{Workspace, WorkspaceParams},
1196    };
1197
1198    #[cfg(test)]
1199    #[ctor::ctor]
1200    fn init_logger() {
1201        if std::env::var("RUST_LOG").is_ok() {
1202            env_logger::init();
1203        }
1204    }
1205
1206    #[gpui::test(iterations = 10)]
1207    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1208        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1209        let lang_registry = Arc::new(LanguageRegistry::new());
1210        let fs = FakeFs::new(cx_a.background());
1211        cx_a.foreground().forbid_parking();
1212
1213        // Connect to a server as 2 clients.
1214        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1215        let client_a = server.create_client(&mut cx_a, "user_a").await;
1216        let client_b = server.create_client(&mut cx_b, "user_b").await;
1217
1218        // Share a project as client A
1219        fs.insert_tree(
1220            "/a",
1221            json!({
1222                ".zed.toml": r#"collaborators = ["user_b"]"#,
1223                "a.txt": "a-contents",
1224                "b.txt": "b-contents",
1225            }),
1226        )
1227        .await;
1228        let project_a = cx_a.update(|cx| {
1229            Project::local(
1230                client_a.clone(),
1231                client_a.user_store.clone(),
1232                lang_registry.clone(),
1233                fs.clone(),
1234                cx,
1235            )
1236        });
1237        let (worktree_a, _) = project_a
1238            .update(&mut cx_a, |p, cx| {
1239                p.find_or_create_local_worktree("/a", false, cx)
1240            })
1241            .await
1242            .unwrap();
1243        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1244        worktree_a
1245            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1246            .await;
1247        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1248        project_a
1249            .update(&mut cx_a, |p, cx| p.share(cx))
1250            .await
1251            .unwrap();
1252
1253        // Join that project as client B
1254        let project_b = Project::remote(
1255            project_id,
1256            client_b.clone(),
1257            client_b.user_store.clone(),
1258            lang_registry.clone(),
1259            fs.clone(),
1260            &mut cx_b.to_async(),
1261        )
1262        .await
1263        .unwrap();
1264
1265        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1266            assert_eq!(
1267                project
1268                    .collaborators()
1269                    .get(&client_a.peer_id)
1270                    .unwrap()
1271                    .user
1272                    .github_login,
1273                "user_a"
1274            );
1275            project.replica_id()
1276        });
1277        project_a
1278            .condition(&cx_a, |tree, _| {
1279                tree.collaborators()
1280                    .get(&client_b.peer_id)
1281                    .map_or(false, |collaborator| {
1282                        collaborator.replica_id == replica_id_b
1283                            && collaborator.user.github_login == "user_b"
1284                    })
1285            })
1286            .await;
1287
1288        // Open the same file as client B and client A.
1289        let buffer_b = project_b
1290            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1291            .await
1292            .unwrap();
1293        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1294        buffer_b.read_with(&cx_b, |buf, cx| {
1295            assert_eq!(buf.read(cx).text(), "b-contents")
1296        });
1297        project_a.read_with(&cx_a, |project, cx| {
1298            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1299        });
1300        let buffer_a = project_a
1301            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1302            .await
1303            .unwrap();
1304
1305        let editor_b = cx_b.add_view(window_b, |cx| {
1306            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1307        });
1308
1309        // TODO
1310        // // Create a selection set as client B and see that selection set as client A.
1311        // buffer_a
1312        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1313        //     .await;
1314
1315        // Edit the buffer as client B and see that edit as client A.
1316        editor_b.update(&mut cx_b, |editor, cx| {
1317            editor.handle_input(&Input("ok, ".into()), cx)
1318        });
1319        buffer_a
1320            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1321            .await;
1322
1323        // TODO
1324        // // Remove the selection set as client B, see those selections disappear as client A.
1325        cx_b.update(move |_| drop(editor_b));
1326        // buffer_a
1327        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1328        //     .await;
1329
1330        // Close the buffer as client A, see that the buffer is closed.
1331        cx_a.update(move |_| drop(buffer_a));
1332        project_a
1333            .condition(&cx_a, |project, cx| {
1334                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1335            })
1336            .await;
1337
1338        // Dropping the client B's project removes client B from client A's collaborators.
1339        cx_b.update(move |_| drop(project_b));
1340        project_a
1341            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1342            .await;
1343    }
1344
1345    #[gpui::test(iterations = 10)]
1346    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1347        let lang_registry = Arc::new(LanguageRegistry::new());
1348        let fs = FakeFs::new(cx_a.background());
1349        cx_a.foreground().forbid_parking();
1350
1351        // Connect to a server as 2 clients.
1352        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1353        let client_a = server.create_client(&mut cx_a, "user_a").await;
1354        let client_b = server.create_client(&mut cx_b, "user_b").await;
1355
1356        // Share a project as client A
1357        fs.insert_tree(
1358            "/a",
1359            json!({
1360                ".zed.toml": r#"collaborators = ["user_b"]"#,
1361                "a.txt": "a-contents",
1362                "b.txt": "b-contents",
1363            }),
1364        )
1365        .await;
1366        let project_a = cx_a.update(|cx| {
1367            Project::local(
1368                client_a.clone(),
1369                client_a.user_store.clone(),
1370                lang_registry.clone(),
1371                fs.clone(),
1372                cx,
1373            )
1374        });
1375        let (worktree_a, _) = project_a
1376            .update(&mut cx_a, |p, cx| {
1377                p.find_or_create_local_worktree("/a", false, cx)
1378            })
1379            .await
1380            .unwrap();
1381        worktree_a
1382            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1383            .await;
1384        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1385        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1386        project_a
1387            .update(&mut cx_a, |p, cx| p.share(cx))
1388            .await
1389            .unwrap();
1390        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1391
1392        // Join that project as client B
1393        let project_b = Project::remote(
1394            project_id,
1395            client_b.clone(),
1396            client_b.user_store.clone(),
1397            lang_registry.clone(),
1398            fs.clone(),
1399            &mut cx_b.to_async(),
1400        )
1401        .await
1402        .unwrap();
1403        project_b
1404            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1405            .await
1406            .unwrap();
1407
1408        // Unshare the project as client A
1409        project_a
1410            .update(&mut cx_a, |project, cx| project.unshare(cx))
1411            .await
1412            .unwrap();
1413        project_b
1414            .condition(&mut cx_b, |project, _| project.is_read_only())
1415            .await;
1416        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1417        drop(project_b);
1418
1419        // Share the project again and ensure guests can still join.
1420        project_a
1421            .update(&mut cx_a, |project, cx| project.share(cx))
1422            .await
1423            .unwrap();
1424        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1425
1426        let project_c = Project::remote(
1427            project_id,
1428            client_b.clone(),
1429            client_b.user_store.clone(),
1430            lang_registry.clone(),
1431            fs.clone(),
1432            &mut cx_b.to_async(),
1433        )
1434        .await
1435        .unwrap();
1436        project_c
1437            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1438            .await
1439            .unwrap();
1440    }
1441
1442    #[gpui::test(iterations = 10)]
1443    async fn test_propagate_saves_and_fs_changes(
1444        mut cx_a: TestAppContext,
1445        mut cx_b: TestAppContext,
1446        mut cx_c: TestAppContext,
1447    ) {
1448        let lang_registry = Arc::new(LanguageRegistry::new());
1449        let fs = FakeFs::new(cx_a.background());
1450        cx_a.foreground().forbid_parking();
1451
1452        // Connect to a server as 3 clients.
1453        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1454        let client_a = server.create_client(&mut cx_a, "user_a").await;
1455        let client_b = server.create_client(&mut cx_b, "user_b").await;
1456        let client_c = server.create_client(&mut cx_c, "user_c").await;
1457
1458        // Share a worktree as client A.
1459        fs.insert_tree(
1460            "/a",
1461            json!({
1462                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1463                "file1": "",
1464                "file2": ""
1465            }),
1466        )
1467        .await;
1468        let project_a = cx_a.update(|cx| {
1469            Project::local(
1470                client_a.clone(),
1471                client_a.user_store.clone(),
1472                lang_registry.clone(),
1473                fs.clone(),
1474                cx,
1475            )
1476        });
1477        let (worktree_a, _) = project_a
1478            .update(&mut cx_a, |p, cx| {
1479                p.find_or_create_local_worktree("/a", false, cx)
1480            })
1481            .await
1482            .unwrap();
1483        worktree_a
1484            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1485            .await;
1486        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1487        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1488        project_a
1489            .update(&mut cx_a, |p, cx| p.share(cx))
1490            .await
1491            .unwrap();
1492
1493        // Join that worktree as clients B and C.
1494        let project_b = Project::remote(
1495            project_id,
1496            client_b.clone(),
1497            client_b.user_store.clone(),
1498            lang_registry.clone(),
1499            fs.clone(),
1500            &mut cx_b.to_async(),
1501        )
1502        .await
1503        .unwrap();
1504        let project_c = Project::remote(
1505            project_id,
1506            client_c.clone(),
1507            client_c.user_store.clone(),
1508            lang_registry.clone(),
1509            fs.clone(),
1510            &mut cx_c.to_async(),
1511        )
1512        .await
1513        .unwrap();
1514        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1515        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1516
1517        // Open and edit a buffer as both guests B and C.
1518        let buffer_b = project_b
1519            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1520            .await
1521            .unwrap();
1522        let buffer_c = project_c
1523            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1524            .await
1525            .unwrap();
1526        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1527        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1528
1529        // Open and edit that buffer as the host.
1530        let buffer_a = project_a
1531            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1532            .await
1533            .unwrap();
1534
1535        buffer_a
1536            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1537            .await;
1538        buffer_a.update(&mut cx_a, |buf, cx| {
1539            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1540        });
1541
1542        // Wait for edits to propagate
1543        buffer_a
1544            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1545            .await;
1546        buffer_b
1547            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1548            .await;
1549        buffer_c
1550            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1551            .await;
1552
1553        // Edit the buffer as the host and concurrently save as guest B.
1554        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1555        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1556        save_b.await.unwrap();
1557        assert_eq!(
1558            fs.load("/a/file1".as_ref()).await.unwrap(),
1559            "hi-a, i-am-c, i-am-b, i-am-a"
1560        );
1561        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1562        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1563        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1564
1565        // Make changes on host's file system, see those changes on guest worktrees.
1566        fs.rename(
1567            "/a/file1".as_ref(),
1568            "/a/file1-renamed".as_ref(),
1569            Default::default(),
1570        )
1571        .await
1572        .unwrap();
1573
1574        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1575            .await
1576            .unwrap();
1577        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1578
1579        worktree_a
1580            .condition(&cx_a, |tree, _| {
1581                tree.paths()
1582                    .map(|p| p.to_string_lossy())
1583                    .collect::<Vec<_>>()
1584                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1585            })
1586            .await;
1587        worktree_b
1588            .condition(&cx_b, |tree, _| {
1589                tree.paths()
1590                    .map(|p| p.to_string_lossy())
1591                    .collect::<Vec<_>>()
1592                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1593            })
1594            .await;
1595        worktree_c
1596            .condition(&cx_c, |tree, _| {
1597                tree.paths()
1598                    .map(|p| p.to_string_lossy())
1599                    .collect::<Vec<_>>()
1600                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1601            })
1602            .await;
1603
1604        // Ensure buffer files are updated as well.
1605        buffer_a
1606            .condition(&cx_a, |buf, _| {
1607                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1608            })
1609            .await;
1610        buffer_b
1611            .condition(&cx_b, |buf, _| {
1612                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1613            })
1614            .await;
1615        buffer_c
1616            .condition(&cx_c, |buf, _| {
1617                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1618            })
1619            .await;
1620    }
1621
1622    #[gpui::test(iterations = 10)]
1623    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1624        cx_a.foreground().forbid_parking();
1625        let lang_registry = Arc::new(LanguageRegistry::new());
1626        let fs = FakeFs::new(cx_a.background());
1627
1628        // Connect to a server as 2 clients.
1629        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1630        let client_a = server.create_client(&mut cx_a, "user_a").await;
1631        let client_b = server.create_client(&mut cx_b, "user_b").await;
1632
1633        // Share a project as client A
1634        fs.insert_tree(
1635            "/dir",
1636            json!({
1637                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1638                "a.txt": "a-contents",
1639            }),
1640        )
1641        .await;
1642
1643        let project_a = cx_a.update(|cx| {
1644            Project::local(
1645                client_a.clone(),
1646                client_a.user_store.clone(),
1647                lang_registry.clone(),
1648                fs.clone(),
1649                cx,
1650            )
1651        });
1652        let (worktree_a, _) = project_a
1653            .update(&mut cx_a, |p, cx| {
1654                p.find_or_create_local_worktree("/dir", false, cx)
1655            })
1656            .await
1657            .unwrap();
1658        worktree_a
1659            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1660            .await;
1661        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1662        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1663        project_a
1664            .update(&mut cx_a, |p, cx| p.share(cx))
1665            .await
1666            .unwrap();
1667
1668        // Join that project as client B
1669        let project_b = Project::remote(
1670            project_id,
1671            client_b.clone(),
1672            client_b.user_store.clone(),
1673            lang_registry.clone(),
1674            fs.clone(),
1675            &mut cx_b.to_async(),
1676        )
1677        .await
1678        .unwrap();
1679
1680        // Open a buffer as client B
1681        let buffer_b = project_b
1682            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1683            .await
1684            .unwrap();
1685
1686        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1687        buffer_b.read_with(&cx_b, |buf, _| {
1688            assert!(buf.is_dirty());
1689            assert!(!buf.has_conflict());
1690        });
1691
1692        buffer_b
1693            .update(&mut cx_b, |buf, cx| buf.save(cx))
1694            .await
1695            .unwrap();
1696        buffer_b
1697            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1698            .await;
1699        buffer_b.read_with(&cx_b, |buf, _| {
1700            assert!(!buf.has_conflict());
1701        });
1702
1703        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1704        buffer_b.read_with(&cx_b, |buf, _| {
1705            assert!(buf.is_dirty());
1706            assert!(!buf.has_conflict());
1707        });
1708    }
1709
1710    #[gpui::test(iterations = 10)]
1711    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1712        cx_a.foreground().forbid_parking();
1713        let lang_registry = Arc::new(LanguageRegistry::new());
1714        let fs = FakeFs::new(cx_a.background());
1715
1716        // Connect to a server as 2 clients.
1717        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1718        let client_a = server.create_client(&mut cx_a, "user_a").await;
1719        let client_b = server.create_client(&mut cx_b, "user_b").await;
1720
1721        // Share a project as client A
1722        fs.insert_tree(
1723            "/dir",
1724            json!({
1725                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1726                "a.txt": "a-contents",
1727            }),
1728        )
1729        .await;
1730
1731        let project_a = cx_a.update(|cx| {
1732            Project::local(
1733                client_a.clone(),
1734                client_a.user_store.clone(),
1735                lang_registry.clone(),
1736                fs.clone(),
1737                cx,
1738            )
1739        });
1740        let (worktree_a, _) = project_a
1741            .update(&mut cx_a, |p, cx| {
1742                p.find_or_create_local_worktree("/dir", false, cx)
1743            })
1744            .await
1745            .unwrap();
1746        worktree_a
1747            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1748            .await;
1749        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1750        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1751        project_a
1752            .update(&mut cx_a, |p, cx| p.share(cx))
1753            .await
1754            .unwrap();
1755
1756        // Join that project as client B
1757        let project_b = Project::remote(
1758            project_id,
1759            client_b.clone(),
1760            client_b.user_store.clone(),
1761            lang_registry.clone(),
1762            fs.clone(),
1763            &mut cx_b.to_async(),
1764        )
1765        .await
1766        .unwrap();
1767        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1768
1769        // Open a buffer as client B
1770        let buffer_b = project_b
1771            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1772            .await
1773            .unwrap();
1774        buffer_b.read_with(&cx_b, |buf, _| {
1775            assert!(!buf.is_dirty());
1776            assert!(!buf.has_conflict());
1777        });
1778
1779        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1780            .await
1781            .unwrap();
1782        buffer_b
1783            .condition(&cx_b, |buf, _| {
1784                buf.text() == "new contents" && !buf.is_dirty()
1785            })
1786            .await;
1787        buffer_b.read_with(&cx_b, |buf, _| {
1788            assert!(!buf.has_conflict());
1789        });
1790    }
1791
1792    #[gpui::test(iterations = 10)]
1793    async fn test_editing_while_guest_opens_buffer(
1794        mut cx_a: TestAppContext,
1795        mut cx_b: TestAppContext,
1796    ) {
1797        cx_a.foreground().forbid_parking();
1798        let lang_registry = Arc::new(LanguageRegistry::new());
1799        let fs = FakeFs::new(cx_a.background());
1800
1801        // Connect to a server as 2 clients.
1802        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1803        let client_a = server.create_client(&mut cx_a, "user_a").await;
1804        let client_b = server.create_client(&mut cx_b, "user_b").await;
1805
1806        // Share a project as client A
1807        fs.insert_tree(
1808            "/dir",
1809            json!({
1810                ".zed.toml": r#"collaborators = ["user_b"]"#,
1811                "a.txt": "a-contents",
1812            }),
1813        )
1814        .await;
1815        let project_a = cx_a.update(|cx| {
1816            Project::local(
1817                client_a.clone(),
1818                client_a.user_store.clone(),
1819                lang_registry.clone(),
1820                fs.clone(),
1821                cx,
1822            )
1823        });
1824        let (worktree_a, _) = project_a
1825            .update(&mut cx_a, |p, cx| {
1826                p.find_or_create_local_worktree("/dir", false, cx)
1827            })
1828            .await
1829            .unwrap();
1830        worktree_a
1831            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1832            .await;
1833        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1834        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1835        project_a
1836            .update(&mut cx_a, |p, cx| p.share(cx))
1837            .await
1838            .unwrap();
1839
1840        // Join that project as client B
1841        let project_b = Project::remote(
1842            project_id,
1843            client_b.clone(),
1844            client_b.user_store.clone(),
1845            lang_registry.clone(),
1846            fs.clone(),
1847            &mut cx_b.to_async(),
1848        )
1849        .await
1850        .unwrap();
1851
1852        // Open a buffer as client A
1853        let buffer_a = project_a
1854            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1855            .await
1856            .unwrap();
1857
1858        // Start opening the same buffer as client B
1859        let buffer_b = cx_b
1860            .background()
1861            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1862
1863        // Edit the buffer as client A while client B is still opening it.
1864        cx_b.background().simulate_random_delay().await;
1865        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1866        cx_b.background().simulate_random_delay().await;
1867        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1868
1869        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1870        let buffer_b = buffer_b.await.unwrap();
1871        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1872    }
1873
1874    #[gpui::test(iterations = 10)]
1875    async fn test_leaving_worktree_while_opening_buffer(
1876        mut cx_a: TestAppContext,
1877        mut cx_b: TestAppContext,
1878    ) {
1879        cx_a.foreground().forbid_parking();
1880        let lang_registry = Arc::new(LanguageRegistry::new());
1881        let fs = FakeFs::new(cx_a.background());
1882
1883        // Connect to a server as 2 clients.
1884        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1885        let client_a = server.create_client(&mut cx_a, "user_a").await;
1886        let client_b = server.create_client(&mut cx_b, "user_b").await;
1887
1888        // Share a project as client A
1889        fs.insert_tree(
1890            "/dir",
1891            json!({
1892                ".zed.toml": r#"collaborators = ["user_b"]"#,
1893                "a.txt": "a-contents",
1894            }),
1895        )
1896        .await;
1897        let project_a = cx_a.update(|cx| {
1898            Project::local(
1899                client_a.clone(),
1900                client_a.user_store.clone(),
1901                lang_registry.clone(),
1902                fs.clone(),
1903                cx,
1904            )
1905        });
1906        let (worktree_a, _) = project_a
1907            .update(&mut cx_a, |p, cx| {
1908                p.find_or_create_local_worktree("/dir", false, cx)
1909            })
1910            .await
1911            .unwrap();
1912        worktree_a
1913            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1914            .await;
1915        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1916        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1917        project_a
1918            .update(&mut cx_a, |p, cx| p.share(cx))
1919            .await
1920            .unwrap();
1921
1922        // Join that project as client B
1923        let project_b = Project::remote(
1924            project_id,
1925            client_b.clone(),
1926            client_b.user_store.clone(),
1927            lang_registry.clone(),
1928            fs.clone(),
1929            &mut cx_b.to_async(),
1930        )
1931        .await
1932        .unwrap();
1933
1934        // See that a guest has joined as client A.
1935        project_a
1936            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1937            .await;
1938
1939        // Begin opening a buffer as client B, but leave the project before the open completes.
1940        let buffer_b = cx_b
1941            .background()
1942            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1943        cx_b.update(|_| drop(project_b));
1944        drop(buffer_b);
1945
1946        // See that the guest has left.
1947        project_a
1948            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1949            .await;
1950    }
1951
1952    #[gpui::test(iterations = 10)]
1953    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1954        cx_a.foreground().forbid_parking();
1955        let lang_registry = Arc::new(LanguageRegistry::new());
1956        let fs = FakeFs::new(cx_a.background());
1957
1958        // Connect to a server as 2 clients.
1959        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1960        let client_a = server.create_client(&mut cx_a, "user_a").await;
1961        let client_b = server.create_client(&mut cx_b, "user_b").await;
1962
1963        // Share a project as client A
1964        fs.insert_tree(
1965            "/a",
1966            json!({
1967                ".zed.toml": r#"collaborators = ["user_b"]"#,
1968                "a.txt": "a-contents",
1969                "b.txt": "b-contents",
1970            }),
1971        )
1972        .await;
1973        let project_a = cx_a.update(|cx| {
1974            Project::local(
1975                client_a.clone(),
1976                client_a.user_store.clone(),
1977                lang_registry.clone(),
1978                fs.clone(),
1979                cx,
1980            )
1981        });
1982        let (worktree_a, _) = project_a
1983            .update(&mut cx_a, |p, cx| {
1984                p.find_or_create_local_worktree("/a", false, cx)
1985            })
1986            .await
1987            .unwrap();
1988        worktree_a
1989            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1990            .await;
1991        let project_id = project_a
1992            .update(&mut cx_a, |project, _| project.next_remote_id())
1993            .await;
1994        project_a
1995            .update(&mut cx_a, |project, cx| project.share(cx))
1996            .await
1997            .unwrap();
1998
1999        // Join that project as client B
2000        let _project_b = Project::remote(
2001            project_id,
2002            client_b.clone(),
2003            client_b.user_store.clone(),
2004            lang_registry.clone(),
2005            fs.clone(),
2006            &mut cx_b.to_async(),
2007        )
2008        .await
2009        .unwrap();
2010
2011        // See that a guest has joined as client A.
2012        project_a
2013            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2014            .await;
2015
2016        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2017        client_b.disconnect(&cx_b.to_async()).unwrap();
2018        project_a
2019            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2020            .await;
2021    }
2022
2023    #[gpui::test(iterations = 10)]
2024    async fn test_collaborating_with_diagnostics(
2025        mut cx_a: TestAppContext,
2026        mut cx_b: TestAppContext,
2027    ) {
2028        cx_a.foreground().forbid_parking();
2029        let mut lang_registry = Arc::new(LanguageRegistry::new());
2030        let fs = FakeFs::new(cx_a.background());
2031
2032        // Set up a fake language server.
2033        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2034        Arc::get_mut(&mut lang_registry)
2035            .unwrap()
2036            .add(Arc::new(Language::new(
2037                LanguageConfig {
2038                    name: "Rust".into(),
2039                    path_suffixes: vec!["rs".to_string()],
2040                    language_server: Some(language_server_config),
2041                    ..Default::default()
2042                },
2043                Some(tree_sitter_rust::language()),
2044            )));
2045
2046        // Connect to a server as 2 clients.
2047        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2048        let client_a = server.create_client(&mut cx_a, "user_a").await;
2049        let client_b = server.create_client(&mut cx_b, "user_b").await;
2050
2051        // Share a project as client A
2052        fs.insert_tree(
2053            "/a",
2054            json!({
2055                ".zed.toml": r#"collaborators = ["user_b"]"#,
2056                "a.rs": "let one = two",
2057                "other.rs": "",
2058            }),
2059        )
2060        .await;
2061        let project_a = cx_a.update(|cx| {
2062            Project::local(
2063                client_a.clone(),
2064                client_a.user_store.clone(),
2065                lang_registry.clone(),
2066                fs.clone(),
2067                cx,
2068            )
2069        });
2070        let (worktree_a, _) = project_a
2071            .update(&mut cx_a, |p, cx| {
2072                p.find_or_create_local_worktree("/a", false, cx)
2073            })
2074            .await
2075            .unwrap();
2076        worktree_a
2077            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2078            .await;
2079        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2080        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2081        project_a
2082            .update(&mut cx_a, |p, cx| p.share(cx))
2083            .await
2084            .unwrap();
2085
2086        // Cause the language server to start.
2087        let _ = cx_a
2088            .background()
2089            .spawn(project_a.update(&mut cx_a, |project, cx| {
2090                project.open_buffer(
2091                    ProjectPath {
2092                        worktree_id,
2093                        path: Path::new("other.rs").into(),
2094                    },
2095                    cx,
2096                )
2097            }))
2098            .await
2099            .unwrap();
2100
2101        // Simulate a language server reporting errors for a file.
2102        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2103        fake_language_server
2104            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2105                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2106                version: None,
2107                diagnostics: vec![lsp::Diagnostic {
2108                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2109                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2110                    message: "message 1".to_string(),
2111                    ..Default::default()
2112                }],
2113            })
2114            .await;
2115
2116        // Wait for server to see the diagnostics update.
2117        server
2118            .condition(|store| {
2119                let worktree = store
2120                    .project(project_id)
2121                    .unwrap()
2122                    .worktrees
2123                    .get(&worktree_id.to_proto())
2124                    .unwrap();
2125
2126                !worktree
2127                    .share
2128                    .as_ref()
2129                    .unwrap()
2130                    .diagnostic_summaries
2131                    .is_empty()
2132            })
2133            .await;
2134
2135        // Join the worktree as client B.
2136        let project_b = Project::remote(
2137            project_id,
2138            client_b.clone(),
2139            client_b.user_store.clone(),
2140            lang_registry.clone(),
2141            fs.clone(),
2142            &mut cx_b.to_async(),
2143        )
2144        .await
2145        .unwrap();
2146
2147        project_b.read_with(&cx_b, |project, cx| {
2148            assert_eq!(
2149                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2150                &[(
2151                    ProjectPath {
2152                        worktree_id,
2153                        path: Arc::from(Path::new("a.rs")),
2154                    },
2155                    DiagnosticSummary {
2156                        error_count: 1,
2157                        warning_count: 0,
2158                        ..Default::default()
2159                    },
2160                )]
2161            )
2162        });
2163
2164        // Simulate a language server reporting more errors for a file.
2165        fake_language_server
2166            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2167                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2168                version: None,
2169                diagnostics: vec![
2170                    lsp::Diagnostic {
2171                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2172                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2173                        message: "message 1".to_string(),
2174                        ..Default::default()
2175                    },
2176                    lsp::Diagnostic {
2177                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2178                        range: lsp::Range::new(
2179                            lsp::Position::new(0, 10),
2180                            lsp::Position::new(0, 13),
2181                        ),
2182                        message: "message 2".to_string(),
2183                        ..Default::default()
2184                    },
2185                ],
2186            })
2187            .await;
2188
2189        // Client b gets the updated summaries
2190        project_b
2191            .condition(&cx_b, |project, cx| {
2192                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2193                    == &[(
2194                        ProjectPath {
2195                            worktree_id,
2196                            path: Arc::from(Path::new("a.rs")),
2197                        },
2198                        DiagnosticSummary {
2199                            error_count: 1,
2200                            warning_count: 1,
2201                            ..Default::default()
2202                        },
2203                    )]
2204            })
2205            .await;
2206
2207        // Open the file with the errors on client B. They should be present.
2208        let buffer_b = cx_b
2209            .background()
2210            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2211            .await
2212            .unwrap();
2213
2214        buffer_b.read_with(&cx_b, |buffer, _| {
2215            assert_eq!(
2216                buffer
2217                    .snapshot()
2218                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2219                    .map(|entry| entry)
2220                    .collect::<Vec<_>>(),
2221                &[
2222                    DiagnosticEntry {
2223                        range: Point::new(0, 4)..Point::new(0, 7),
2224                        diagnostic: Diagnostic {
2225                            group_id: 0,
2226                            message: "message 1".to_string(),
2227                            severity: lsp::DiagnosticSeverity::ERROR,
2228                            is_primary: true,
2229                            ..Default::default()
2230                        }
2231                    },
2232                    DiagnosticEntry {
2233                        range: Point::new(0, 10)..Point::new(0, 13),
2234                        diagnostic: Diagnostic {
2235                            group_id: 1,
2236                            severity: lsp::DiagnosticSeverity::WARNING,
2237                            message: "message 2".to_string(),
2238                            is_primary: true,
2239                            ..Default::default()
2240                        }
2241                    }
2242                ]
2243            );
2244        });
2245    }
2246
2247    #[gpui::test(iterations = 10)]
2248    async fn test_collaborating_with_completion(
2249        mut cx_a: TestAppContext,
2250        mut cx_b: TestAppContext,
2251    ) {
2252        cx_a.foreground().forbid_parking();
2253        let mut lang_registry = Arc::new(LanguageRegistry::new());
2254        let fs = FakeFs::new(cx_a.background());
2255
2256        // Set up a fake language server.
2257        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2258        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2259            completion_provider: Some(lsp::CompletionOptions {
2260                trigger_characters: Some(vec![".".to_string()]),
2261                ..Default::default()
2262            }),
2263            ..Default::default()
2264        });
2265        Arc::get_mut(&mut lang_registry)
2266            .unwrap()
2267            .add(Arc::new(Language::new(
2268                LanguageConfig {
2269                    name: "Rust".into(),
2270                    path_suffixes: vec!["rs".to_string()],
2271                    language_server: Some(language_server_config),
2272                    ..Default::default()
2273                },
2274                Some(tree_sitter_rust::language()),
2275            )));
2276
2277        // Connect to a server as 2 clients.
2278        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2279        let client_a = server.create_client(&mut cx_a, "user_a").await;
2280        let client_b = server.create_client(&mut cx_b, "user_b").await;
2281
2282        // Share a project as client A
2283        fs.insert_tree(
2284            "/a",
2285            json!({
2286                ".zed.toml": r#"collaborators = ["user_b"]"#,
2287                "main.rs": "fn main() { a }",
2288                "other.rs": "",
2289            }),
2290        )
2291        .await;
2292        let project_a = cx_a.update(|cx| {
2293            Project::local(
2294                client_a.clone(),
2295                client_a.user_store.clone(),
2296                lang_registry.clone(),
2297                fs.clone(),
2298                cx,
2299            )
2300        });
2301        let (worktree_a, _) = project_a
2302            .update(&mut cx_a, |p, cx| {
2303                p.find_or_create_local_worktree("/a", false, cx)
2304            })
2305            .await
2306            .unwrap();
2307        worktree_a
2308            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2309            .await;
2310        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2311        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2312        project_a
2313            .update(&mut cx_a, |p, cx| p.share(cx))
2314            .await
2315            .unwrap();
2316
2317        // Join the worktree as client B.
2318        let project_b = Project::remote(
2319            project_id,
2320            client_b.clone(),
2321            client_b.user_store.clone(),
2322            lang_registry.clone(),
2323            fs.clone(),
2324            &mut cx_b.to_async(),
2325        )
2326        .await
2327        .unwrap();
2328
2329        // Open a file in an editor as the guest.
2330        let buffer_b = project_b
2331            .update(&mut cx_b, |p, cx| {
2332                p.open_buffer((worktree_id, "main.rs"), cx)
2333            })
2334            .await
2335            .unwrap();
2336        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2337        let editor_b = cx_b.add_view(window_b, |cx| {
2338            Editor::for_buffer(
2339                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2340                Arc::new(|cx| EditorSettings::test(cx)),
2341                Some(project_b.clone()),
2342                cx,
2343            )
2344        });
2345
2346        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2347        buffer_b
2348            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2349            .await;
2350
2351        // Type a completion trigger character as the guest.
2352        editor_b.update(&mut cx_b, |editor, cx| {
2353            editor.select_ranges([13..13], None, cx);
2354            editor.handle_input(&Input(".".into()), cx);
2355            cx.focus(&editor_b);
2356        });
2357
2358        // Receive a completion request as the host's language server.
2359        // Return some completions from the host's language server.
2360        cx_a.foreground().start_waiting();
2361        fake_language_server
2362            .handle_request::<lsp::request::Completion, _>(|params| {
2363                assert_eq!(
2364                    params.text_document_position.text_document.uri,
2365                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2366                );
2367                assert_eq!(
2368                    params.text_document_position.position,
2369                    lsp::Position::new(0, 14),
2370                );
2371
2372                Some(lsp::CompletionResponse::Array(vec![
2373                    lsp::CompletionItem {
2374                        label: "first_method(…)".into(),
2375                        detail: Some("fn(&mut self, B) -> C".into()),
2376                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2377                            new_text: "first_method($1)".to_string(),
2378                            range: lsp::Range::new(
2379                                lsp::Position::new(0, 14),
2380                                lsp::Position::new(0, 14),
2381                            ),
2382                        })),
2383                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2384                        ..Default::default()
2385                    },
2386                    lsp::CompletionItem {
2387                        label: "second_method(…)".into(),
2388                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2389                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2390                            new_text: "second_method()".to_string(),
2391                            range: lsp::Range::new(
2392                                lsp::Position::new(0, 14),
2393                                lsp::Position::new(0, 14),
2394                            ),
2395                        })),
2396                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2397                        ..Default::default()
2398                    },
2399                ]))
2400            })
2401            .next()
2402            .await
2403            .unwrap();
2404        cx_a.foreground().finish_waiting();
2405
2406        // Open the buffer on the host.
2407        let buffer_a = project_a
2408            .update(&mut cx_a, |p, cx| {
2409                p.open_buffer((worktree_id, "main.rs"), cx)
2410            })
2411            .await
2412            .unwrap();
2413        buffer_a
2414            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2415            .await;
2416
2417        // Confirm a completion on the guest.
2418        editor_b
2419            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2420            .await;
2421        editor_b.update(&mut cx_b, |editor, cx| {
2422            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2423            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2424        });
2425
2426        // Return a resolved completion from the host's language server.
2427        // The resolved completion has an additional text edit.
2428        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2429            assert_eq!(params.label, "first_method(…)");
2430            lsp::CompletionItem {
2431                label: "first_method(…)".into(),
2432                detail: Some("fn(&mut self, B) -> C".into()),
2433                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2434                    new_text: "first_method($1)".to_string(),
2435                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2436                })),
2437                additional_text_edits: Some(vec![lsp::TextEdit {
2438                    new_text: "use d::SomeTrait;\n".to_string(),
2439                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2440                }]),
2441                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2442                ..Default::default()
2443            }
2444        });
2445
2446        // The additional edit is applied.
2447        buffer_a
2448            .condition(&cx_a, |buffer, _| {
2449                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2450            })
2451            .await;
2452        buffer_b
2453            .condition(&cx_b, |buffer, _| {
2454                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2455            })
2456            .await;
2457    }
2458
2459    #[gpui::test(iterations = 10)]
2460    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2461        cx_a.foreground().forbid_parking();
2462        let mut lang_registry = Arc::new(LanguageRegistry::new());
2463        let fs = FakeFs::new(cx_a.background());
2464
2465        // Set up a fake language server.
2466        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2467        Arc::get_mut(&mut lang_registry)
2468            .unwrap()
2469            .add(Arc::new(Language::new(
2470                LanguageConfig {
2471                    name: "Rust".into(),
2472                    path_suffixes: vec!["rs".to_string()],
2473                    language_server: Some(language_server_config),
2474                    ..Default::default()
2475                },
2476                Some(tree_sitter_rust::language()),
2477            )));
2478
2479        // Connect to a server as 2 clients.
2480        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2481        let client_a = server.create_client(&mut cx_a, "user_a").await;
2482        let client_b = server.create_client(&mut cx_b, "user_b").await;
2483
2484        // Share a project as client A
2485        fs.insert_tree(
2486            "/a",
2487            json!({
2488                ".zed.toml": r#"collaborators = ["user_b"]"#,
2489                "a.rs": "let one = two",
2490            }),
2491        )
2492        .await;
2493        let project_a = cx_a.update(|cx| {
2494            Project::local(
2495                client_a.clone(),
2496                client_a.user_store.clone(),
2497                lang_registry.clone(),
2498                fs.clone(),
2499                cx,
2500            )
2501        });
2502        let (worktree_a, _) = project_a
2503            .update(&mut cx_a, |p, cx| {
2504                p.find_or_create_local_worktree("/a", false, cx)
2505            })
2506            .await
2507            .unwrap();
2508        worktree_a
2509            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2510            .await;
2511        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2512        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2513        project_a
2514            .update(&mut cx_a, |p, cx| p.share(cx))
2515            .await
2516            .unwrap();
2517
2518        // Join the worktree as client B.
2519        let project_b = Project::remote(
2520            project_id,
2521            client_b.clone(),
2522            client_b.user_store.clone(),
2523            lang_registry.clone(),
2524            fs.clone(),
2525            &mut cx_b.to_async(),
2526        )
2527        .await
2528        .unwrap();
2529
2530        let buffer_b = cx_b
2531            .background()
2532            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2533            .await
2534            .unwrap();
2535
2536        let format = project_b.update(&mut cx_b, |project, cx| {
2537            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2538        });
2539
2540        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2541        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2542            Some(vec![
2543                lsp::TextEdit {
2544                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2545                    new_text: "h".to_string(),
2546                },
2547                lsp::TextEdit {
2548                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2549                    new_text: "y".to_string(),
2550                },
2551            ])
2552        });
2553
2554        format.await.unwrap();
2555        assert_eq!(
2556            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2557            "let honey = two"
2558        );
2559    }
2560
2561    #[gpui::test(iterations = 10)]
2562    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2563        cx_a.foreground().forbid_parking();
2564        let mut lang_registry = Arc::new(LanguageRegistry::new());
2565        let fs = FakeFs::new(cx_a.background());
2566        fs.insert_tree(
2567            "/root-1",
2568            json!({
2569                ".zed.toml": r#"collaborators = ["user_b"]"#,
2570                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2571            }),
2572        )
2573        .await;
2574        fs.insert_tree(
2575            "/root-2",
2576            json!({
2577                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2578            }),
2579        )
2580        .await;
2581
2582        // Set up a fake language server.
2583        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2584        Arc::get_mut(&mut lang_registry)
2585            .unwrap()
2586            .add(Arc::new(Language::new(
2587                LanguageConfig {
2588                    name: "Rust".into(),
2589                    path_suffixes: vec!["rs".to_string()],
2590                    language_server: Some(language_server_config),
2591                    ..Default::default()
2592                },
2593                Some(tree_sitter_rust::language()),
2594            )));
2595
2596        // Connect to a server as 2 clients.
2597        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2598        let client_a = server.create_client(&mut cx_a, "user_a").await;
2599        let client_b = server.create_client(&mut cx_b, "user_b").await;
2600
2601        // Share a project as client A
2602        let project_a = cx_a.update(|cx| {
2603            Project::local(
2604                client_a.clone(),
2605                client_a.user_store.clone(),
2606                lang_registry.clone(),
2607                fs.clone(),
2608                cx,
2609            )
2610        });
2611        let (worktree_a, _) = project_a
2612            .update(&mut cx_a, |p, cx| {
2613                p.find_or_create_local_worktree("/root-1", false, cx)
2614            })
2615            .await
2616            .unwrap();
2617        worktree_a
2618            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2619            .await;
2620        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2621        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2622        project_a
2623            .update(&mut cx_a, |p, cx| p.share(cx))
2624            .await
2625            .unwrap();
2626
2627        // Join the worktree as client B.
2628        let project_b = Project::remote(
2629            project_id,
2630            client_b.clone(),
2631            client_b.user_store.clone(),
2632            lang_registry.clone(),
2633            fs.clone(),
2634            &mut cx_b.to_async(),
2635        )
2636        .await
2637        .unwrap();
2638
2639        // Open the file on client B.
2640        let buffer_b = cx_b
2641            .background()
2642            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2643            .await
2644            .unwrap();
2645
2646        // Request the definition of a symbol as the guest.
2647        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2648
2649        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2650        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2651            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2652                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2653                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2654            )))
2655        });
2656
2657        let definitions_1 = definitions_1.await.unwrap();
2658        cx_b.read(|cx| {
2659            assert_eq!(definitions_1.len(), 1);
2660            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2661            let target_buffer = definitions_1[0].target_buffer.read(cx);
2662            assert_eq!(
2663                target_buffer.text(),
2664                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2665            );
2666            assert_eq!(
2667                definitions_1[0].target_range.to_point(target_buffer),
2668                Point::new(0, 6)..Point::new(0, 9)
2669            );
2670        });
2671
2672        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2673        // the previous call to `definition`.
2674        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2675        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2676            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2677                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2678                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2679            )))
2680        });
2681
2682        let definitions_2 = definitions_2.await.unwrap();
2683        cx_b.read(|cx| {
2684            assert_eq!(definitions_2.len(), 1);
2685            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2686            let target_buffer = definitions_2[0].target_buffer.read(cx);
2687            assert_eq!(
2688                target_buffer.text(),
2689                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2690            );
2691            assert_eq!(
2692                definitions_2[0].target_range.to_point(target_buffer),
2693                Point::new(1, 6)..Point::new(1, 11)
2694            );
2695        });
2696        assert_eq!(
2697            definitions_1[0].target_buffer,
2698            definitions_2[0].target_buffer
2699        );
2700
2701        cx_b.update(|_| {
2702            drop(definitions_1);
2703            drop(definitions_2);
2704        });
2705        project_b
2706            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2707            .await;
2708    }
2709
2710    #[gpui::test(iterations = 10)]
2711    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2712        mut cx_a: TestAppContext,
2713        mut cx_b: TestAppContext,
2714        mut rng: StdRng,
2715    ) {
2716        cx_a.foreground().forbid_parking();
2717        let mut lang_registry = Arc::new(LanguageRegistry::new());
2718        let fs = FakeFs::new(cx_a.background());
2719        fs.insert_tree(
2720            "/root",
2721            json!({
2722                ".zed.toml": r#"collaborators = ["user_b"]"#,
2723                "a.rs": "const ONE: usize = b::TWO;",
2724                "b.rs": "const TWO: usize = 2",
2725            }),
2726        )
2727        .await;
2728
2729        // Set up a fake language server.
2730        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2731
2732        Arc::get_mut(&mut lang_registry)
2733            .unwrap()
2734            .add(Arc::new(Language::new(
2735                LanguageConfig {
2736                    name: "Rust".into(),
2737                    path_suffixes: vec!["rs".to_string()],
2738                    language_server: Some(language_server_config),
2739                    ..Default::default()
2740                },
2741                Some(tree_sitter_rust::language()),
2742            )));
2743
2744        // Connect to a server as 2 clients.
2745        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2746        let client_a = server.create_client(&mut cx_a, "user_a").await;
2747        let client_b = server.create_client(&mut cx_b, "user_b").await;
2748
2749        // Share a project as client A
2750        let project_a = cx_a.update(|cx| {
2751            Project::local(
2752                client_a.clone(),
2753                client_a.user_store.clone(),
2754                lang_registry.clone(),
2755                fs.clone(),
2756                cx,
2757            )
2758        });
2759
2760        let (worktree_a, _) = project_a
2761            .update(&mut cx_a, |p, cx| {
2762                p.find_or_create_local_worktree("/root", false, cx)
2763            })
2764            .await
2765            .unwrap();
2766        worktree_a
2767            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2768            .await;
2769        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2770        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2771        project_a
2772            .update(&mut cx_a, |p, cx| p.share(cx))
2773            .await
2774            .unwrap();
2775
2776        // Join the worktree as client B.
2777        let project_b = Project::remote(
2778            project_id,
2779            client_b.clone(),
2780            client_b.user_store.clone(),
2781            lang_registry.clone(),
2782            fs.clone(),
2783            &mut cx_b.to_async(),
2784        )
2785        .await
2786        .unwrap();
2787
2788        let buffer_b1 = cx_b
2789            .background()
2790            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2791            .await
2792            .unwrap();
2793
2794        let definitions;
2795        let buffer_b2;
2796        if rng.gen() {
2797            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2798            buffer_b2 =
2799                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2800        } else {
2801            buffer_b2 =
2802                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2803            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2804        }
2805
2806        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2807        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2808            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2809                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2810                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2811            )))
2812        });
2813
2814        let buffer_b2 = buffer_b2.await.unwrap();
2815        let definitions = definitions.await.unwrap();
2816        assert_eq!(definitions.len(), 1);
2817        assert_eq!(definitions[0].target_buffer, buffer_b2);
2818    }
2819
2820    #[gpui::test(iterations = 10)]
2821    async fn test_collaborating_with_code_actions(
2822        mut cx_a: TestAppContext,
2823        mut cx_b: TestAppContext,
2824    ) {
2825        cx_a.foreground().forbid_parking();
2826        let mut lang_registry = Arc::new(LanguageRegistry::new());
2827        let fs = FakeFs::new(cx_a.background());
2828        let mut path_openers_b = Vec::new();
2829        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2830
2831        // Set up a fake language server.
2832        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2833        Arc::get_mut(&mut lang_registry)
2834            .unwrap()
2835            .add(Arc::new(Language::new(
2836                LanguageConfig {
2837                    name: "Rust".into(),
2838                    path_suffixes: vec!["rs".to_string()],
2839                    language_server: Some(language_server_config),
2840                    ..Default::default()
2841                },
2842                Some(tree_sitter_rust::language()),
2843            )));
2844
2845        // Connect to a server as 2 clients.
2846        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2847        let client_a = server.create_client(&mut cx_a, "user_a").await;
2848        let client_b = server.create_client(&mut cx_b, "user_b").await;
2849
2850        // Share a project as client A
2851        fs.insert_tree(
2852            "/a",
2853            json!({
2854                ".zed.toml": r#"collaborators = ["user_b"]"#,
2855                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2856                "other.rs": "pub fn foo() -> usize { 4 }",
2857            }),
2858        )
2859        .await;
2860        let project_a = cx_a.update(|cx| {
2861            Project::local(
2862                client_a.clone(),
2863                client_a.user_store.clone(),
2864                lang_registry.clone(),
2865                fs.clone(),
2866                cx,
2867            )
2868        });
2869        let (worktree_a, _) = project_a
2870            .update(&mut cx_a, |p, cx| {
2871                p.find_or_create_local_worktree("/a", false, cx)
2872            })
2873            .await
2874            .unwrap();
2875        worktree_a
2876            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2877            .await;
2878        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2879        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2880        project_a
2881            .update(&mut cx_a, |p, cx| p.share(cx))
2882            .await
2883            .unwrap();
2884
2885        // Join the worktree as client B.
2886        let project_b = Project::remote(
2887            project_id,
2888            client_b.clone(),
2889            client_b.user_store.clone(),
2890            lang_registry.clone(),
2891            fs.clone(),
2892            &mut cx_b.to_async(),
2893        )
2894        .await
2895        .unwrap();
2896        let mut params = cx_b.update(WorkspaceParams::test);
2897        params.languages = lang_registry.clone();
2898        params.client = client_b.client.clone();
2899        params.user_store = client_b.user_store.clone();
2900        params.project = project_b;
2901        params.path_openers = path_openers_b.into();
2902
2903        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
2904        let editor_b = workspace_b
2905            .update(&mut cx_b, |workspace, cx| {
2906                workspace.open_path((worktree_id, "main.rs").into(), cx)
2907            })
2908            .await
2909            .unwrap()
2910            .downcast::<Editor>()
2911            .unwrap();
2912
2913        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2914        fake_language_server
2915            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2916                assert_eq!(
2917                    params.text_document.uri,
2918                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2919                );
2920                assert_eq!(params.range.start, lsp::Position::new(0, 0));
2921                assert_eq!(params.range.end, lsp::Position::new(0, 0));
2922                None
2923            })
2924            .next()
2925            .await;
2926
2927        // Move cursor to a location that contains code actions.
2928        editor_b.update(&mut cx_b, |editor, cx| {
2929            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2930            cx.focus(&editor_b);
2931        });
2932
2933        fake_language_server
2934            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2935                assert_eq!(
2936                    params.text_document.uri,
2937                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2938                );
2939                assert_eq!(params.range.start, lsp::Position::new(1, 31));
2940                assert_eq!(params.range.end, lsp::Position::new(1, 31));
2941
2942                Some(vec![lsp::CodeActionOrCommand::CodeAction(
2943                    lsp::CodeAction {
2944                        title: "Inline into all callers".to_string(),
2945                        edit: Some(lsp::WorkspaceEdit {
2946                            changes: Some(
2947                                [
2948                                    (
2949                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
2950                                        vec![lsp::TextEdit::new(
2951                                            lsp::Range::new(
2952                                                lsp::Position::new(1, 22),
2953                                                lsp::Position::new(1, 34),
2954                                            ),
2955                                            "4".to_string(),
2956                                        )],
2957                                    ),
2958                                    (
2959                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
2960                                        vec![lsp::TextEdit::new(
2961                                            lsp::Range::new(
2962                                                lsp::Position::new(0, 0),
2963                                                lsp::Position::new(0, 27),
2964                                            ),
2965                                            "".to_string(),
2966                                        )],
2967                                    ),
2968                                ]
2969                                .into_iter()
2970                                .collect(),
2971                            ),
2972                            ..Default::default()
2973                        }),
2974                        data: Some(json!({
2975                            "codeActionParams": {
2976                                "range": {
2977                                    "start": {"line": 1, "column": 31},
2978                                    "end": {"line": 1, "column": 31},
2979                                }
2980                            }
2981                        })),
2982                        ..Default::default()
2983                    },
2984                )])
2985            })
2986            .next()
2987            .await;
2988
2989        // Toggle code actions and wait for them to display.
2990        editor_b.update(&mut cx_b, |editor, cx| {
2991            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2992        });
2993        editor_b
2994            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2995            .await;
2996
2997        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
2998
2999        // Confirming the code action will trigger a resolve request.
3000        let confirm_action = workspace_b
3001            .update(&mut cx_b, |workspace, cx| {
3002                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3003            })
3004            .unwrap();
3005        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
3006            lsp::CodeAction {
3007                title: "Inline into all callers".to_string(),
3008                edit: Some(lsp::WorkspaceEdit {
3009                    changes: Some(
3010                        [
3011                            (
3012                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3013                                vec![lsp::TextEdit::new(
3014                                    lsp::Range::new(
3015                                        lsp::Position::new(1, 22),
3016                                        lsp::Position::new(1, 34),
3017                                    ),
3018                                    "4".to_string(),
3019                                )],
3020                            ),
3021                            (
3022                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3023                                vec![lsp::TextEdit::new(
3024                                    lsp::Range::new(
3025                                        lsp::Position::new(0, 0),
3026                                        lsp::Position::new(0, 27),
3027                                    ),
3028                                    "".to_string(),
3029                                )],
3030                            ),
3031                        ]
3032                        .into_iter()
3033                        .collect(),
3034                    ),
3035                    ..Default::default()
3036                }),
3037                ..Default::default()
3038            }
3039        });
3040
3041        // After the action is confirmed, an editor containing both modified files is opened.
3042        confirm_action.await.unwrap();
3043        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3044            workspace
3045                .active_item(cx)
3046                .unwrap()
3047                .downcast::<Editor>()
3048                .unwrap()
3049        });
3050        code_action_editor.update(&mut cx_b, |editor, cx| {
3051            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3052            editor.undo(&Undo, cx);
3053            assert_eq!(
3054                editor.text(cx),
3055                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3056            );
3057            editor.redo(&Redo, cx);
3058            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3059        });
3060    }
3061
3062    #[gpui::test(iterations = 10)]
3063    async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3064        cx_a.foreground().forbid_parking();
3065        let mut lang_registry = Arc::new(LanguageRegistry::new());
3066        let fs = FakeFs::new(cx_a.background());
3067        let mut path_openers_b = Vec::new();
3068        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3069
3070        // Set up a fake language server.
3071        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3072        Arc::get_mut(&mut lang_registry)
3073            .unwrap()
3074            .add(Arc::new(Language::new(
3075                LanguageConfig {
3076                    name: "Rust".into(),
3077                    path_suffixes: vec!["rs".to_string()],
3078                    language_server: Some(language_server_config),
3079                    ..Default::default()
3080                },
3081                Some(tree_sitter_rust::language()),
3082            )));
3083
3084        // Connect to a server as 2 clients.
3085        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3086        let client_a = server.create_client(&mut cx_a, "user_a").await;
3087        let client_b = server.create_client(&mut cx_b, "user_b").await;
3088
3089        // Share a project as client A
3090        fs.insert_tree(
3091            "/dir",
3092            json!({
3093                ".zed.toml": r#"collaborators = ["user_b"]"#,
3094                "one.rs": "const ONE: usize = 1;",
3095                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3096            }),
3097        )
3098        .await;
3099        let project_a = cx_a.update(|cx| {
3100            Project::local(
3101                client_a.clone(),
3102                client_a.user_store.clone(),
3103                lang_registry.clone(),
3104                fs.clone(),
3105                cx,
3106            )
3107        });
3108        let (worktree_a, _) = project_a
3109            .update(&mut cx_a, |p, cx| {
3110                p.find_or_create_local_worktree("/dir", false, cx)
3111            })
3112            .await
3113            .unwrap();
3114        worktree_a
3115            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3116            .await;
3117        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3118        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3119        project_a
3120            .update(&mut cx_a, |p, cx| p.share(cx))
3121            .await
3122            .unwrap();
3123
3124        // Join the worktree as client B.
3125        let project_b = Project::remote(
3126            project_id,
3127            client_b.clone(),
3128            client_b.user_store.clone(),
3129            lang_registry.clone(),
3130            fs.clone(),
3131            &mut cx_b.to_async(),
3132        )
3133        .await
3134        .unwrap();
3135        let mut params = cx_b.update(WorkspaceParams::test);
3136        params.languages = lang_registry.clone();
3137        params.client = client_b.client.clone();
3138        params.user_store = client_b.user_store.clone();
3139        params.project = project_b;
3140        params.path_openers = path_openers_b.into();
3141
3142        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3143        let editor_b = workspace_b
3144            .update(&mut cx_b, |workspace, cx| {
3145                workspace.open_path((worktree_id, "one.rs").into(), cx)
3146            })
3147            .await
3148            .unwrap()
3149            .downcast::<Editor>()
3150            .unwrap();
3151        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3152
3153        // Move cursor to a location that can be renamed.
3154        let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3155            editor.select_ranges([7..7], None, cx);
3156            editor.rename(&Rename, cx).unwrap()
3157        });
3158
3159        fake_language_server
3160            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
3161                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3162                assert_eq!(params.position, lsp::Position::new(0, 7));
3163                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3164                    lsp::Position::new(0, 6),
3165                    lsp::Position::new(0, 9),
3166                )))
3167            })
3168            .next()
3169            .await
3170            .unwrap();
3171        prepare_rename.await.unwrap();
3172        editor_b.update(&mut cx_b, |editor, cx| {
3173            let rename = editor.pending_rename().unwrap();
3174            let buffer = editor.buffer().read(cx).snapshot(cx);
3175            assert_eq!(
3176                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3177                6..9
3178            );
3179            rename.editor.update(cx, |rename_editor, cx| {
3180                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3181                    rename_buffer.edit([0..3], "THREE", cx);
3182                });
3183            });
3184        });
3185
3186        let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3187            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3188        });
3189        fake_language_server
3190            .handle_request::<lsp::request::Rename, _>(|params| {
3191                assert_eq!(
3192                    params.text_document_position.text_document.uri.as_str(),
3193                    "file:///dir/one.rs"
3194                );
3195                assert_eq!(
3196                    params.text_document_position.position,
3197                    lsp::Position::new(0, 6)
3198                );
3199                assert_eq!(params.new_name, "THREE");
3200                Some(lsp::WorkspaceEdit {
3201                    changes: Some(
3202                        [
3203                            (
3204                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3205                                vec![lsp::TextEdit::new(
3206                                    lsp::Range::new(
3207                                        lsp::Position::new(0, 6),
3208                                        lsp::Position::new(0, 9),
3209                                    ),
3210                                    "THREE".to_string(),
3211                                )],
3212                            ),
3213                            (
3214                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3215                                vec![
3216                                    lsp::TextEdit::new(
3217                                        lsp::Range::new(
3218                                            lsp::Position::new(0, 24),
3219                                            lsp::Position::new(0, 27),
3220                                        ),
3221                                        "THREE".to_string(),
3222                                    ),
3223                                    lsp::TextEdit::new(
3224                                        lsp::Range::new(
3225                                            lsp::Position::new(0, 35),
3226                                            lsp::Position::new(0, 38),
3227                                        ),
3228                                        "THREE".to_string(),
3229                                    ),
3230                                ],
3231                            ),
3232                        ]
3233                        .into_iter()
3234                        .collect(),
3235                    ),
3236                    ..Default::default()
3237                })
3238            })
3239            .next()
3240            .await
3241            .unwrap();
3242        confirm_rename.await.unwrap();
3243
3244        let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3245            workspace
3246                .active_item(cx)
3247                .unwrap()
3248                .downcast::<Editor>()
3249                .unwrap()
3250        });
3251        rename_editor.update(&mut cx_b, |editor, cx| {
3252            assert_eq!(
3253                editor.text(cx),
3254                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3255            );
3256            editor.undo(&Undo, cx);
3257            assert_eq!(
3258                editor.text(cx),
3259                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3260            );
3261            editor.redo(&Redo, cx);
3262            assert_eq!(
3263                editor.text(cx),
3264                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3265            );
3266        });
3267
3268        // Ensure temporary rename edits cannot be undone/redone.
3269        editor_b.update(&mut cx_b, |editor, cx| {
3270            editor.undo(&Undo, cx);
3271            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3272            editor.undo(&Undo, cx);
3273            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3274            editor.redo(&Redo, cx);
3275            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3276        })
3277    }
3278
3279    #[gpui::test(iterations = 10)]
3280    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3281        cx_a.foreground().forbid_parking();
3282
3283        // Connect to a server as 2 clients.
3284        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3285        let client_a = server.create_client(&mut cx_a, "user_a").await;
3286        let client_b = server.create_client(&mut cx_b, "user_b").await;
3287
3288        // Create an org that includes these 2 users.
3289        let db = &server.app_state.db;
3290        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3291        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3292            .await
3293            .unwrap();
3294        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3295            .await
3296            .unwrap();
3297
3298        // Create a channel that includes all the users.
3299        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3300        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3301            .await
3302            .unwrap();
3303        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3304            .await
3305            .unwrap();
3306        db.create_channel_message(
3307            channel_id,
3308            client_b.current_user_id(&cx_b),
3309            "hello A, it's B.",
3310            OffsetDateTime::now_utc(),
3311            1,
3312        )
3313        .await
3314        .unwrap();
3315
3316        let channels_a = cx_a
3317            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3318        channels_a
3319            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3320            .await;
3321        channels_a.read_with(&cx_a, |list, _| {
3322            assert_eq!(
3323                list.available_channels().unwrap(),
3324                &[ChannelDetails {
3325                    id: channel_id.to_proto(),
3326                    name: "test-channel".to_string()
3327                }]
3328            )
3329        });
3330        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3331            this.get_channel(channel_id.to_proto(), cx).unwrap()
3332        });
3333        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3334        channel_a
3335            .condition(&cx_a, |channel, _| {
3336                channel_messages(channel)
3337                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3338            })
3339            .await;
3340
3341        let channels_b = cx_b
3342            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3343        channels_b
3344            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3345            .await;
3346        channels_b.read_with(&cx_b, |list, _| {
3347            assert_eq!(
3348                list.available_channels().unwrap(),
3349                &[ChannelDetails {
3350                    id: channel_id.to_proto(),
3351                    name: "test-channel".to_string()
3352                }]
3353            )
3354        });
3355
3356        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3357            this.get_channel(channel_id.to_proto(), cx).unwrap()
3358        });
3359        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3360        channel_b
3361            .condition(&cx_b, |channel, _| {
3362                channel_messages(channel)
3363                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3364            })
3365            .await;
3366
3367        channel_a
3368            .update(&mut cx_a, |channel, cx| {
3369                channel
3370                    .send_message("oh, hi B.".to_string(), cx)
3371                    .unwrap()
3372                    .detach();
3373                let task = channel.send_message("sup".to_string(), cx).unwrap();
3374                assert_eq!(
3375                    channel_messages(channel),
3376                    &[
3377                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3378                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3379                        ("user_a".to_string(), "sup".to_string(), true)
3380                    ]
3381                );
3382                task
3383            })
3384            .await
3385            .unwrap();
3386
3387        channel_b
3388            .condition(&cx_b, |channel, _| {
3389                channel_messages(channel)
3390                    == [
3391                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3392                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3393                        ("user_a".to_string(), "sup".to_string(), false),
3394                    ]
3395            })
3396            .await;
3397
3398        assert_eq!(
3399            server
3400                .state()
3401                .await
3402                .channel(channel_id)
3403                .unwrap()
3404                .connection_ids
3405                .len(),
3406            2
3407        );
3408        cx_b.update(|_| drop(channel_b));
3409        server
3410            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3411            .await;
3412
3413        cx_a.update(|_| drop(channel_a));
3414        server
3415            .condition(|state| state.channel(channel_id).is_none())
3416            .await;
3417    }
3418
3419    #[gpui::test(iterations = 10)]
3420    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3421        cx_a.foreground().forbid_parking();
3422
3423        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3424        let client_a = server.create_client(&mut cx_a, "user_a").await;
3425
3426        let db = &server.app_state.db;
3427        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3428        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3429        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3430            .await
3431            .unwrap();
3432        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3433            .await
3434            .unwrap();
3435
3436        let channels_a = cx_a
3437            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3438        channels_a
3439            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3440            .await;
3441        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3442            this.get_channel(channel_id.to_proto(), cx).unwrap()
3443        });
3444
3445        // Messages aren't allowed to be too long.
3446        channel_a
3447            .update(&mut cx_a, |channel, cx| {
3448                let long_body = "this is long.\n".repeat(1024);
3449                channel.send_message(long_body, cx).unwrap()
3450            })
3451            .await
3452            .unwrap_err();
3453
3454        // Messages aren't allowed to be blank.
3455        channel_a.update(&mut cx_a, |channel, cx| {
3456            channel.send_message(String::new(), cx).unwrap_err()
3457        });
3458
3459        // Leading and trailing whitespace are trimmed.
3460        channel_a
3461            .update(&mut cx_a, |channel, cx| {
3462                channel
3463                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3464                    .unwrap()
3465            })
3466            .await
3467            .unwrap();
3468        assert_eq!(
3469            db.get_channel_messages(channel_id, 10, None)
3470                .await
3471                .unwrap()
3472                .iter()
3473                .map(|m| &m.body)
3474                .collect::<Vec<_>>(),
3475            &["surrounded by whitespace"]
3476        );
3477    }
3478
3479    #[gpui::test(iterations = 10)]
3480    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3481        cx_a.foreground().forbid_parking();
3482
3483        // Connect to a server as 2 clients.
3484        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3485        let client_a = server.create_client(&mut cx_a, "user_a").await;
3486        let client_b = server.create_client(&mut cx_b, "user_b").await;
3487        let mut status_b = client_b.status();
3488
3489        // Create an org that includes these 2 users.
3490        let db = &server.app_state.db;
3491        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3492        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3493            .await
3494            .unwrap();
3495        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3496            .await
3497            .unwrap();
3498
3499        // Create a channel that includes all the users.
3500        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3501        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3502            .await
3503            .unwrap();
3504        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3505            .await
3506            .unwrap();
3507        db.create_channel_message(
3508            channel_id,
3509            client_b.current_user_id(&cx_b),
3510            "hello A, it's B.",
3511            OffsetDateTime::now_utc(),
3512            2,
3513        )
3514        .await
3515        .unwrap();
3516
3517        let channels_a = cx_a
3518            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3519        channels_a
3520            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3521            .await;
3522
3523        channels_a.read_with(&cx_a, |list, _| {
3524            assert_eq!(
3525                list.available_channels().unwrap(),
3526                &[ChannelDetails {
3527                    id: channel_id.to_proto(),
3528                    name: "test-channel".to_string()
3529                }]
3530            )
3531        });
3532        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3533            this.get_channel(channel_id.to_proto(), cx).unwrap()
3534        });
3535        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3536        channel_a
3537            .condition(&cx_a, |channel, _| {
3538                channel_messages(channel)
3539                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3540            })
3541            .await;
3542
3543        let channels_b = cx_b
3544            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3545        channels_b
3546            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3547            .await;
3548        channels_b.read_with(&cx_b, |list, _| {
3549            assert_eq!(
3550                list.available_channels().unwrap(),
3551                &[ChannelDetails {
3552                    id: channel_id.to_proto(),
3553                    name: "test-channel".to_string()
3554                }]
3555            )
3556        });
3557
3558        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3559            this.get_channel(channel_id.to_proto(), cx).unwrap()
3560        });
3561        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3562        channel_b
3563            .condition(&cx_b, |channel, _| {
3564                channel_messages(channel)
3565                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3566            })
3567            .await;
3568
3569        // Disconnect client B, ensuring we can still access its cached channel data.
3570        server.forbid_connections();
3571        server.disconnect_client(client_b.current_user_id(&cx_b));
3572        while !matches!(
3573            status_b.next().await,
3574            Some(client::Status::ReconnectionError { .. })
3575        ) {}
3576
3577        channels_b.read_with(&cx_b, |channels, _| {
3578            assert_eq!(
3579                channels.available_channels().unwrap(),
3580                [ChannelDetails {
3581                    id: channel_id.to_proto(),
3582                    name: "test-channel".to_string()
3583                }]
3584            )
3585        });
3586        channel_b.read_with(&cx_b, |channel, _| {
3587            assert_eq!(
3588                channel_messages(channel),
3589                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3590            )
3591        });
3592
3593        // Send a message from client B while it is disconnected.
3594        channel_b
3595            .update(&mut cx_b, |channel, cx| {
3596                let task = channel
3597                    .send_message("can you see this?".to_string(), cx)
3598                    .unwrap();
3599                assert_eq!(
3600                    channel_messages(channel),
3601                    &[
3602                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3603                        ("user_b".to_string(), "can you see this?".to_string(), true)
3604                    ]
3605                );
3606                task
3607            })
3608            .await
3609            .unwrap_err();
3610
3611        // Send a message from client A while B is disconnected.
3612        channel_a
3613            .update(&mut cx_a, |channel, cx| {
3614                channel
3615                    .send_message("oh, hi B.".to_string(), cx)
3616                    .unwrap()
3617                    .detach();
3618                let task = channel.send_message("sup".to_string(), cx).unwrap();
3619                assert_eq!(
3620                    channel_messages(channel),
3621                    &[
3622                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3623                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3624                        ("user_a".to_string(), "sup".to_string(), true)
3625                    ]
3626                );
3627                task
3628            })
3629            .await
3630            .unwrap();
3631
3632        // Give client B a chance to reconnect.
3633        server.allow_connections();
3634        cx_b.foreground().advance_clock(Duration::from_secs(10));
3635
3636        // Verify that B sees the new messages upon reconnection, as well as the message client B
3637        // sent while offline.
3638        channel_b
3639            .condition(&cx_b, |channel, _| {
3640                channel_messages(channel)
3641                    == [
3642                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3643                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3644                        ("user_a".to_string(), "sup".to_string(), false),
3645                        ("user_b".to_string(), "can you see this?".to_string(), false),
3646                    ]
3647            })
3648            .await;
3649
3650        // Ensure client A and B can communicate normally after reconnection.
3651        channel_a
3652            .update(&mut cx_a, |channel, cx| {
3653                channel.send_message("you online?".to_string(), cx).unwrap()
3654            })
3655            .await
3656            .unwrap();
3657        channel_b
3658            .condition(&cx_b, |channel, _| {
3659                channel_messages(channel)
3660                    == [
3661                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3662                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3663                        ("user_a".to_string(), "sup".to_string(), false),
3664                        ("user_b".to_string(), "can you see this?".to_string(), false),
3665                        ("user_a".to_string(), "you online?".to_string(), false),
3666                    ]
3667            })
3668            .await;
3669
3670        channel_b
3671            .update(&mut cx_b, |channel, cx| {
3672                channel.send_message("yep".to_string(), cx).unwrap()
3673            })
3674            .await
3675            .unwrap();
3676        channel_a
3677            .condition(&cx_a, |channel, _| {
3678                channel_messages(channel)
3679                    == [
3680                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3681                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3682                        ("user_a".to_string(), "sup".to_string(), false),
3683                        ("user_b".to_string(), "can you see this?".to_string(), false),
3684                        ("user_a".to_string(), "you online?".to_string(), false),
3685                        ("user_b".to_string(), "yep".to_string(), false),
3686                    ]
3687            })
3688            .await;
3689    }
3690
3691    #[gpui::test(iterations = 10)]
3692    async fn test_contacts(
3693        mut cx_a: TestAppContext,
3694        mut cx_b: TestAppContext,
3695        mut cx_c: TestAppContext,
3696    ) {
3697        cx_a.foreground().forbid_parking();
3698        let lang_registry = Arc::new(LanguageRegistry::new());
3699        let fs = FakeFs::new(cx_a.background());
3700
3701        // Connect to a server as 3 clients.
3702        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3703        let client_a = server.create_client(&mut cx_a, "user_a").await;
3704        let client_b = server.create_client(&mut cx_b, "user_b").await;
3705        let client_c = server.create_client(&mut cx_c, "user_c").await;
3706
3707        // Share a worktree as client A.
3708        fs.insert_tree(
3709            "/a",
3710            json!({
3711                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3712            }),
3713        )
3714        .await;
3715
3716        let project_a = cx_a.update(|cx| {
3717            Project::local(
3718                client_a.clone(),
3719                client_a.user_store.clone(),
3720                lang_registry.clone(),
3721                fs.clone(),
3722                cx,
3723            )
3724        });
3725        let (worktree_a, _) = project_a
3726            .update(&mut cx_a, |p, cx| {
3727                p.find_or_create_local_worktree("/a", false, cx)
3728            })
3729            .await
3730            .unwrap();
3731        worktree_a
3732            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3733            .await;
3734
3735        client_a
3736            .user_store
3737            .condition(&cx_a, |user_store, _| {
3738                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3739            })
3740            .await;
3741        client_b
3742            .user_store
3743            .condition(&cx_b, |user_store, _| {
3744                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3745            })
3746            .await;
3747        client_c
3748            .user_store
3749            .condition(&cx_c, |user_store, _| {
3750                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3751            })
3752            .await;
3753
3754        let project_id = project_a
3755            .update(&mut cx_a, |project, _| project.next_remote_id())
3756            .await;
3757        project_a
3758            .update(&mut cx_a, |project, cx| project.share(cx))
3759            .await
3760            .unwrap();
3761
3762        let _project_b = Project::remote(
3763            project_id,
3764            client_b.clone(),
3765            client_b.user_store.clone(),
3766            lang_registry.clone(),
3767            fs.clone(),
3768            &mut cx_b.to_async(),
3769        )
3770        .await
3771        .unwrap();
3772
3773        client_a
3774            .user_store
3775            .condition(&cx_a, |user_store, _| {
3776                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3777            })
3778            .await;
3779        client_b
3780            .user_store
3781            .condition(&cx_b, |user_store, _| {
3782                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3783            })
3784            .await;
3785        client_c
3786            .user_store
3787            .condition(&cx_c, |user_store, _| {
3788                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3789            })
3790            .await;
3791
3792        project_a
3793            .condition(&cx_a, |project, _| {
3794                project.collaborators().contains_key(&client_b.peer_id)
3795            })
3796            .await;
3797
3798        cx_a.update(move |_| drop(project_a));
3799        client_a
3800            .user_store
3801            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3802            .await;
3803        client_b
3804            .user_store
3805            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3806            .await;
3807        client_c
3808            .user_store
3809            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3810            .await;
3811
3812        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3813            user_store
3814                .contacts()
3815                .iter()
3816                .map(|contact| {
3817                    let worktrees = contact
3818                        .projects
3819                        .iter()
3820                        .map(|p| {
3821                            (
3822                                p.worktree_root_names[0].as_str(),
3823                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3824                            )
3825                        })
3826                        .collect();
3827                    (contact.user.github_login.as_str(), worktrees)
3828                })
3829                .collect()
3830        }
3831    }
3832
3833    #[gpui::test(iterations = 100)]
3834    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
3835        cx.foreground().forbid_parking();
3836        let max_peers = env::var("MAX_PEERS")
3837            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3838            .unwrap_or(5);
3839        let max_operations = env::var("OPERATIONS")
3840            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3841            .unwrap_or(10);
3842
3843        let rng = Arc::new(Mutex::new(rng));
3844
3845        let guest_lang_registry = Arc::new(LanguageRegistry::new());
3846        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
3847
3848        let fs = FakeFs::new(cx.background());
3849        fs.insert_tree(
3850            "/_collab",
3851            json!({
3852                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3853            }),
3854        )
3855        .await;
3856
3857        let operations = Rc::new(Cell::new(0));
3858        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
3859        let mut clients = Vec::new();
3860
3861        let mut next_entity_id = 100000;
3862        let mut host_cx = TestAppContext::new(
3863            cx.foreground_platform(),
3864            cx.platform(),
3865            cx.foreground(),
3866            cx.background(),
3867            cx.font_cache(),
3868            next_entity_id,
3869        );
3870        let host = server.create_client(&mut host_cx, "host").await;
3871        let host_project = host_cx.update(|cx| {
3872            Project::local(
3873                host.client.clone(),
3874                host.user_store.clone(),
3875                Arc::new(LanguageRegistry::new()),
3876                fs.clone(),
3877                cx,
3878            )
3879        });
3880        let host_project_id = host_project
3881            .update(&mut host_cx, |p, _| p.next_remote_id())
3882            .await;
3883
3884        let (collab_worktree, _) = host_project
3885            .update(&mut host_cx, |project, cx| {
3886                project.find_or_create_local_worktree("/_collab", false, cx)
3887            })
3888            .await
3889            .unwrap();
3890        collab_worktree
3891            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
3892            .await;
3893        host_project
3894            .update(&mut host_cx, |project, cx| project.share(cx))
3895            .await
3896            .unwrap();
3897
3898        clients.push(cx.foreground().spawn(host.simulate_host(
3899            host_project.clone(),
3900            language_server_config,
3901            operations.clone(),
3902            max_operations,
3903            rng.clone(),
3904            host_cx.clone(),
3905        )));
3906
3907        while operations.get() < max_operations {
3908            cx.background().simulate_random_delay().await;
3909            if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
3910                operations.set(operations.get() + 1);
3911
3912                let guest_id = clients.len();
3913                log::info!("Adding guest {}", guest_id);
3914                next_entity_id += 100000;
3915                let mut guest_cx = TestAppContext::new(
3916                    cx.foreground_platform(),
3917                    cx.platform(),
3918                    cx.foreground(),
3919                    cx.background(),
3920                    cx.font_cache(),
3921                    next_entity_id,
3922                );
3923                let guest = server
3924                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
3925                    .await;
3926                let guest_project = Project::remote(
3927                    host_project_id,
3928                    guest.client.clone(),
3929                    guest.user_store.clone(),
3930                    guest_lang_registry.clone(),
3931                    fs.clone(),
3932                    &mut guest_cx.to_async(),
3933                )
3934                .await
3935                .unwrap();
3936                clients.push(cx.foreground().spawn(guest.simulate_guest(
3937                    guest_id,
3938                    guest_project,
3939                    operations.clone(),
3940                    max_operations,
3941                    rng.clone(),
3942                    guest_cx,
3943                )));
3944
3945                log::info!("Guest {} added", guest_id);
3946            }
3947        }
3948
3949        let clients = futures::future::join_all(clients).await;
3950        cx.foreground().run_until_parked();
3951
3952        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
3953            project
3954                .worktrees(cx)
3955                .map(|worktree| {
3956                    let snapshot = worktree.read(cx).snapshot();
3957                    (snapshot.id(), snapshot)
3958                })
3959                .collect::<BTreeMap<_, _>>()
3960        });
3961
3962        for (guest_client, guest_cx) in clients.iter().skip(1) {
3963            let guest_id = guest_client.client.id();
3964            let worktree_snapshots =
3965                guest_client
3966                    .project
3967                    .as_ref()
3968                    .unwrap()
3969                    .read_with(guest_cx, |project, cx| {
3970                        project
3971                            .worktrees(cx)
3972                            .map(|worktree| {
3973                                let worktree = worktree.read(cx);
3974                                assert!(
3975                                    !worktree.as_remote().unwrap().has_pending_updates(),
3976                                    "Guest {} worktree {:?} contains deferred updates",
3977                                    guest_id,
3978                                    worktree.id()
3979                                );
3980                                (worktree.id(), worktree.snapshot())
3981                            })
3982                            .collect::<BTreeMap<_, _>>()
3983                    });
3984
3985            assert_eq!(
3986                worktree_snapshots.keys().collect::<Vec<_>>(),
3987                host_worktree_snapshots.keys().collect::<Vec<_>>(),
3988                "guest {} has different worktrees than the host",
3989                guest_id
3990            );
3991            for (id, host_snapshot) in &host_worktree_snapshots {
3992                let guest_snapshot = &worktree_snapshots[id];
3993                assert_eq!(
3994                    guest_snapshot.root_name(),
3995                    host_snapshot.root_name(),
3996                    "guest {} has different root name than the host for worktree {}",
3997                    guest_id,
3998                    id
3999                );
4000                assert_eq!(
4001                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4002                    host_snapshot.entries(false).collect::<Vec<_>>(),
4003                    "guest {} has different snapshot than the host for worktree {}",
4004                    guest_id,
4005                    id
4006                );
4007            }
4008
4009            guest_client
4010                .project
4011                .as_ref()
4012                .unwrap()
4013                .read_with(guest_cx, |project, _| {
4014                    assert!(
4015                        !project.has_buffered_operations(),
4016                        "guest {} has buffered operations ",
4017                        guest_id,
4018                    );
4019                });
4020
4021            for guest_buffer in &guest_client.buffers {
4022                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4023                let host_buffer = host_project.read_with(&host_cx, |project, _| {
4024                    project
4025                        .shared_buffer(guest_client.peer_id, buffer_id)
4026                        .expect(&format!(
4027                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
4028                            guest_id, guest_client.peer_id, buffer_id
4029                        ))
4030                });
4031                assert_eq!(
4032                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4033                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4034                    "guest {} buffer {} differs from the host's buffer",
4035                    guest_id,
4036                    buffer_id,
4037                );
4038            }
4039        }
4040    }
4041
4042    struct TestServer {
4043        peer: Arc<Peer>,
4044        app_state: Arc<AppState>,
4045        server: Arc<Server>,
4046        foreground: Rc<executor::Foreground>,
4047        notifications: mpsc::UnboundedReceiver<()>,
4048        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
4049        forbid_connections: Arc<AtomicBool>,
4050        _test_db: TestDb,
4051    }
4052
4053    impl TestServer {
4054        async fn start(
4055            foreground: Rc<executor::Foreground>,
4056            background: Arc<executor::Background>,
4057        ) -> Self {
4058            let test_db = TestDb::fake(background);
4059            let app_state = Self::build_app_state(&test_db).await;
4060            let peer = Peer::new();
4061            let notifications = mpsc::unbounded();
4062            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4063            Self {
4064                peer,
4065                app_state,
4066                server,
4067                foreground,
4068                notifications: notifications.1,
4069                connection_killers: Default::default(),
4070                forbid_connections: Default::default(),
4071                _test_db: test_db,
4072            }
4073        }
4074
4075        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4076            let http = FakeHttpClient::with_404_response();
4077            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4078            let client_name = name.to_string();
4079            let mut client = Client::new(http.clone());
4080            let server = self.server.clone();
4081            let connection_killers = self.connection_killers.clone();
4082            let forbid_connections = self.forbid_connections.clone();
4083            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4084
4085            Arc::get_mut(&mut client)
4086                .unwrap()
4087                .override_authenticate(move |cx| {
4088                    cx.spawn(|_| async move {
4089                        let access_token = "the-token".to_string();
4090                        Ok(Credentials {
4091                            user_id: user_id.0 as u64,
4092                            access_token,
4093                        })
4094                    })
4095                })
4096                .override_establish_connection(move |credentials, cx| {
4097                    assert_eq!(credentials.user_id, user_id.0 as u64);
4098                    assert_eq!(credentials.access_token, "the-token");
4099
4100                    let server = server.clone();
4101                    let connection_killers = connection_killers.clone();
4102                    let forbid_connections = forbid_connections.clone();
4103                    let client_name = client_name.clone();
4104                    let connection_id_tx = connection_id_tx.clone();
4105                    cx.spawn(move |cx| async move {
4106                        if forbid_connections.load(SeqCst) {
4107                            Err(EstablishConnectionError::other(anyhow!(
4108                                "server is forbidding connections"
4109                            )))
4110                        } else {
4111                            let (client_conn, server_conn, kill_conn) =
4112                                Connection::in_memory(cx.background());
4113                            connection_killers.lock().insert(user_id, kill_conn);
4114                            cx.background()
4115                                .spawn(server.handle_connection(
4116                                    server_conn,
4117                                    client_name,
4118                                    user_id,
4119                                    Some(connection_id_tx),
4120                                    cx.background(),
4121                                ))
4122                                .detach();
4123                            Ok(client_conn)
4124                        }
4125                    })
4126                });
4127
4128            client
4129                .authenticate_and_connect(&cx.to_async())
4130                .await
4131                .unwrap();
4132
4133            Channel::init(&client);
4134            Project::init(&client);
4135
4136            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4137            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4138            let mut authed_user =
4139                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4140            while authed_user.next().await.unwrap().is_none() {}
4141
4142            TestClient {
4143                client,
4144                peer_id,
4145                user_store,
4146                project: Default::default(),
4147                buffers: Default::default(),
4148            }
4149        }
4150
4151        fn disconnect_client(&self, user_id: UserId) {
4152            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4153                let _ = kill_conn.try_send(Some(()));
4154            }
4155        }
4156
4157        fn forbid_connections(&self) {
4158            self.forbid_connections.store(true, SeqCst);
4159        }
4160
4161        fn allow_connections(&self) {
4162            self.forbid_connections.store(false, SeqCst);
4163        }
4164
4165        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4166            let mut config = Config::default();
4167            config.session_secret = "a".repeat(32);
4168            config.database_url = test_db.url.clone();
4169            let github_client = github::AppClient::test();
4170            Arc::new(AppState {
4171                db: test_db.db().clone(),
4172                handlebars: Default::default(),
4173                auth_client: auth::build_client("", ""),
4174                repo_client: github::RepoClient::test(&github_client),
4175                github_client,
4176                config,
4177            })
4178        }
4179
4180        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4181            self.server.store.read()
4182        }
4183
4184        async fn condition<F>(&mut self, mut predicate: F)
4185        where
4186            F: FnMut(&Store) -> bool,
4187        {
4188            async_std::future::timeout(Duration::from_millis(500), async {
4189                while !(predicate)(&*self.server.store.read()) {
4190                    self.foreground.start_waiting();
4191                    self.notifications.next().await;
4192                    self.foreground.finish_waiting();
4193                }
4194            })
4195            .await
4196            .expect("condition timed out");
4197        }
4198    }
4199
4200    impl Drop for TestServer {
4201        fn drop(&mut self) {
4202            self.peer.reset();
4203        }
4204    }
4205
4206    struct TestClient {
4207        client: Arc<Client>,
4208        pub peer_id: PeerId,
4209        pub user_store: ModelHandle<UserStore>,
4210        project: Option<ModelHandle<Project>>,
4211        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4212    }
4213
4214    impl Deref for TestClient {
4215        type Target = Arc<Client>;
4216
4217        fn deref(&self) -> &Self::Target {
4218            &self.client
4219        }
4220    }
4221
4222    impl TestClient {
4223        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4224            UserId::from_proto(
4225                self.user_store
4226                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4227            )
4228        }
4229
4230        fn simulate_host(
4231            mut self,
4232            project: ModelHandle<Project>,
4233            mut language_server_config: LanguageServerConfig,
4234            operations: Rc<Cell<usize>>,
4235            max_operations: usize,
4236            rng: Arc<Mutex<StdRng>>,
4237            mut cx: TestAppContext,
4238        ) -> impl Future<Output = (Self, TestAppContext)> {
4239            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4240
4241            // Set up a fake language server.
4242            language_server_config.set_fake_initializer({
4243                let rng = rng.clone();
4244                let files = files.clone();
4245                move |fake_server| {
4246                    fake_server.handle_request::<lsp::request::Completion, _>(|_| {
4247                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4248                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4249                                range: lsp::Range::new(
4250                                    lsp::Position::new(0, 0),
4251                                    lsp::Position::new(0, 0),
4252                                ),
4253                                new_text: "the-new-text".to_string(),
4254                            })),
4255                            ..Default::default()
4256                        }]))
4257                    });
4258
4259                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_| {
4260                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4261                            lsp::CodeAction {
4262                                title: "the-code-action".to_string(),
4263                                ..Default::default()
4264                            },
4265                        )])
4266                    });
4267
4268                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
4269                        Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4270                            params.position,
4271                            params.position,
4272                        )))
4273                    });
4274
4275                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4276                        let files = files.clone();
4277                        let rng = rng.clone();
4278                        move |_| {
4279                            let files = files.lock();
4280                            let mut rng = rng.lock();
4281                            let count = rng.gen_range::<usize, _>(1..3);
4282                            Some(lsp::GotoDefinitionResponse::Array(
4283                                (0..count)
4284                                    .map(|_| {
4285                                        let file = files.choose(&mut *rng).unwrap().as_path();
4286                                        lsp::Location {
4287                                            uri: lsp::Url::from_file_path(file).unwrap(),
4288                                            range: Default::default(),
4289                                        }
4290                                    })
4291                                    .collect(),
4292                            ))
4293                        }
4294                    });
4295                }
4296            });
4297
4298            project.update(&mut cx, |project, _| {
4299                project.languages().add(Arc::new(Language::new(
4300                    LanguageConfig {
4301                        name: "Rust".into(),
4302                        path_suffixes: vec!["rs".to_string()],
4303                        language_server: Some(language_server_config),
4304                        ..Default::default()
4305                    },
4306                    None,
4307                )));
4308            });
4309
4310            async move {
4311                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4312                while operations.get() < max_operations {
4313                    operations.set(operations.get() + 1);
4314
4315                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4316                    match distribution {
4317                        0..=20 if !files.lock().is_empty() => {
4318                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4319                            let mut path = path.as_path();
4320                            while let Some(parent_path) = path.parent() {
4321                                path = parent_path;
4322                                if rng.lock().gen() {
4323                                    break;
4324                                }
4325                            }
4326
4327                            log::info!("Host: find/create local worktree {:?}", path);
4328                            project
4329                                .update(&mut cx, |project, cx| {
4330                                    project.find_or_create_local_worktree(path, false, cx)
4331                                })
4332                                .await
4333                                .unwrap();
4334                        }
4335                        10..=80 if !files.lock().is_empty() => {
4336                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4337                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4338                                let (worktree, path) = project
4339                                    .update(&mut cx, |project, cx| {
4340                                        project.find_or_create_local_worktree(file, false, cx)
4341                                    })
4342                                    .await
4343                                    .unwrap();
4344                                let project_path =
4345                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4346                                log::info!("Host: opening path {:?}", project_path);
4347                                let buffer = project
4348                                    .update(&mut cx, |project, cx| {
4349                                        project.open_buffer(project_path, cx)
4350                                    })
4351                                    .await
4352                                    .unwrap();
4353                                self.buffers.insert(buffer.clone());
4354                                buffer
4355                            } else {
4356                                self.buffers
4357                                    .iter()
4358                                    .choose(&mut *rng.lock())
4359                                    .unwrap()
4360                                    .clone()
4361                            };
4362
4363                            if rng.lock().gen_bool(0.1) {
4364                                cx.update(|cx| {
4365                                    log::info!(
4366                                        "Host: dropping buffer {:?}",
4367                                        buffer.read(cx).file().unwrap().full_path(cx)
4368                                    );
4369                                    self.buffers.remove(&buffer);
4370                                    drop(buffer);
4371                                });
4372                            } else {
4373                                buffer.update(&mut cx, |buffer, cx| {
4374                                    log::info!(
4375                                        "Host: updating buffer {:?}",
4376                                        buffer.file().unwrap().full_path(cx)
4377                                    );
4378                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4379                                });
4380                            }
4381                        }
4382                        _ => loop {
4383                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4384                            let mut path = PathBuf::new();
4385                            path.push("/");
4386                            for _ in 0..path_component_count {
4387                                let letter = rng.lock().gen_range(b'a'..=b'z');
4388                                path.push(std::str::from_utf8(&[letter]).unwrap());
4389                            }
4390                            path.set_extension("rs");
4391                            let parent_path = path.parent().unwrap();
4392
4393                            log::info!("Host: creating file {:?}", path,);
4394
4395                            if fs.create_dir(&parent_path).await.is_ok()
4396                                && fs.create_file(&path, Default::default()).await.is_ok()
4397                            {
4398                                files.lock().push(path);
4399                                break;
4400                            } else {
4401                                log::info!("Host: cannot create file");
4402                            }
4403                        },
4404                    }
4405
4406                    cx.background().simulate_random_delay().await;
4407                }
4408
4409                self.project = Some(project);
4410                (self, cx)
4411            }
4412        }
4413
4414        pub async fn simulate_guest(
4415            mut self,
4416            guest_id: usize,
4417            project: ModelHandle<Project>,
4418            operations: Rc<Cell<usize>>,
4419            max_operations: usize,
4420            rng: Arc<Mutex<StdRng>>,
4421            mut cx: TestAppContext,
4422        ) -> (Self, TestAppContext) {
4423            while operations.get() < max_operations {
4424                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4425                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4426                        project
4427                            .worktrees(&cx)
4428                            .filter(|worktree| {
4429                                worktree.read(cx).entries(false).any(|e| e.is_file())
4430                            })
4431                            .choose(&mut *rng.lock())
4432                    }) {
4433                        worktree
4434                    } else {
4435                        cx.background().simulate_random_delay().await;
4436                        continue;
4437                    };
4438
4439                    operations.set(operations.get() + 1);
4440                    let project_path = worktree.read_with(&cx, |worktree, _| {
4441                        let entry = worktree
4442                            .entries(false)
4443                            .filter(|e| e.is_file())
4444                            .choose(&mut *rng.lock())
4445                            .unwrap();
4446                        (worktree.id(), entry.path.clone())
4447                    });
4448                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4449                    let buffer = project
4450                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4451                        .await
4452                        .unwrap();
4453                    self.buffers.insert(buffer.clone());
4454                    buffer
4455                } else {
4456                    operations.set(operations.get() + 1);
4457
4458                    self.buffers
4459                        .iter()
4460                        .choose(&mut *rng.lock())
4461                        .unwrap()
4462                        .clone()
4463                };
4464
4465                let choice = rng.lock().gen_range(0..100);
4466                match choice {
4467                    0..=9 => {
4468                        cx.update(|cx| {
4469                            log::info!(
4470                                "Guest {}: dropping buffer {:?}",
4471                                guest_id,
4472                                buffer.read(cx).file().unwrap().full_path(cx)
4473                            );
4474                            self.buffers.remove(&buffer);
4475                            drop(buffer);
4476                        });
4477                    }
4478                    10..=19 => {
4479                        let completions = project.update(&mut cx, |project, cx| {
4480                            log::info!(
4481                                "Guest {}: requesting completions for buffer {:?}",
4482                                guest_id,
4483                                buffer.read(cx).file().unwrap().full_path(cx)
4484                            );
4485                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4486                            project.completions(&buffer, offset, cx)
4487                        });
4488                        let completions = cx.background().spawn(async move {
4489                            completions.await.expect("completions request failed");
4490                        });
4491                        if rng.lock().gen_bool(0.3) {
4492                            log::info!("Guest {}: detaching completions request", guest_id);
4493                            completions.detach();
4494                        } else {
4495                            completions.await;
4496                        }
4497                    }
4498                    20..=29 => {
4499                        let code_actions = project.update(&mut cx, |project, cx| {
4500                            log::info!(
4501                                "Guest {}: requesting code actions for buffer {:?}",
4502                                guest_id,
4503                                buffer.read(cx).file().unwrap().full_path(cx)
4504                            );
4505                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4506                            project.code_actions(&buffer, range, cx)
4507                        });
4508                        let code_actions = cx.background().spawn(async move {
4509                            code_actions.await.expect("code actions request failed");
4510                        });
4511                        if rng.lock().gen_bool(0.3) {
4512                            log::info!("Guest {}: detaching code actions request", guest_id);
4513                            code_actions.detach();
4514                        } else {
4515                            code_actions.await;
4516                        }
4517                    }
4518                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4519                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4520                            log::info!(
4521                                "Guest {}: saving buffer {:?}",
4522                                guest_id,
4523                                buffer.file().unwrap().full_path(cx)
4524                            );
4525                            (buffer.version(), buffer.save(cx))
4526                        });
4527                        let save = cx.spawn(|cx| async move {
4528                            let (saved_version, _) = save.await.expect("save request failed");
4529                            buffer.read_with(&cx, |buffer, _| {
4530                                assert!(buffer.version().observed_all(&saved_version));
4531                                assert!(saved_version.observed_all(&requested_version));
4532                            });
4533                        });
4534                        if rng.lock().gen_bool(0.3) {
4535                            log::info!("Guest {}: detaching save request", guest_id);
4536                            save.detach();
4537                        } else {
4538                            save.await;
4539                        }
4540                    }
4541                    40..=45 => {
4542                        let prepare_rename = project.update(&mut cx, |project, cx| {
4543                            log::info!(
4544                                "Guest {}: preparing rename for buffer {:?}",
4545                                guest_id,
4546                                buffer.read(cx).file().unwrap().full_path(cx)
4547                            );
4548                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4549                            project.prepare_rename(buffer, offset, cx)
4550                        });
4551                        let prepare_rename = cx.background().spawn(async move {
4552                            prepare_rename.await.expect("prepare rename request failed");
4553                        });
4554                        if rng.lock().gen_bool(0.3) {
4555                            log::info!("Guest {}: detaching prepare rename request", guest_id);
4556                            prepare_rename.detach();
4557                        } else {
4558                            prepare_rename.await;
4559                        }
4560                    }
4561                    46..=49 => {
4562                        let definitions = project.update(&mut cx, |project, cx| {
4563                            log::info!(
4564                                "Guest {}: requesting defintions for buffer {:?}",
4565                                guest_id,
4566                                buffer.read(cx).file().unwrap().full_path(cx)
4567                            );
4568                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4569                            project.definition(&buffer, offset, cx)
4570                        });
4571                        let definitions = cx.background().spawn(async move {
4572                            definitions.await.expect("definitions request failed");
4573                        });
4574                        if rng.lock().gen_bool(0.3) {
4575                            log::info!("Guest {}: detaching definitions request", guest_id);
4576                            definitions.detach();
4577                        } else {
4578                            definitions.await;
4579                        }
4580                    }
4581                    _ => {
4582                        buffer.update(&mut cx, |buffer, cx| {
4583                            log::info!(
4584                                "Guest {}: updating buffer {:?}",
4585                                guest_id,
4586                                buffer.file().unwrap().full_path(cx)
4587                            );
4588                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4589                        });
4590                    }
4591                }
4592                cx.background().simulate_random_delay().await;
4593            }
4594
4595            self.project = Some(project);
4596            (self, cx)
4597        }
4598    }
4599
4600    impl Executor for Arc<gpui::executor::Background> {
4601        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4602            self.spawn(future).detach();
4603        }
4604    }
4605
4606    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4607        channel
4608            .messages()
4609            .cursor::<()>()
4610            .map(|m| {
4611                (
4612                    m.sender.github_login.clone(),
4613                    m.body.clone(),
4614                    m.is_pending(),
4615                )
4616            })
4617            .collect()
4618    }
4619
4620    struct EmptyView;
4621
4622    impl gpui::Entity for EmptyView {
4623        type Event = ();
4624    }
4625
4626    impl gpui::View for EmptyView {
4627        fn ui_name() -> &'static str {
4628            "empty view"
4629        }
4630
4631        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4632            gpui::Element::boxed(gpui::elements::Empty)
4633        }
4634    }
4635}