rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updating)
  76            .add_handler(Server::disk_based_diagnostics_updated)
  77            .add_handler(Server::get_definition)
  78            .add_handler(Server::open_buffer)
  79            .add_handler(Server::close_buffer)
  80            .add_handler(Server::update_buffer)
  81            .add_handler(Server::update_buffer_file)
  82            .add_handler(Server::buffer_reloaded)
  83            .add_handler(Server::buffer_saved)
  84            .add_handler(Server::save_buffer)
  85            .add_handler(Server::format_buffer)
  86            .add_handler(Server::get_completions)
  87            .add_handler(Server::apply_additional_edits_for_completion)
  88            .add_handler(Server::get_channels)
  89            .add_handler(Server::get_users)
  90            .add_handler(Server::join_channel)
  91            .add_handler(Server::leave_channel)
  92            .add_handler(Server::send_channel_message)
  93            .add_handler(Server::get_channel_messages);
  94
  95        Arc::new(server)
  96    }
  97
  98    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  99    where
 100        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 101        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 102        M: EnvelopedMessage,
 103    {
 104        let prev_handler = self.handlers.insert(
 105            TypeId::of::<M>(),
 106            Box::new(move |server, envelope| {
 107                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 108                (handler)(server, *envelope).boxed()
 109            }),
 110        );
 111        if prev_handler.is_some() {
 112            panic!("registered a handler for the same message twice");
 113        }
 114        self
 115    }
 116
 117    pub fn handle_connection(
 118        self: &Arc<Self>,
 119        connection: Connection,
 120        addr: String,
 121        user_id: UserId,
 122        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 123    ) -> impl Future<Output = ()> {
 124        let mut this = self.clone();
 125        async move {
 126            let (connection_id, handle_io, mut incoming_rx) =
 127                this.peer.add_connection(connection).await;
 128
 129            if let Some(send_connection_id) = send_connection_id.as_mut() {
 130                let _ = send_connection_id.send(connection_id).await;
 131            }
 132
 133            this.state_mut().add_connection(connection_id, user_id);
 134            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 135                log::error!("error updating contacts for {:?}: {}", user_id, err);
 136            }
 137
 138            let handle_io = handle_io.fuse();
 139            futures::pin_mut!(handle_io);
 140            loop {
 141                let next_message = incoming_rx.next().fuse();
 142                futures::pin_mut!(next_message);
 143                futures::select_biased! {
 144                    message = next_message => {
 145                        if let Some(message) = message {
 146                            let start_time = Instant::now();
 147                            log::info!("RPC message received: {}", message.payload_type_name());
 148                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 149                                if let Err(err) = (handler)(this.clone(), message).await {
 150                                    log::error!("error handling message: {:?}", err);
 151                                } else {
 152                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 153                                }
 154
 155                                if let Some(mut notifications) = this.notifications.clone() {
 156                                    let _ = notifications.send(()).await;
 157                                }
 158                            } else {
 159                                log::warn!("unhandled message: {}", message.payload_type_name());
 160                            }
 161                        } else {
 162                            log::info!("rpc connection closed {:?}", addr);
 163                            break;
 164                        }
 165                    }
 166                    handle_io = handle_io => {
 167                        if let Err(err) = handle_io {
 168                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 169                        }
 170                        break;
 171                    }
 172                }
 173            }
 174
 175            if let Err(err) = this.sign_out(connection_id).await {
 176                log::error!("error signing out connection {:?} - {:?}", addr, err);
 177            }
 178        }
 179    }
 180
 181    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 182        self.peer.disconnect(connection_id);
 183        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 184
 185        for (project_id, project) in removed_connection.hosted_projects {
 186            if let Some(share) = project.share {
 187                broadcast(
 188                    connection_id,
 189                    share.guests.keys().copied().collect(),
 190                    |conn_id| {
 191                        self.peer
 192                            .send(conn_id, proto::UnshareProject { project_id })
 193                    },
 194                )
 195                .await?;
 196            }
 197        }
 198
 199        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 200            broadcast(connection_id, peer_ids, |conn_id| {
 201                self.peer.send(
 202                    conn_id,
 203                    proto::RemoveProjectCollaborator {
 204                        project_id,
 205                        peer_id: connection_id.0,
 206                    },
 207                )
 208            })
 209            .await?;
 210        }
 211
 212        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 213            .await?;
 214
 215        Ok(())
 216    }
 217
 218    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 219        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 220        Ok(())
 221    }
 222
 223    async fn register_project(
 224        mut self: Arc<Server>,
 225        request: TypedEnvelope<proto::RegisterProject>,
 226    ) -> tide::Result<()> {
 227        let project_id = {
 228            let mut state = self.state_mut();
 229            let user_id = state.user_id_for_connection(request.sender_id)?;
 230            state.register_project(request.sender_id, user_id)
 231        };
 232        self.peer
 233            .respond(
 234                request.receipt(),
 235                proto::RegisterProjectResponse { project_id },
 236            )
 237            .await?;
 238        Ok(())
 239    }
 240
 241    async fn unregister_project(
 242        mut self: Arc<Server>,
 243        request: TypedEnvelope<proto::UnregisterProject>,
 244    ) -> tide::Result<()> {
 245        let project = self
 246            .state_mut()
 247            .unregister_project(request.payload.project_id, request.sender_id)
 248            .ok_or_else(|| anyhow!("no such project"))?;
 249        self.update_contacts_for_users(project.authorized_user_ids().iter())
 250            .await?;
 251        Ok(())
 252    }
 253
 254    async fn share_project(
 255        mut self: Arc<Server>,
 256        request: TypedEnvelope<proto::ShareProject>,
 257    ) -> tide::Result<()> {
 258        self.state_mut()
 259            .share_project(request.payload.project_id, request.sender_id);
 260        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 261        Ok(())
 262    }
 263
 264    async fn unshare_project(
 265        mut self: Arc<Server>,
 266        request: TypedEnvelope<proto::UnshareProject>,
 267    ) -> tide::Result<()> {
 268        let project_id = request.payload.project_id;
 269        let project = self
 270            .state_mut()
 271            .unshare_project(project_id, request.sender_id)?;
 272
 273        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 274            self.peer
 275                .send(conn_id, proto::UnshareProject { project_id })
 276        })
 277        .await?;
 278        self.update_contacts_for_users(&project.authorized_user_ids)
 279            .await?;
 280
 281        Ok(())
 282    }
 283
 284    async fn join_project(
 285        mut self: Arc<Server>,
 286        request: TypedEnvelope<proto::JoinProject>,
 287    ) -> tide::Result<()> {
 288        let project_id = request.payload.project_id;
 289
 290        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 291        let response_data = self
 292            .state_mut()
 293            .join_project(request.sender_id, user_id, project_id)
 294            .and_then(|joined| {
 295                let share = joined.project.share()?;
 296                let peer_count = share.guests.len();
 297                let mut collaborators = Vec::with_capacity(peer_count);
 298                collaborators.push(proto::Collaborator {
 299                    peer_id: joined.project.host_connection_id.0,
 300                    replica_id: 0,
 301                    user_id: joined.project.host_user_id.to_proto(),
 302                });
 303                let worktrees = joined
 304                    .project
 305                    .worktrees
 306                    .iter()
 307                    .filter_map(|(id, worktree)| {
 308                        worktree.share.as_ref().map(|share| proto::Worktree {
 309                            id: *id,
 310                            root_name: worktree.root_name.clone(),
 311                            entries: share.entries.values().cloned().collect(),
 312                            diagnostic_summaries: share
 313                                .diagnostic_summaries
 314                                .values()
 315                                .cloned()
 316                                .collect(),
 317                            weak: worktree.weak,
 318                        })
 319                    })
 320                    .collect();
 321                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 322                    if *peer_conn_id != request.sender_id {
 323                        collaborators.push(proto::Collaborator {
 324                            peer_id: peer_conn_id.0,
 325                            replica_id: *peer_replica_id as u32,
 326                            user_id: peer_user_id.to_proto(),
 327                        });
 328                    }
 329                }
 330                let response = proto::JoinProjectResponse {
 331                    worktrees,
 332                    replica_id: joined.replica_id as u32,
 333                    collaborators,
 334                };
 335                let connection_ids = joined.project.connection_ids();
 336                let contact_user_ids = joined.project.authorized_user_ids();
 337                Ok((response, connection_ids, contact_user_ids))
 338            });
 339
 340        match response_data {
 341            Ok((response, connection_ids, contact_user_ids)) => {
 342                broadcast(request.sender_id, connection_ids, |conn_id| {
 343                    self.peer.send(
 344                        conn_id,
 345                        proto::AddProjectCollaborator {
 346                            project_id,
 347                            collaborator: Some(proto::Collaborator {
 348                                peer_id: request.sender_id.0,
 349                                replica_id: response.replica_id,
 350                                user_id: user_id.to_proto(),
 351                            }),
 352                        },
 353                    )
 354                })
 355                .await?;
 356                self.peer.respond(request.receipt(), response).await?;
 357                self.update_contacts_for_users(&contact_user_ids).await?;
 358            }
 359            Err(error) => {
 360                self.peer
 361                    .respond_with_error(
 362                        request.receipt(),
 363                        proto::Error {
 364                            message: error.to_string(),
 365                        },
 366                    )
 367                    .await?;
 368            }
 369        }
 370
 371        Ok(())
 372    }
 373
 374    async fn leave_project(
 375        mut self: Arc<Server>,
 376        request: TypedEnvelope<proto::LeaveProject>,
 377    ) -> tide::Result<()> {
 378        let sender_id = request.sender_id;
 379        let project_id = request.payload.project_id;
 380        let worktree = self.state_mut().leave_project(sender_id, project_id);
 381        if let Some(worktree) = worktree {
 382            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 383                self.peer.send(
 384                    conn_id,
 385                    proto::RemoveProjectCollaborator {
 386                        project_id,
 387                        peer_id: sender_id.0,
 388                    },
 389                )
 390            })
 391            .await?;
 392            self.update_contacts_for_users(&worktree.authorized_user_ids)
 393                .await?;
 394        }
 395        Ok(())
 396    }
 397
 398    async fn register_worktree(
 399        mut self: Arc<Server>,
 400        request: TypedEnvelope<proto::RegisterWorktree>,
 401    ) -> tide::Result<()> {
 402        let receipt = request.receipt();
 403        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 404
 405        let mut contact_user_ids = HashSet::default();
 406        contact_user_ids.insert(host_user_id);
 407        for github_login in request.payload.authorized_logins {
 408            match self.app_state.db.create_user(&github_login, false).await {
 409                Ok(contact_user_id) => {
 410                    contact_user_ids.insert(contact_user_id);
 411                }
 412                Err(err) => {
 413                    let message = err.to_string();
 414                    self.peer
 415                        .respond_with_error(receipt, proto::Error { message })
 416                        .await?;
 417                    return Ok(());
 418                }
 419            }
 420        }
 421
 422        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 423        let ok = self.state_mut().register_worktree(
 424            request.payload.project_id,
 425            request.payload.worktree_id,
 426            Worktree {
 427                authorized_user_ids: contact_user_ids.clone(),
 428                root_name: request.payload.root_name,
 429                share: None,
 430                weak: false,
 431            },
 432        );
 433
 434        if ok {
 435            self.peer.respond(receipt, proto::Ack {}).await?;
 436            self.update_contacts_for_users(&contact_user_ids).await?;
 437        } else {
 438            self.peer
 439                .respond_with_error(
 440                    receipt,
 441                    proto::Error {
 442                        message: NO_SUCH_PROJECT.to_string(),
 443                    },
 444                )
 445                .await?;
 446        }
 447
 448        Ok(())
 449    }
 450
 451    async fn unregister_worktree(
 452        mut self: Arc<Server>,
 453        request: TypedEnvelope<proto::UnregisterWorktree>,
 454    ) -> tide::Result<()> {
 455        let project_id = request.payload.project_id;
 456        let worktree_id = request.payload.worktree_id;
 457        let (worktree, guest_connection_ids) =
 458            self.state_mut()
 459                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 460
 461        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 462            self.peer.send(
 463                conn_id,
 464                proto::UnregisterWorktree {
 465                    project_id,
 466                    worktree_id,
 467                },
 468            )
 469        })
 470        .await?;
 471        self.update_contacts_for_users(&worktree.authorized_user_ids)
 472            .await?;
 473        Ok(())
 474    }
 475
 476    async fn share_worktree(
 477        mut self: Arc<Server>,
 478        mut request: TypedEnvelope<proto::ShareWorktree>,
 479    ) -> tide::Result<()> {
 480        let worktree = request
 481            .payload
 482            .worktree
 483            .as_mut()
 484            .ok_or_else(|| anyhow!("missing worktree"))?;
 485        let entries = worktree
 486            .entries
 487            .iter()
 488            .map(|entry| (entry.id, entry.clone()))
 489            .collect();
 490        let diagnostic_summaries = worktree
 491            .diagnostic_summaries
 492            .iter()
 493            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 494            .collect();
 495
 496        let shared_worktree = self.state_mut().share_worktree(
 497            request.payload.project_id,
 498            worktree.id,
 499            request.sender_id,
 500            entries,
 501            diagnostic_summaries,
 502        );
 503        if let Some(shared_worktree) = shared_worktree {
 504            broadcast(
 505                request.sender_id,
 506                shared_worktree.connection_ids,
 507                |connection_id| {
 508                    self.peer.forward_send(
 509                        request.sender_id,
 510                        connection_id,
 511                        request.payload.clone(),
 512                    )
 513                },
 514            )
 515            .await?;
 516            self.peer.respond(request.receipt(), proto::Ack {}).await?;
 517            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)
 518                .await?;
 519        } else {
 520            self.peer
 521                .respond_with_error(
 522                    request.receipt(),
 523                    proto::Error {
 524                        message: "no such worktree".to_string(),
 525                    },
 526                )
 527                .await?;
 528        }
 529        Ok(())
 530    }
 531
 532    async fn update_worktree(
 533        mut self: Arc<Server>,
 534        request: TypedEnvelope<proto::UpdateWorktree>,
 535    ) -> tide::Result<()> {
 536        let connection_ids = self
 537            .state_mut()
 538            .update_worktree(
 539                request.sender_id,
 540                request.payload.project_id,
 541                request.payload.worktree_id,
 542                &request.payload.removed_entries,
 543                &request.payload.updated_entries,
 544            )
 545            .ok_or_else(|| anyhow!("no such worktree"))?;
 546
 547        broadcast(request.sender_id, connection_ids, |connection_id| {
 548            self.peer
 549                .forward_send(request.sender_id, connection_id, request.payload.clone())
 550        })
 551        .await?;
 552
 553        Ok(())
 554    }
 555
 556    async fn update_diagnostic_summary(
 557        mut self: Arc<Server>,
 558        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 559    ) -> tide::Result<()> {
 560        let receiver_ids = request
 561            .payload
 562            .summary
 563            .clone()
 564            .and_then(|summary| {
 565                self.state_mut().update_diagnostic_summary(
 566                    request.payload.project_id,
 567                    request.payload.worktree_id,
 568                    request.sender_id,
 569                    summary,
 570                )
 571            })
 572            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 573
 574        broadcast(request.sender_id, receiver_ids, |connection_id| {
 575            self.peer
 576                .forward_send(request.sender_id, connection_id, request.payload.clone())
 577        })
 578        .await?;
 579        Ok(())
 580    }
 581
 582    async fn disk_based_diagnostics_updating(
 583        self: Arc<Server>,
 584        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 585    ) -> tide::Result<()> {
 586        let receiver_ids = self
 587            .state()
 588            .project_connection_ids(request.payload.project_id, request.sender_id)
 589            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 590        broadcast(request.sender_id, receiver_ids, |connection_id| {
 591            self.peer
 592                .forward_send(request.sender_id, connection_id, request.payload.clone())
 593        })
 594        .await?;
 595        Ok(())
 596    }
 597
 598    async fn disk_based_diagnostics_updated(
 599        self: Arc<Server>,
 600        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 601    ) -> tide::Result<()> {
 602        let receiver_ids = self
 603            .state()
 604            .project_connection_ids(request.payload.project_id, request.sender_id)
 605            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 606        broadcast(request.sender_id, receiver_ids, |connection_id| {
 607            self.peer
 608                .forward_send(request.sender_id, connection_id, request.payload.clone())
 609        })
 610        .await?;
 611        Ok(())
 612    }
 613
 614    async fn get_definition(
 615        self: Arc<Server>,
 616        request: TypedEnvelope<proto::GetDefinition>,
 617    ) -> tide::Result<()> {
 618        let receipt = request.receipt();
 619        let host_connection_id = self
 620            .state()
 621            .read_project(request.payload.project_id, request.sender_id)
 622            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 623            .host_connection_id;
 624        let response = self
 625            .peer
 626            .forward_request(request.sender_id, host_connection_id, request.payload)
 627            .await?;
 628        self.peer.respond(receipt, response).await?;
 629        Ok(())
 630    }
 631
 632    async fn open_buffer(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::OpenBuffer>,
 635    ) -> tide::Result<()> {
 636        let receipt = request.receipt();
 637        let host_connection_id = self
 638            .state()
 639            .read_project(request.payload.project_id, request.sender_id)
 640            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 641            .host_connection_id;
 642        let response = self
 643            .peer
 644            .forward_request(request.sender_id, host_connection_id, request.payload)
 645            .await?;
 646        self.peer.respond(receipt, response).await?;
 647        Ok(())
 648    }
 649
 650    async fn close_buffer(
 651        self: Arc<Server>,
 652        request: TypedEnvelope<proto::CloseBuffer>,
 653    ) -> tide::Result<()> {
 654        let host_connection_id = self
 655            .state()
 656            .read_project(request.payload.project_id, request.sender_id)
 657            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 658            .host_connection_id;
 659        self.peer
 660            .forward_send(request.sender_id, host_connection_id, request.payload)
 661            .await?;
 662        Ok(())
 663    }
 664
 665    async fn save_buffer(
 666        self: Arc<Server>,
 667        request: TypedEnvelope<proto::SaveBuffer>,
 668    ) -> tide::Result<()> {
 669        let host;
 670        let guests;
 671        {
 672            let state = self.state();
 673            let project = state
 674                .read_project(request.payload.project_id, request.sender_id)
 675                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 676            host = project.host_connection_id;
 677            guests = project.guest_connection_ids()
 678        }
 679
 680        let sender = request.sender_id;
 681        let receipt = request.receipt();
 682        let response = self
 683            .peer
 684            .forward_request(sender, host, request.payload.clone())
 685            .await?;
 686
 687        broadcast(host, guests, |conn_id| {
 688            let response = response.clone();
 689            let peer = &self.peer;
 690            async move {
 691                if conn_id == sender {
 692                    peer.respond(receipt, response).await
 693                } else {
 694                    peer.forward_send(host, conn_id, response).await
 695                }
 696            }
 697        })
 698        .await?;
 699
 700        Ok(())
 701    }
 702
 703    async fn format_buffer(
 704        self: Arc<Server>,
 705        request: TypedEnvelope<proto::FormatBuffer>,
 706    ) -> tide::Result<()> {
 707        let host;
 708        {
 709            let state = self.state();
 710            let project = state
 711                .read_project(request.payload.project_id, request.sender_id)
 712                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 713            host = project.host_connection_id;
 714        }
 715
 716        let sender = request.sender_id;
 717        let receipt = request.receipt();
 718        let response = self
 719            .peer
 720            .forward_request(sender, host, request.payload.clone())
 721            .await?;
 722        self.peer.respond(receipt, response).await?;
 723
 724        Ok(())
 725    }
 726
 727    async fn get_completions(
 728        self: Arc<Server>,
 729        request: TypedEnvelope<proto::GetCompletions>,
 730    ) -> tide::Result<()> {
 731        let host;
 732        {
 733            let state = self.state();
 734            let project = state
 735                .read_project(request.payload.project_id, request.sender_id)
 736                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 737            host = project.host_connection_id;
 738        }
 739
 740        let sender = request.sender_id;
 741        let receipt = request.receipt();
 742        let response = self
 743            .peer
 744            .forward_request(sender, host, request.payload.clone())
 745            .await?;
 746        self.peer.respond(receipt, response).await?;
 747
 748        Ok(())
 749    }
 750
 751    async fn apply_additional_edits_for_completion(
 752        self: Arc<Server>,
 753        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 754    ) -> tide::Result<()> {
 755        let host;
 756        {
 757            let state = self.state();
 758            let project = state
 759                .read_project(request.payload.project_id, request.sender_id)
 760                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 761            host = project.host_connection_id;
 762        }
 763
 764        let sender = request.sender_id;
 765        let receipt = request.receipt();
 766        let response = self
 767            .peer
 768            .forward_request(sender, host, request.payload.clone())
 769            .await?;
 770        self.peer.respond(receipt, response).await?;
 771
 772        Ok(())
 773    }
 774
 775    async fn update_buffer(
 776        self: Arc<Server>,
 777        request: TypedEnvelope<proto::UpdateBuffer>,
 778    ) -> tide::Result<()> {
 779        let receiver_ids = self
 780            .state()
 781            .project_connection_ids(request.payload.project_id, request.sender_id)
 782            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 783        broadcast(request.sender_id, receiver_ids, |connection_id| {
 784            self.peer
 785                .forward_send(request.sender_id, connection_id, request.payload.clone())
 786        })
 787        .await?;
 788        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 789        Ok(())
 790    }
 791
 792    async fn update_buffer_file(
 793        self: Arc<Server>,
 794        request: TypedEnvelope<proto::UpdateBufferFile>,
 795    ) -> tide::Result<()> {
 796        let receiver_ids = self
 797            .state()
 798            .project_connection_ids(request.payload.project_id, request.sender_id)
 799            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 800        broadcast(request.sender_id, receiver_ids, |connection_id| {
 801            self.peer
 802                .forward_send(request.sender_id, connection_id, request.payload.clone())
 803        })
 804        .await?;
 805        Ok(())
 806    }
 807
 808    async fn buffer_reloaded(
 809        self: Arc<Server>,
 810        request: TypedEnvelope<proto::BufferReloaded>,
 811    ) -> tide::Result<()> {
 812        let receiver_ids = self
 813            .state()
 814            .project_connection_ids(request.payload.project_id, request.sender_id)
 815            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 816        broadcast(request.sender_id, receiver_ids, |connection_id| {
 817            self.peer
 818                .forward_send(request.sender_id, connection_id, request.payload.clone())
 819        })
 820        .await?;
 821        Ok(())
 822    }
 823
 824    async fn buffer_saved(
 825        self: Arc<Server>,
 826        request: TypedEnvelope<proto::BufferSaved>,
 827    ) -> tide::Result<()> {
 828        let receiver_ids = self
 829            .state()
 830            .project_connection_ids(request.payload.project_id, request.sender_id)
 831            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 832        broadcast(request.sender_id, receiver_ids, |connection_id| {
 833            self.peer
 834                .forward_send(request.sender_id, connection_id, request.payload.clone())
 835        })
 836        .await?;
 837        Ok(())
 838    }
 839
 840    async fn get_channels(
 841        self: Arc<Server>,
 842        request: TypedEnvelope<proto::GetChannels>,
 843    ) -> tide::Result<()> {
 844        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 845        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 846        self.peer
 847            .respond(
 848                request.receipt(),
 849                proto::GetChannelsResponse {
 850                    channels: channels
 851                        .into_iter()
 852                        .map(|chan| proto::Channel {
 853                            id: chan.id.to_proto(),
 854                            name: chan.name,
 855                        })
 856                        .collect(),
 857                },
 858            )
 859            .await?;
 860        Ok(())
 861    }
 862
 863    async fn get_users(
 864        self: Arc<Server>,
 865        request: TypedEnvelope<proto::GetUsers>,
 866    ) -> tide::Result<()> {
 867        let receipt = request.receipt();
 868        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 869        let users = self
 870            .app_state
 871            .db
 872            .get_users_by_ids(user_ids)
 873            .await?
 874            .into_iter()
 875            .map(|user| proto::User {
 876                id: user.id.to_proto(),
 877                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 878                github_login: user.github_login,
 879            })
 880            .collect();
 881        self.peer
 882            .respond(receipt, proto::GetUsersResponse { users })
 883            .await?;
 884        Ok(())
 885    }
 886
 887    async fn update_contacts_for_users<'a>(
 888        self: &Arc<Server>,
 889        user_ids: impl IntoIterator<Item = &'a UserId>,
 890    ) -> tide::Result<()> {
 891        let mut send_futures = Vec::new();
 892
 893        {
 894            let state = self.state();
 895            for user_id in user_ids {
 896                let contacts = state.contacts_for_user(*user_id);
 897                for connection_id in state.connection_ids_for_user(*user_id) {
 898                    send_futures.push(self.peer.send(
 899                        connection_id,
 900                        proto::UpdateContacts {
 901                            contacts: contacts.clone(),
 902                        },
 903                    ));
 904                }
 905            }
 906        }
 907        futures::future::try_join_all(send_futures).await?;
 908
 909        Ok(())
 910    }
 911
 912    async fn join_channel(
 913        mut self: Arc<Self>,
 914        request: TypedEnvelope<proto::JoinChannel>,
 915    ) -> tide::Result<()> {
 916        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 917        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 918        if !self
 919            .app_state
 920            .db
 921            .can_user_access_channel(user_id, channel_id)
 922            .await?
 923        {
 924            Err(anyhow!("access denied"))?;
 925        }
 926
 927        self.state_mut().join_channel(request.sender_id, channel_id);
 928        let messages = self
 929            .app_state
 930            .db
 931            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 932            .await?
 933            .into_iter()
 934            .map(|msg| proto::ChannelMessage {
 935                id: msg.id.to_proto(),
 936                body: msg.body,
 937                timestamp: msg.sent_at.unix_timestamp() as u64,
 938                sender_id: msg.sender_id.to_proto(),
 939                nonce: Some(msg.nonce.as_u128().into()),
 940            })
 941            .collect::<Vec<_>>();
 942        self.peer
 943            .respond(
 944                request.receipt(),
 945                proto::JoinChannelResponse {
 946                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 947                    messages,
 948                },
 949            )
 950            .await?;
 951        Ok(())
 952    }
 953
 954    async fn leave_channel(
 955        mut self: Arc<Self>,
 956        request: TypedEnvelope<proto::LeaveChannel>,
 957    ) -> tide::Result<()> {
 958        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 959        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 960        if !self
 961            .app_state
 962            .db
 963            .can_user_access_channel(user_id, channel_id)
 964            .await?
 965        {
 966            Err(anyhow!("access denied"))?;
 967        }
 968
 969        self.state_mut()
 970            .leave_channel(request.sender_id, channel_id);
 971
 972        Ok(())
 973    }
 974
 975    async fn send_channel_message(
 976        self: Arc<Self>,
 977        request: TypedEnvelope<proto::SendChannelMessage>,
 978    ) -> tide::Result<()> {
 979        let receipt = request.receipt();
 980        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 981        let user_id;
 982        let connection_ids;
 983        {
 984            let state = self.state();
 985            user_id = state.user_id_for_connection(request.sender_id)?;
 986            if let Some(ids) = state.channel_connection_ids(channel_id) {
 987                connection_ids = ids;
 988            } else {
 989                return Ok(());
 990            }
 991        }
 992
 993        // Validate the message body.
 994        let body = request.payload.body.trim().to_string();
 995        if body.len() > MAX_MESSAGE_LEN {
 996            self.peer
 997                .respond_with_error(
 998                    receipt,
 999                    proto::Error {
1000                        message: "message is too long".to_string(),
1001                    },
1002                )
1003                .await?;
1004            return Ok(());
1005        }
1006        if body.is_empty() {
1007            self.peer
1008                .respond_with_error(
1009                    receipt,
1010                    proto::Error {
1011                        message: "message can't be blank".to_string(),
1012                    },
1013                )
1014                .await?;
1015            return Ok(());
1016        }
1017
1018        let timestamp = OffsetDateTime::now_utc();
1019        let nonce = if let Some(nonce) = request.payload.nonce {
1020            nonce
1021        } else {
1022            self.peer
1023                .respond_with_error(
1024                    receipt,
1025                    proto::Error {
1026                        message: "nonce can't be blank".to_string(),
1027                    },
1028                )
1029                .await?;
1030            return Ok(());
1031        };
1032
1033        let message_id = self
1034            .app_state
1035            .db
1036            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1037            .await?
1038            .to_proto();
1039        let message = proto::ChannelMessage {
1040            sender_id: user_id.to_proto(),
1041            id: message_id,
1042            body,
1043            timestamp: timestamp.unix_timestamp() as u64,
1044            nonce: Some(nonce),
1045        };
1046        broadcast(request.sender_id, connection_ids, |conn_id| {
1047            self.peer.send(
1048                conn_id,
1049                proto::ChannelMessageSent {
1050                    channel_id: channel_id.to_proto(),
1051                    message: Some(message.clone()),
1052                },
1053            )
1054        })
1055        .await?;
1056        self.peer
1057            .respond(
1058                receipt,
1059                proto::SendChannelMessageResponse {
1060                    message: Some(message),
1061                },
1062            )
1063            .await?;
1064        Ok(())
1065    }
1066
1067    async fn get_channel_messages(
1068        self: Arc<Self>,
1069        request: TypedEnvelope<proto::GetChannelMessages>,
1070    ) -> tide::Result<()> {
1071        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1072        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1073        if !self
1074            .app_state
1075            .db
1076            .can_user_access_channel(user_id, channel_id)
1077            .await?
1078        {
1079            Err(anyhow!("access denied"))?;
1080        }
1081
1082        let messages = self
1083            .app_state
1084            .db
1085            .get_channel_messages(
1086                channel_id,
1087                MESSAGE_COUNT_PER_PAGE,
1088                Some(MessageId::from_proto(request.payload.before_message_id)),
1089            )
1090            .await?
1091            .into_iter()
1092            .map(|msg| proto::ChannelMessage {
1093                id: msg.id.to_proto(),
1094                body: msg.body,
1095                timestamp: msg.sent_at.unix_timestamp() as u64,
1096                sender_id: msg.sender_id.to_proto(),
1097                nonce: Some(msg.nonce.as_u128().into()),
1098            })
1099            .collect::<Vec<_>>();
1100        self.peer
1101            .respond(
1102                request.receipt(),
1103                proto::GetChannelMessagesResponse {
1104                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1105                    messages,
1106                },
1107            )
1108            .await?;
1109        Ok(())
1110    }
1111
1112    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1113        self.store.read()
1114    }
1115
1116    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1117        self.store.write()
1118    }
1119}
1120
1121pub async fn broadcast<F, T>(
1122    sender_id: ConnectionId,
1123    receiver_ids: Vec<ConnectionId>,
1124    mut f: F,
1125) -> anyhow::Result<()>
1126where
1127    F: FnMut(ConnectionId) -> T,
1128    T: Future<Output = anyhow::Result<()>>,
1129{
1130    let futures = receiver_ids
1131        .into_iter()
1132        .filter(|id| *id != sender_id)
1133        .map(|id| f(id));
1134    futures::future::try_join_all(futures).await?;
1135    Ok(())
1136}
1137
1138pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1139    let server = Server::new(app.state().clone(), rpc.clone(), None);
1140    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1141        let server = server.clone();
1142        async move {
1143            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1144
1145            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1146            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1147            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1148            let client_protocol_version: Option<u32> = request
1149                .header("X-Zed-Protocol-Version")
1150                .and_then(|v| v.as_str().parse().ok());
1151
1152            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1153                return Ok(Response::new(StatusCode::UpgradeRequired));
1154            }
1155
1156            let header = match request.header("Sec-Websocket-Key") {
1157                Some(h) => h.as_str(),
1158                None => return Err(anyhow!("expected sec-websocket-key"))?,
1159            };
1160
1161            let user_id = process_auth_header(&request).await?;
1162
1163            let mut response = Response::new(StatusCode::SwitchingProtocols);
1164            response.insert_header(UPGRADE, "websocket");
1165            response.insert_header(CONNECTION, "Upgrade");
1166            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1167            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1168            response.insert_header("Sec-Websocket-Version", "13");
1169
1170            let http_res: &mut tide::http::Response = response.as_mut();
1171            let upgrade_receiver = http_res.recv_upgrade().await;
1172            let addr = request.remote().unwrap_or("unknown").to_string();
1173            task::spawn(async move {
1174                if let Some(stream) = upgrade_receiver.await {
1175                    server
1176                        .handle_connection(
1177                            Connection::new(
1178                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1179                            ),
1180                            addr,
1181                            user_id,
1182                            None,
1183                        )
1184                        .await;
1185                }
1186            });
1187
1188            Ok(response)
1189        }
1190    });
1191}
1192
1193fn header_contains_ignore_case<T>(
1194    request: &tide::Request<T>,
1195    header_name: HeaderName,
1196    value: &str,
1197) -> bool {
1198    request
1199        .header(header_name)
1200        .map(|h| {
1201            h.as_str()
1202                .split(',')
1203                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1204        })
1205        .unwrap_or(false)
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210    use super::*;
1211    use crate::{
1212        auth,
1213        db::{tests::TestDb, UserId},
1214        github, AppState, Config,
1215    };
1216    use ::rpc::Peer;
1217    use async_std::task;
1218    use gpui::{executor, ModelHandle, TestAppContext};
1219    use parking_lot::Mutex;
1220    use postage::{mpsc, watch};
1221    use rand::prelude::*;
1222    use rpc::PeerId;
1223    use serde_json::json;
1224    use sqlx::types::time::OffsetDateTime;
1225    use std::{
1226        ops::Deref,
1227        path::Path,
1228        rc::Rc,
1229        sync::{
1230            atomic::{AtomicBool, Ordering::SeqCst},
1231            Arc,
1232        },
1233        time::Duration,
1234    };
1235    use zed::{
1236        client::{
1237            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1238            EstablishConnectionError, UserStore,
1239        },
1240        editor::{ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer},
1241        fs::{FakeFs, Fs as _},
1242        language::{
1243            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1244            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1245        },
1246        lsp,
1247        project::{DiagnosticSummary, Project, ProjectPath},
1248    };
1249
    // End-to-end check of sharing a project between two clients: client A
    // shares a local project, client B joins it, each side sees the other as
    // a collaborator, an edit typed by B reaches A's buffer, dropping A's
    // buffer closes it in A's project, and dropping B's project empties A's
    // collaborator list.
    #[gpui::test]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Client B must already see client A as a collaborator.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until client A sees client B with the matching replica id.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        project_a.read_with(&cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        project_a
            .condition(&cx_a, |project, cx| {
                !project.has_open_buffer((worktree_id, "b.txt"), cx)
            })
            .await;

        // Dropping client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1388
    // Checks the unshare/re-share cycle: after client A unshares, client B's
    // project becomes read-only and A's worktree is no longer shared; after
    // A shares again, client B can join the same project id and open a
    // buffer.
    #[gpui::test]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        drop(project_b);

        // Share the project again and ensure guests can still join.
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // The second join reuses client B and the original project id.
        let project_c = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_c
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1485
    // Three-client scenario (host A, guests B and C) covering: concurrent
    // guest and host edits converging to the same text in all three buffers,
    // a guest save racing with a host edit leaving every buffer clean, and
    // renames/additions on the host's file system showing up in the guests'
    // worktrees and in the open buffers' file paths.
    #[gpui::test]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Both guest edits must have arrived before the host appends its own.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // Four files are expected: .zed.toml, file1-renamed, file3 and file4.
        worktree_a
            .condition(&cx_a, |tree, _| tree.file_count() == 4)
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| tree.file_count() == 4)
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| tree.file_count() == 4)
            .await;
        worktree_a.read_with(&cx_a, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_c.read_with(&cx_c, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1675
1676    #[gpui::test]
1677    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1678        cx_a.foreground().forbid_parking();
1679        let lang_registry = Arc::new(LanguageRegistry::new());
1680        let fs = Arc::new(FakeFs::new(cx_a.background()));
1681
1682        // Connect to a server as 2 clients.
1683        let mut server = TestServer::start(cx_a.foreground()).await;
1684        let client_a = server.create_client(&mut cx_a, "user_a").await;
1685        let client_b = server.create_client(&mut cx_b, "user_b").await;
1686
1687        // Share a project as client A
1688        fs.insert_tree(
1689            "/dir",
1690            json!({
1691                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1692                "a.txt": "a-contents",
1693            }),
1694        )
1695        .await;
1696
1697        let project_a = cx_a.update(|cx| {
1698            Project::local(
1699                client_a.clone(),
1700                client_a.user_store.clone(),
1701                lang_registry.clone(),
1702                fs.clone(),
1703                cx,
1704            )
1705        });
1706        let (worktree_a, _) = project_a
1707            .update(&mut cx_a, |p, cx| {
1708                p.find_or_create_local_worktree("/dir", false, cx)
1709            })
1710            .await
1711            .unwrap();
1712        worktree_a
1713            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1714            .await;
1715        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1716        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1717        project_a
1718            .update(&mut cx_a, |p, cx| p.share(cx))
1719            .await
1720            .unwrap();
1721
1722        // Join that project as client B
1723        let project_b = Project::remote(
1724            project_id,
1725            client_b.clone(),
1726            client_b.user_store.clone(),
1727            lang_registry.clone(),
1728            fs.clone(),
1729            &mut cx_b.to_async(),
1730        )
1731        .await
1732        .unwrap();
1733        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1734
1735        // Open a buffer as client B
1736        let buffer_b = project_b
1737            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1738            .await
1739            .unwrap();
1740        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1741
1742        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1743        buffer_b.read_with(&cx_b, |buf, _| {
1744            assert!(buf.is_dirty());
1745            assert!(!buf.has_conflict());
1746        });
1747
1748        buffer_b
1749            .update(&mut cx_b, |buf, cx| buf.save(cx))
1750            .await
1751            .unwrap();
1752        worktree_b
1753            .condition(&cx_b, |_, cx| {
1754                buffer_b.read(cx).file().unwrap().mtime() != mtime
1755            })
1756            .await;
1757        buffer_b.read_with(&cx_b, |buf, _| {
1758            assert!(!buf.is_dirty());
1759            assert!(!buf.has_conflict());
1760        });
1761
1762        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1763        buffer_b.read_with(&cx_b, |buf, _| {
1764            assert!(buf.is_dirty());
1765            assert!(!buf.has_conflict());
1766        });
1767    }
1768
1769    #[gpui::test]
1770    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1771        cx_a.foreground().forbid_parking();
1772        let lang_registry = Arc::new(LanguageRegistry::new());
1773        let fs = Arc::new(FakeFs::new(cx_a.background()));
1774
1775        // Connect to a server as 2 clients.
1776        let mut server = TestServer::start(cx_a.foreground()).await;
1777        let client_a = server.create_client(&mut cx_a, "user_a").await;
1778        let client_b = server.create_client(&mut cx_b, "user_b").await;
1779
1780        // Share a project as client A
1781        fs.insert_tree(
1782            "/dir",
1783            json!({
1784                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1785                "a.txt": "a-contents",
1786            }),
1787        )
1788        .await;
1789
1790        let project_a = cx_a.update(|cx| {
1791            Project::local(
1792                client_a.clone(),
1793                client_a.user_store.clone(),
1794                lang_registry.clone(),
1795                fs.clone(),
1796                cx,
1797            )
1798        });
1799        let (worktree_a, _) = project_a
1800            .update(&mut cx_a, |p, cx| {
1801                p.find_or_create_local_worktree("/dir", false, cx)
1802            })
1803            .await
1804            .unwrap();
1805        worktree_a
1806            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1807            .await;
1808        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1809        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1810        project_a
1811            .update(&mut cx_a, |p, cx| p.share(cx))
1812            .await
1813            .unwrap();
1814
1815        // Join that project as client B
1816        let project_b = Project::remote(
1817            project_id,
1818            client_b.clone(),
1819            client_b.user_store.clone(),
1820            lang_registry.clone(),
1821            fs.clone(),
1822            &mut cx_b.to_async(),
1823        )
1824        .await
1825        .unwrap();
1826        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1827
1828        // Open a buffer as client B
1829        let buffer_b = project_b
1830            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1831            .await
1832            .unwrap();
1833        buffer_b.read_with(&cx_b, |buf, _| {
1834            assert!(!buf.is_dirty());
1835            assert!(!buf.has_conflict());
1836        });
1837
1838        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1839            .await
1840            .unwrap();
1841        buffer_b
1842            .condition(&cx_b, |buf, _| {
1843                buf.text() == "new contents" && !buf.is_dirty()
1844            })
1845            .await;
1846        buffer_b.read_with(&cx_b, |buf, _| {
1847            assert!(!buf.has_conflict());
1848        });
1849    }
1850
1851    #[gpui::test(iterations = 100)]
1852    async fn test_editing_while_guest_opens_buffer(
1853        mut cx_a: TestAppContext,
1854        mut cx_b: TestAppContext,
1855    ) {
1856        cx_a.foreground().forbid_parking();
1857        let lang_registry = Arc::new(LanguageRegistry::new());
1858        let fs = Arc::new(FakeFs::new(cx_a.background()));
1859
1860        // Connect to a server as 2 clients.
1861        let mut server = TestServer::start(cx_a.foreground()).await;
1862        let client_a = server.create_client(&mut cx_a, "user_a").await;
1863        let client_b = server.create_client(&mut cx_b, "user_b").await;
1864
1865        // Share a project as client A
1866        fs.insert_tree(
1867            "/dir",
1868            json!({
1869                ".zed.toml": r#"collaborators = ["user_b"]"#,
1870                "a.txt": "a-contents",
1871            }),
1872        )
1873        .await;
1874        let project_a = cx_a.update(|cx| {
1875            Project::local(
1876                client_a.clone(),
1877                client_a.user_store.clone(),
1878                lang_registry.clone(),
1879                fs.clone(),
1880                cx,
1881            )
1882        });
1883        let (worktree_a, _) = project_a
1884            .update(&mut cx_a, |p, cx| {
1885                p.find_or_create_local_worktree("/dir", false, cx)
1886            })
1887            .await
1888            .unwrap();
1889        worktree_a
1890            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1891            .await;
1892        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1893        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1894        project_a
1895            .update(&mut cx_a, |p, cx| p.share(cx))
1896            .await
1897            .unwrap();
1898
1899        // Join that project as client B
1900        let project_b = Project::remote(
1901            project_id,
1902            client_b.clone(),
1903            client_b.user_store.clone(),
1904            lang_registry.clone(),
1905            fs.clone(),
1906            &mut cx_b.to_async(),
1907        )
1908        .await
1909        .unwrap();
1910
1911        // Open a buffer as client A
1912        let buffer_a = project_a
1913            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1914            .await
1915            .unwrap();
1916
1917        // Start opening the same buffer as client B
1918        let buffer_b = cx_b
1919            .background()
1920            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1921        task::yield_now().await;
1922
1923        // Edit the buffer as client A while client B is still opening it.
1924        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1925
1926        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1927        let buffer_b = buffer_b.await.unwrap();
1928        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1929    }
1930
1931    #[gpui::test]
1932    async fn test_leaving_worktree_while_opening_buffer(
1933        mut cx_a: TestAppContext,
1934        mut cx_b: TestAppContext,
1935    ) {
1936        cx_a.foreground().forbid_parking();
1937        let lang_registry = Arc::new(LanguageRegistry::new());
1938        let fs = Arc::new(FakeFs::new(cx_a.background()));
1939
1940        // Connect to a server as 2 clients.
1941        let mut server = TestServer::start(cx_a.foreground()).await;
1942        let client_a = server.create_client(&mut cx_a, "user_a").await;
1943        let client_b = server.create_client(&mut cx_b, "user_b").await;
1944
1945        // Share a project as client A
1946        fs.insert_tree(
1947            "/dir",
1948            json!({
1949                ".zed.toml": r#"collaborators = ["user_b"]"#,
1950                "a.txt": "a-contents",
1951            }),
1952        )
1953        .await;
1954        let project_a = cx_a.update(|cx| {
1955            Project::local(
1956                client_a.clone(),
1957                client_a.user_store.clone(),
1958                lang_registry.clone(),
1959                fs.clone(),
1960                cx,
1961            )
1962        });
1963        let (worktree_a, _) = project_a
1964            .update(&mut cx_a, |p, cx| {
1965                p.find_or_create_local_worktree("/dir", false, cx)
1966            })
1967            .await
1968            .unwrap();
1969        worktree_a
1970            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1971            .await;
1972        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1973        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1974        project_a
1975            .update(&mut cx_a, |p, cx| p.share(cx))
1976            .await
1977            .unwrap();
1978
1979        // Join that project as client B
1980        let project_b = Project::remote(
1981            project_id,
1982            client_b.clone(),
1983            client_b.user_store.clone(),
1984            lang_registry.clone(),
1985            fs.clone(),
1986            &mut cx_b.to_async(),
1987        )
1988        .await
1989        .unwrap();
1990
1991        // See that a guest has joined as client A.
1992        project_a
1993            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1994            .await;
1995
1996        // Begin opening a buffer as client B, but leave the project before the open completes.
1997        let buffer_b = cx_b
1998            .background()
1999            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2000        cx_b.update(|_| drop(project_b));
2001        drop(buffer_b);
2002
2003        // See that the guest has left.
2004        project_a
2005            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2006            .await;
2007    }
2008
2009    #[gpui::test]
2010    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2011        cx_a.foreground().forbid_parking();
2012        let lang_registry = Arc::new(LanguageRegistry::new());
2013        let fs = Arc::new(FakeFs::new(cx_a.background()));
2014
2015        // Connect to a server as 2 clients.
2016        let mut server = TestServer::start(cx_a.foreground()).await;
2017        let client_a = server.create_client(&mut cx_a, "user_a").await;
2018        let client_b = server.create_client(&mut cx_b, "user_b").await;
2019
2020        // Share a project as client A
2021        fs.insert_tree(
2022            "/a",
2023            json!({
2024                ".zed.toml": r#"collaborators = ["user_b"]"#,
2025                "a.txt": "a-contents",
2026                "b.txt": "b-contents",
2027            }),
2028        )
2029        .await;
2030        let project_a = cx_a.update(|cx| {
2031            Project::local(
2032                client_a.clone(),
2033                client_a.user_store.clone(),
2034                lang_registry.clone(),
2035                fs.clone(),
2036                cx,
2037            )
2038        });
2039        let (worktree_a, _) = project_a
2040            .update(&mut cx_a, |p, cx| {
2041                p.find_or_create_local_worktree("/a", false, cx)
2042            })
2043            .await
2044            .unwrap();
2045        worktree_a
2046            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2047            .await;
2048        let project_id = project_a
2049            .update(&mut cx_a, |project, _| project.next_remote_id())
2050            .await;
2051        project_a
2052            .update(&mut cx_a, |project, cx| project.share(cx))
2053            .await
2054            .unwrap();
2055
2056        // Join that project as client B
2057        let _project_b = Project::remote(
2058            project_id,
2059            client_b.clone(),
2060            client_b.user_store.clone(),
2061            lang_registry.clone(),
2062            fs.clone(),
2063            &mut cx_b.to_async(),
2064        )
2065        .await
2066        .unwrap();
2067
2068        // See that a guest has joined as client A.
2069        project_a
2070            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2071            .await;
2072
2073        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2074        client_b.disconnect(&cx_b.to_async()).unwrap();
2075        project_a
2076            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2077            .await;
2078    }
2079
2080    #[gpui::test]
2081    async fn test_collaborating_with_diagnostics(
2082        mut cx_a: TestAppContext,
2083        mut cx_b: TestAppContext,
2084    ) {
2085        cx_a.foreground().forbid_parking();
2086        let mut lang_registry = Arc::new(LanguageRegistry::new());
2087        let fs = Arc::new(FakeFs::new(cx_a.background()));
2088
2089        // Set up a fake language server.
2090        let (language_server_config, mut fake_language_server) =
2091            LanguageServerConfig::fake(cx_a.background()).await;
2092        Arc::get_mut(&mut lang_registry)
2093            .unwrap()
2094            .add(Arc::new(Language::new(
2095                LanguageConfig {
2096                    name: "Rust".to_string(),
2097                    path_suffixes: vec!["rs".to_string()],
2098                    language_server: Some(language_server_config),
2099                    ..Default::default()
2100                },
2101                Some(tree_sitter_rust::language()),
2102            )));
2103
2104        // Connect to a server as 2 clients.
2105        let mut server = TestServer::start(cx_a.foreground()).await;
2106        let client_a = server.create_client(&mut cx_a, "user_a").await;
2107        let client_b = server.create_client(&mut cx_b, "user_b").await;
2108
2109        // Share a project as client A
2110        fs.insert_tree(
2111            "/a",
2112            json!({
2113                ".zed.toml": r#"collaborators = ["user_b"]"#,
2114                "a.rs": "let one = two",
2115                "other.rs": "",
2116            }),
2117        )
2118        .await;
2119        let project_a = cx_a.update(|cx| {
2120            Project::local(
2121                client_a.clone(),
2122                client_a.user_store.clone(),
2123                lang_registry.clone(),
2124                fs.clone(),
2125                cx,
2126            )
2127        });
2128        let (worktree_a, _) = project_a
2129            .update(&mut cx_a, |p, cx| {
2130                p.find_or_create_local_worktree("/a", false, cx)
2131            })
2132            .await
2133            .unwrap();
2134        worktree_a
2135            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2136            .await;
2137        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2138        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2139        project_a
2140            .update(&mut cx_a, |p, cx| p.share(cx))
2141            .await
2142            .unwrap();
2143
2144        // Cause the language server to start.
2145        let _ = cx_a
2146            .background()
2147            .spawn(project_a.update(&mut cx_a, |project, cx| {
2148                project.open_buffer(
2149                    ProjectPath {
2150                        worktree_id,
2151                        path: Path::new("other.rs").into(),
2152                    },
2153                    cx,
2154                )
2155            }))
2156            .await
2157            .unwrap();
2158
2159        // Simulate a language server reporting errors for a file.
2160        fake_language_server
2161            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2162                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2163                version: None,
2164                diagnostics: vec![lsp::Diagnostic {
2165                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2166                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2167                    message: "message 1".to_string(),
2168                    ..Default::default()
2169                }],
2170            })
2171            .await;
2172
2173        // Wait for server to see the diagnostics update.
2174        server
2175            .condition(|store| {
2176                let worktree = store
2177                    .project(project_id)
2178                    .unwrap()
2179                    .worktrees
2180                    .get(&worktree_id.to_proto())
2181                    .unwrap();
2182
2183                !worktree
2184                    .share
2185                    .as_ref()
2186                    .unwrap()
2187                    .diagnostic_summaries
2188                    .is_empty()
2189            })
2190            .await;
2191
2192        // Join the worktree as client B.
2193        let project_b = Project::remote(
2194            project_id,
2195            client_b.clone(),
2196            client_b.user_store.clone(),
2197            lang_registry.clone(),
2198            fs.clone(),
2199            &mut cx_b.to_async(),
2200        )
2201        .await
2202        .unwrap();
2203
2204        project_b.read_with(&cx_b, |project, cx| {
2205            assert_eq!(
2206                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2207                &[(
2208                    ProjectPath {
2209                        worktree_id,
2210                        path: Arc::from(Path::new("a.rs")),
2211                    },
2212                    DiagnosticSummary {
2213                        error_count: 1,
2214                        warning_count: 0,
2215                        ..Default::default()
2216                    },
2217                )]
2218            )
2219        });
2220
2221        // Simulate a language server reporting more errors for a file.
2222        fake_language_server
2223            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2224                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2225                version: None,
2226                diagnostics: vec![
2227                    lsp::Diagnostic {
2228                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2229                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2230                        message: "message 1".to_string(),
2231                        ..Default::default()
2232                    },
2233                    lsp::Diagnostic {
2234                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2235                        range: lsp::Range::new(
2236                            lsp::Position::new(0, 10),
2237                            lsp::Position::new(0, 13),
2238                        ),
2239                        message: "message 2".to_string(),
2240                        ..Default::default()
2241                    },
2242                ],
2243            })
2244            .await;
2245
2246        // Client b gets the updated summaries
2247        project_b
2248            .condition(&cx_b, |project, cx| {
2249                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2250                    == &[(
2251                        ProjectPath {
2252                            worktree_id,
2253                            path: Arc::from(Path::new("a.rs")),
2254                        },
2255                        DiagnosticSummary {
2256                            error_count: 1,
2257                            warning_count: 1,
2258                            ..Default::default()
2259                        },
2260                    )]
2261            })
2262            .await;
2263
2264        // Open the file with the errors on client B. They should be present.
2265        let buffer_b = cx_b
2266            .background()
2267            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2268            .await
2269            .unwrap();
2270
2271        buffer_b.read_with(&cx_b, |buffer, _| {
2272            assert_eq!(
2273                buffer
2274                    .snapshot()
2275                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2276                    .map(|entry| entry)
2277                    .collect::<Vec<_>>(),
2278                &[
2279                    DiagnosticEntry {
2280                        range: Point::new(0, 4)..Point::new(0, 7),
2281                        diagnostic: Diagnostic {
2282                            group_id: 0,
2283                            message: "message 1".to_string(),
2284                            severity: lsp::DiagnosticSeverity::ERROR,
2285                            is_primary: true,
2286                            ..Default::default()
2287                        }
2288                    },
2289                    DiagnosticEntry {
2290                        range: Point::new(0, 10)..Point::new(0, 13),
2291                        diagnostic: Diagnostic {
2292                            group_id: 1,
2293                            severity: lsp::DiagnosticSeverity::WARNING,
2294                            message: "message 2".to_string(),
2295                            is_primary: true,
2296                            ..Default::default()
2297                        }
2298                    }
2299                ]
2300            );
2301        });
2302    }
2303
2304    #[gpui::test]
2305    async fn test_collaborating_with_completion(
2306        mut cx_a: TestAppContext,
2307        mut cx_b: TestAppContext,
2308    ) {
2309        cx_a.foreground().forbid_parking();
2310        let mut lang_registry = Arc::new(LanguageRegistry::new());
2311        let fs = Arc::new(FakeFs::new(cx_a.background()));
2312
2313        // Set up a fake language server.
2314        let (language_server_config, mut fake_language_server) =
2315            LanguageServerConfig::fake_with_capabilities(
2316                lsp::ServerCapabilities {
2317                    completion_provider: Some(lsp::CompletionOptions {
2318                        trigger_characters: Some(vec![".".to_string()]),
2319                        ..Default::default()
2320                    }),
2321                    ..Default::default()
2322                },
2323                cx_a.background(),
2324            )
2325            .await;
2326        Arc::get_mut(&mut lang_registry)
2327            .unwrap()
2328            .add(Arc::new(Language::new(
2329                LanguageConfig {
2330                    name: "Rust".to_string(),
2331                    path_suffixes: vec!["rs".to_string()],
2332                    language_server: Some(language_server_config),
2333                    ..Default::default()
2334                },
2335                Some(tree_sitter_rust::language()),
2336            )));
2337
2338        // Connect to a server as 2 clients.
2339        let mut server = TestServer::start(cx_a.foreground()).await;
2340        let client_a = server.create_client(&mut cx_a, "user_a").await;
2341        let client_b = server.create_client(&mut cx_b, "user_b").await;
2342
2343        // Share a project as client A
2344        fs.insert_tree(
2345            "/a",
2346            json!({
2347                ".zed.toml": r#"collaborators = ["user_b"]"#,
2348                "main.rs": "fn main() { a }",
2349                "other.rs": "",
2350            }),
2351        )
2352        .await;
2353        let project_a = cx_a.update(|cx| {
2354            Project::local(
2355                client_a.clone(),
2356                client_a.user_store.clone(),
2357                lang_registry.clone(),
2358                fs.clone(),
2359                cx,
2360            )
2361        });
2362        let (worktree_a, _) = project_a
2363            .update(&mut cx_a, |p, cx| {
2364                p.find_or_create_local_worktree("/a", false, cx)
2365            })
2366            .await
2367            .unwrap();
2368        worktree_a
2369            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2370            .await;
2371        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2372        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2373        project_a
2374            .update(&mut cx_a, |p, cx| p.share(cx))
2375            .await
2376            .unwrap();
2377
2378        // Join the worktree as client B.
2379        let project_b = Project::remote(
2380            project_id,
2381            client_b.clone(),
2382            client_b.user_store.clone(),
2383            lang_registry.clone(),
2384            fs.clone(),
2385            &mut cx_b.to_async(),
2386        )
2387        .await
2388        .unwrap();
2389
2390        // Open a file in an editor as the guest.
2391        let buffer_b = project_b
2392            .update(&mut cx_b, |p, cx| {
2393                p.open_buffer((worktree_id, "main.rs"), cx)
2394            })
2395            .await
2396            .unwrap();
2397        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2398        let editor_b = cx_b.add_view(window_b, |cx| {
2399            Editor::for_buffer(
2400                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2401                Arc::new(|cx| EditorSettings::test(cx)),
2402                cx,
2403            )
2404        });
2405
2406        // Type a completion trigger character as the guest.
2407        editor_b.update(&mut cx_b, |editor, cx| {
2408            editor.select_ranges([13..13], None, cx);
2409            editor.handle_input(&Input(".".into()), cx);
2410            cx.focus(&editor_b);
2411        });
2412
2413        // Receive a completion request as the host's language server.
2414        let (request_id, params) = fake_language_server
2415            .receive_request::<lsp::request::Completion>()
2416            .await;
2417        assert_eq!(
2418            params.text_document_position.text_document.uri,
2419            lsp::Url::from_file_path("/a/main.rs").unwrap(),
2420        );
2421        assert_eq!(
2422            params.text_document_position.position,
2423            lsp::Position::new(0, 14),
2424        );
2425
2426        // Return some completions from the host's language server.
2427        fake_language_server
2428            .respond(
2429                request_id,
2430                Some(lsp::CompletionResponse::Array(vec![
2431                    lsp::CompletionItem {
2432                        label: "first_method(…)".into(),
2433                        detail: Some("fn(&mut self, B) -> C".into()),
2434                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2435                            new_text: "first_method($1)".to_string(),
2436                            range: lsp::Range::new(
2437                                lsp::Position::new(0, 14),
2438                                lsp::Position::new(0, 14),
2439                            ),
2440                        })),
2441                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2442                        ..Default::default()
2443                    },
2444                    lsp::CompletionItem {
2445                        label: "second_method(…)".into(),
2446                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2447                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2448                            new_text: "second_method()".to_string(),
2449                            range: lsp::Range::new(
2450                                lsp::Position::new(0, 14),
2451                                lsp::Position::new(0, 14),
2452                            ),
2453                        })),
2454                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2455                        ..Default::default()
2456                    },
2457                ])),
2458            )
2459            .await;
2460
2461        // Open the buffer on the host.
2462        let buffer_a = project_a
2463            .update(&mut cx_a, |p, cx| {
2464                p.open_buffer((worktree_id, "main.rs"), cx)
2465            })
2466            .await
2467            .unwrap();
2468        buffer_a
2469            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2470            .await;
2471
2472        // Confirm a completion on the guest.
2473        editor_b.next_notification(&cx_b).await;
2474        editor_b.update(&mut cx_b, |editor, cx| {
2475            assert!(editor.showing_context_menu());
2476            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2477            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2478        });
2479
2480        buffer_a
2481            .condition(&cx_a, |buffer, _| {
2482                buffer.text() == "fn main() { a.first_method() }"
2483            })
2484            .await;
2485
2486        // Receive a request to resolve the selected completion on the host's language server.
2487        let (request_id, params) = fake_language_server
2488            .receive_request::<lsp::request::ResolveCompletionItem>()
2489            .await;
2490        assert_eq!(params.label, "first_method(…)");
2491
2492        // Return a resolved completion from the host's language server.
2493        // The resolved completion has an additional text edit.
2494        fake_language_server
2495            .respond(
2496                request_id,
2497                lsp::CompletionItem {
2498                    label: "first_method(…)".into(),
2499                    detail: Some("fn(&mut self, B) -> C".into()),
2500                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2501                        new_text: "first_method($1)".to_string(),
2502                        range: lsp::Range::new(
2503                            lsp::Position::new(0, 14),
2504                            lsp::Position::new(0, 14),
2505                        ),
2506                    })),
2507                    additional_text_edits: Some(vec![lsp::TextEdit {
2508                        new_text: "use d::SomeTrait;\n".to_string(),
2509                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2510                    }]),
2511                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2512                    ..Default::default()
2513                },
2514            )
2515            .await;
2516
2517        // The additional edit is applied.
2518        buffer_b
2519            .condition(&cx_b, |buffer, _| {
2520                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2521            })
2522            .await;
2523        assert_eq!(
2524            buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2525            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2526        );
2527    }
2528
2529    #[gpui::test]
        // Checks that formatting a buffer opened by the guest (client B) makes a formatting
        // request reach the fake language server, and that the edits returned by that server
        // end up in the guest's buffer.
2530    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2531        cx_a.foreground().forbid_parking();
2532        let mut lang_registry = Arc::new(LanguageRegistry::new());
2533        let fs = Arc::new(FakeFs::new(cx_a.background()));
2534
2535        // Set up a fake language server.
2536        let (language_server_config, mut fake_language_server) =
2537            LanguageServerConfig::fake(cx_a.background()).await;
            // Register a "Rust" language for `*.rs` paths that uses the fake server. `Arc::get_mut`
            // only succeeds here because the registry has not been cloned yet.
2538        Arc::get_mut(&mut lang_registry)
2539            .unwrap()
2540            .add(Arc::new(Language::new(
2541                LanguageConfig {
2542                    name: "Rust".to_string(),
2543                    path_suffixes: vec!["rs".to_string()],
2544                    language_server: Some(language_server_config),
2545                    ..Default::default()
2546                },
2547                Some(tree_sitter_rust::language()),
2548            )));
2549
2550        // Connect to a server as 2 clients.
2551        let mut server = TestServer::start(cx_a.foreground()).await;
2552        let client_a = server.create_client(&mut cx_a, "user_a").await;
2553        let client_b = server.create_client(&mut cx_b, "user_b").await;
2554
2555        // Share a project as client A
2556        fs.insert_tree(
2557            "/a",
2558            json!({
2559                ".zed.toml": r#"collaborators = ["user_b"]"#,
2560                "a.rs": "let one = two",
2561            }),
2562        )
2563        .await;
2564        let project_a = cx_a.update(|cx| {
2565            Project::local(
2566                client_a.clone(),
2567                client_a.user_store.clone(),
2568                lang_registry.clone(),
2569                fs.clone(),
2570                cx,
2571            )
2572        });
2573        let (worktree_a, _) = project_a
2574            .update(&mut cx_a, |p, cx| {
2575                p.find_or_create_local_worktree("/a", false, cx)
2576            })
2577            .await
2578            .unwrap();
            // Wait for the initial scan of the worktree before sharing the project.
2579        worktree_a
2580            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2581            .await;
2582        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2583        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2584        project_a
2585            .update(&mut cx_a, |p, cx| p.share(cx))
2586            .await
2587            .unwrap();
2588
2589        // Join the project as client B.
2590        let project_b = Project::remote(
2591            project_id,
2592            client_b.clone(),
2593            client_b.user_store.clone(),
2594            lang_registry.clone(),
2595            fs.clone(),
2596            &mut cx_b.to_async(),
2597        )
2598        .await
2599        .unwrap();
2600
            // Open the file to be formatted on client B.
2601        let buffer_b = cx_b
2602            .background()
2603            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2604            .await
2605            .unwrap();
2606
            // Start formatting on the guest. The returned future is awaited only after the fake
            // language server has received and answered the request below.
2607        let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2608        let (request_id, _) = fake_language_server
2609            .receive_request::<lsp::request::Formatting>()
2610            .await;
            // Reply with two insertions whose positions refer to the original text
            // "let one = two": "h" before `one` and "y" right after it, yielding `honey`.
2611        fake_language_server
2612            .respond(
2613                request_id,
2614                Some(vec![
2615                    lsp::TextEdit {
2616                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2617                        new_text: "h".to_string(),
2618                    },
2619                    lsp::TextEdit {
2620                        range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2621                        new_text: "y".to_string(),
2622                    },
2623                ]),
2624            )
2625            .await;
2626        format.await.unwrap();
            // Both edits must have been applied to the guest's copy of the buffer.
2627        assert_eq!(
2628            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2629            "let honey = two"
2630        );
2631    }
2632
2633    #[gpui::test]
        // Checks go-to-definition requested by the guest (client B) when the definitions live in
        // a file (`/root-2/b.rs`) outside the worktree the host shared (`/root-1`): the target
        // buffer is readable on the guest, is reused across requests, and the guest's worktree
        // count returns to 1 once the definitions are dropped.
2634    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2635        cx_a.foreground().forbid_parking();
2636        let mut lang_registry = Arc::new(LanguageRegistry::new());
2637        let fs = Arc::new(FakeFs::new(cx_a.background()));
            // `/root-1` is the directory that gets shared; `/root-2` is only reached through the
            // definition locations returned by the fake language server.
2638        fs.insert_tree(
2639            "/root-1",
2640            json!({
2641                ".zed.toml": r#"collaborators = ["user_b"]"#,
2642                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2643            }),
2644        )
2645        .await;
2646        fs.insert_tree(
2647            "/root-2",
2648            json!({
2649                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2650            }),
2651        )
2652        .await;
2653
2654        // Set up a fake language server.
2655        let (language_server_config, mut fake_language_server) =
2656            LanguageServerConfig::fake(cx_a.background()).await;
2657        Arc::get_mut(&mut lang_registry)
2658            .unwrap()
2659            .add(Arc::new(Language::new(
2660                LanguageConfig {
2661                    name: "Rust".to_string(),
2662                    path_suffixes: vec!["rs".to_string()],
2663                    language_server: Some(language_server_config),
2664                    ..Default::default()
2665                },
2666                Some(tree_sitter_rust::language()),
2667            )));
2668
2669        // Connect to a server as 2 clients.
2670        let mut server = TestServer::start(cx_a.foreground()).await;
2671        let client_a = server.create_client(&mut cx_a, "user_a").await;
2672        let client_b = server.create_client(&mut cx_b, "user_b").await;
2673
2674        // Share a project as client A
2675        let project_a = cx_a.update(|cx| {
2676            Project::local(
2677                client_a.clone(),
2678                client_a.user_store.clone(),
2679                lang_registry.clone(),
2680                fs.clone(),
2681                cx,
2682            )
2683        });
2684        let (worktree_a, _) = project_a
2685            .update(&mut cx_a, |p, cx| {
2686                p.find_or_create_local_worktree("/root-1", false, cx)
2687            })
2688            .await
2689            .unwrap();
2690        worktree_a
2691            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2692            .await;
2693        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2694        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2695        project_a
2696            .update(&mut cx_a, |p, cx| p.share(cx))
2697            .await
2698            .unwrap();
2699
2700        // Join the project as client B.
2701        let project_b = Project::remote(
2702            project_id,
2703            client_b.clone(),
2704            client_b.user_store.clone(),
2705            lang_registry.clone(),
2706            fs.clone(),
2707            &mut cx_b.to_async(),
2708        )
2709        .await
2710        .unwrap();
2711
2712        // Open the file from which definitions will be requested on client B.
2713        let buffer_b = cx_b
2714            .background()
2715            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2716            .await
2717            .unwrap();
2718
            // Request the definition at position 23 of `a.rs`. Counted as a character offset this
            // falls inside `TWO` (assumes the argument is an offset into the text — confirm).
            // The future is awaited only after the fake language server has replied.
2719        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2720        let (request_id, _) = fake_language_server
2721            .receive_request::<lsp::request::GotoDefinition>()
2722            .await;
            // Point at the identifier `TWO` on the first line of `/root-2/b.rs`.
2723        fake_language_server
2724            .respond(
2725                request_id,
2726                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2727                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2728                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2729                ))),
2730            )
2731            .await;
2732        let definitions_1 = definitions_1.await.unwrap();
2733        cx_b.read(|cx| {
2734            assert_eq!(definitions_1.len(), 1);
                // The guest project now reports a second worktree, presumably one added for
                // `/root-2/b.rs`, which lies outside the shared `/root-1` (confirm in `Project`).
2735            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2736            let target_buffer = definitions_1[0].target_buffer.read(cx);
2737            assert_eq!(
2738                target_buffer.text(),
2739                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2740            );
2741            assert_eq!(
2742                definitions_1[0].target_range.to_point(target_buffer),
2743                Point::new(0, 6)..Point::new(0, 9)
2744            );
2745        });
2746
2747        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2748        // the previous call to `definition`.
2749        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2750        let (request_id, _) = fake_language_server
2751            .receive_request::<lsp::request::GotoDefinition>()
2752            .await;
            // This time point at the identifier `THREE` on the second line of the same file.
2753        fake_language_server
2754            .respond(
2755                request_id,
2756                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2757                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2758                    lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2759                ))),
2760            )
2761            .await;
2762        let definitions_2 = definitions_2.await.unwrap();
2763        cx_b.read(|cx| {
2764            assert_eq!(definitions_2.len(), 1);
                // Still two worktrees: the second request must not have added another one.
2765            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2766            let target_buffer = definitions_2[0].target_buffer.read(cx);
2767            assert_eq!(
2768                target_buffer.text(),
2769                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2770            );
2771            assert_eq!(
2772                definitions_2[0].target_range.to_point(target_buffer),
2773                Point::new(1, 6)..Point::new(1, 11)
2774            );
2775        });
            // Both results must refer to the same target buffer.
2776        assert_eq!(
2777            definitions_1[0].target_buffer,
2778            definitions_2[0].target_buffer
2779        );
2780
            // Once both definition lists are dropped, the guest project is expected to go back to
            // a single worktree.
2781        cx_b.update(|_| {
2782            drop(definitions_1);
2783            drop(definitions_2);
2784        });
2785        project_b
2786            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2787            .await;
2788    }
2789
2790    #[gpui::test]
        // Checks that, on the guest, opening `b.rs` directly and resolving a definition that
        // points into `b.rs` yield the same buffer when both operations are in flight at once.
        // `rng` decides which of the two operations is started first.
2791    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2792        mut cx_a: TestAppContext,
2793        mut cx_b: TestAppContext,
2794        mut rng: StdRng,
2795    ) {
2796        cx_a.foreground().forbid_parking();
2797        let mut lang_registry = Arc::new(LanguageRegistry::new());
2798        let fs = Arc::new(FakeFs::new(cx_a.background()));
2799        fs.insert_tree(
2800            "/root",
2801            json!({
2802                ".zed.toml": r#"collaborators = ["user_b"]"#,
2803                "a.rs": "const ONE: usize = b::TWO;",
2804                "b.rs": "const TWO: usize = 2",
2805            }),
2806        )
2807        .await;
2808
2809        // Set up a fake language server.
2810        let (language_server_config, mut fake_language_server) =
2811            LanguageServerConfig::fake(cx_a.background()).await;
2812        Arc::get_mut(&mut lang_registry)
2813            .unwrap()
2814            .add(Arc::new(Language::new(
2815                LanguageConfig {
2816                    name: "Rust".to_string(),
2817                    path_suffixes: vec!["rs".to_string()],
2818                    language_server: Some(language_server_config),
2819                    ..Default::default()
2820                },
2821                Some(tree_sitter_rust::language()),
2822            )));
2823
2824        // Connect to a server as 2 clients.
2825        let mut server = TestServer::start(cx_a.foreground()).await;
2826        let client_a = server.create_client(&mut cx_a, "user_a").await;
2827        let client_b = server.create_client(&mut cx_b, "user_b").await;
2828
2829        // Share a project as client A
2830        let project_a = cx_a.update(|cx| {
2831            Project::local(
2832                client_a.clone(),
2833                client_a.user_store.clone(),
2834                lang_registry.clone(),
2835                fs.clone(),
2836                cx,
2837            )
2838        });
2839        let (worktree_a, _) = project_a
2840            .update(&mut cx_a, |p, cx| {
2841                p.find_or_create_local_worktree("/root", false, cx)
2842            })
2843            .await
2844            .unwrap();
2845        worktree_a
2846            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2847            .await;
2848        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2849        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2850        project_a
2851            .update(&mut cx_a, |p, cx| p.share(cx))
2852            .await
2853            .unwrap();
2854
2855        // Join the project as client B.
2856        let project_b = Project::remote(
2857            project_id,
2858            client_b.clone(),
2859            client_b.user_store.clone(),
2860            lang_registry.clone(),
2861            fs.clone(),
2862            &mut cx_b.to_async(),
2863        )
2864        .await
2865        .unwrap();
2866
            // Open the buffer from which the definition will be requested.
2867        let buffer_b1 = cx_b
2868            .background()
2869            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2870            .await
2871            .unwrap();
2872
            // Start the definition request and the `open_buffer("b.rs")` call without awaiting
            // either one; the random boolean only chooses which of them is started first.
2873        let definitions;
2874        let buffer_b2;
2875        if rng.gen() {
2876            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2877            buffer_b2 =
2878                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2879        } else {
2880            buffer_b2 =
2881                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2882            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2883        }
2884
            // Answer the definition request with a location inside `/root/b.rs`, the same file
            // that is being opened concurrently.
2885        let (request_id, _) = fake_language_server
2886            .receive_request::<lsp::request::GotoDefinition>()
2887            .await;
2888        fake_language_server
2889            .respond(
2890                request_id,
2891                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2892                    lsp::Url::from_file_path("/root/b.rs").unwrap(),
2893                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2894                ))),
2895            )
2896            .await;
2897
            // Whichever operation started first, the definition's target must compare equal to
            // the buffer obtained from the direct `open_buffer` call.
2898        let buffer_b2 = buffer_b2.await.unwrap();
2899        let definitions = definitions.await.unwrap();
2900        assert_eq!(definitions.len(), 1);
2901        assert_eq!(definitions[0].target_buffer, buffer_b2);
2902    }
2903
2904    #[gpui::test]
        // End-to-end chat check with two clients in one channel: a message stored beforehand is
        // loaded by both, messages sent by A reach B, and the server's per-channel connection
        // set shrinks as the clients drop their channel handles.
2905    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2906        cx_a.foreground().forbid_parking();
2907
2908        // Connect to a server as 2 clients.
2909        let mut server = TestServer::start(cx_a.foreground()).await;
2910        let client_a = server.create_client(&mut cx_a, "user_a").await;
2911        let client_b = server.create_client(&mut cx_b, "user_b").await;
2912
2913        // Create an org that includes these 2 users.
2914        let db = &server.app_state.db;
2915        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2916        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2917            .await
2918            .unwrap();
2919        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2920            .await
2921            .unwrap();
2922
2923        // Create a channel that includes all the users.
2924        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2925        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2926            .await
2927            .unwrap();
2928        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2929            .await
2930            .unwrap();
            // Seed the channel with one message from B, written directly to the database before
            // either client opens the channel.
2931        db.create_channel_message(
2932            channel_id,
2933            client_b.current_user_id(&cx_b),
2934            "hello A, it's B.",
2935            OffsetDateTime::now_utc(),
2936            1,
2937        )
2938        .await
2939        .unwrap();
2940
            // Client A: wait for the channel list to load, then check it holds exactly the test
            // channel.
2941        let channels_a = cx_a
2942            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2943        channels_a
2944            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2945            .await;
2946        channels_a.read_with(&cx_a, |list, _| {
2947            assert_eq!(
2948                list.available_channels().unwrap(),
2949                &[ChannelDetails {
2950                    id: channel_id.to_proto(),
2951                    name: "test-channel".to_string()
2952                }]
2953            )
2954        });
2955        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2956            this.get_channel(channel_id.to_proto(), cx).unwrap()
2957        });
            // The freshly obtained channel has no messages yet; the seeded message is expected to
            // arrive afterwards. NOTE(review): the third tuple field of `channel_messages` looks
            // like a "pending" flag — confirm against that helper.
2958        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2959        channel_a
2960            .condition(&cx_a, |channel, _| {
2961                channel_messages(channel)
2962                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2963            })
2964            .await;
2965
            // Client B: same sequence as for client A.
2966        let channels_b = cx_b
2967            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2968        channels_b
2969            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2970            .await;
2971        channels_b.read_with(&cx_b, |list, _| {
2972            assert_eq!(
2973                list.available_channels().unwrap(),
2974                &[ChannelDetails {
2975                    id: channel_id.to_proto(),
2976                    name: "test-channel".to_string()
2977                }]
2978            )
2979        });
2980
2981        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2982            this.get_channel(channel_id.to_proto(), cx).unwrap()
2983        });
2984        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2985        channel_b
2986            .condition(&cx_b, |channel, _| {
2987                channel_messages(channel)
2988                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2989            })
2990            .await;
2991
            // A sends two messages back to back: the first send is detached, the second one's
            // task is returned from the closure and awaited. Right after sending, both already
            // appear in A's local message list with the flag set to `true`.
2992        channel_a
2993            .update(&mut cx_a, |channel, cx| {
2994                channel
2995                    .send_message("oh, hi B.".to_string(), cx)
2996                    .unwrap()
2997                    .detach();
2998                let task = channel.send_message("sup".to_string(), cx).unwrap();
2999                assert_eq!(
3000                    channel_messages(channel),
3001                    &[
3002                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3003                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3004                        ("user_a".to_string(), "sup".to_string(), true)
3005                    ]
3006                );
3007                task
3008            })
3009            .await
3010            .unwrap();
3011
            // B eventually sees both of A's messages, in order, with the flag set to `false`.
3012        channel_b
3013            .condition(&cx_b, |channel, _| {
3014                channel_messages(channel)
3015                    == [
3016                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3017                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3018                        ("user_a".to_string(), "sup".to_string(), false),
3019                    ]
3020            })
3021            .await;
3022
            // While both clients hold the channel, the server state lists two connection ids
            // for it.
3023        assert_eq!(
3024            server
3025                .state()
3026                .await
3027                .channel(channel_id)
3028                .unwrap()
3029                .connection_ids
3030                .len(),
3031            2
3032        );
            // Dropping B's channel handle is expected to leave a single connection id.
3033        cx_b.update(|_| drop(channel_b));
3034        server
3035            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3036            .await;
3037
            // Dropping A's handle as well is expected to remove the channel from the server
            // state altogether.
3038        cx_a.update(|_| drop(channel_a));
3039        server
3040            .condition(|state| state.channel(channel_id).is_none())
3041            .await;
3042    }
3043
3044    #[gpui::test]
        // Checks the validation applied to chat messages sent by a single client: over-long
        // bodies fail, empty bodies are refused up front, and surrounding whitespace is stripped
        // from the body that gets stored.
3045    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3046        cx_a.foreground().forbid_parking();
3047
3048        let mut server = TestServer::start(cx_a.foreground()).await;
3049        let client_a = server.create_client(&mut cx_a, "user_a").await;
3050
            // Create an org and a channel, and make the client's user a member of both.
3051        let db = &server.app_state.db;
3052        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3053        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3054        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3055            .await
3056            .unwrap();
3057        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3058            .await
3059            .unwrap();
3060
3061        let channels_a = cx_a
3062            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3063        channels_a
3064            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3065            .await;
3066        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3067            this.get_channel(channel_id.to_proto(), cx).unwrap()
3068        });
3069
3070        // Messages aren't allowed to be too long.
            // `send_message` itself returns `Ok` here; the failure only surfaces when the
            // returned task is awaited. The body is 14 bytes repeated 1024 times.
3071        channel_a
3072            .update(&mut cx_a, |channel, cx| {
3073                let long_body = "this is long.\n".repeat(1024);
3074                channel.send_message(long_body, cx).unwrap()
3075            })
3076            .await
3077            .unwrap_err();
3078
3079        // Messages aren't allowed to be blank.
            // Unlike the over-long case, the error is returned directly by `send_message`, so
            // there is no task to await.
3080        channel_a.update(&mut cx_a, |channel, cx| {
3081            channel.send_message(String::new(), cx).unwrap_err()
3082        });
3083
3084        // Leading and trailing whitespace are trimmed.
3085        channel_a
3086            .update(&mut cx_a, |channel, cx| {
3087                channel
3088                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3089                    .unwrap()
3090            })
3091            .await
3092            .unwrap();
            // Only the trimmed message is found in the database, so neither of the two rejected
            // messages above was stored.
3093        assert_eq!(
3094            db.get_channel_messages(channel_id, 10, None)
3095                .await
3096                .unwrap()
3097                .iter()
3098                .map(|m| &m.body)
3099                .collect::<Vec<_>>(),
3100            &["surrounded by whitespace"]
3101        );
3102    }
3103
3104    #[gpui::test]
3105    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3106        cx_a.foreground().forbid_parking();
3107
3108        // Connect to a server as 2 clients.
3109        let mut server = TestServer::start(cx_a.foreground()).await;
3110        let client_a = server.create_client(&mut cx_a, "user_a").await;
3111        let client_b = server.create_client(&mut cx_b, "user_b").await;
3112        let mut status_b = client_b.status();
3113
3114        // Create an org that includes these 2 users.
3115        let db = &server.app_state.db;
3116        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3117        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3118            .await
3119            .unwrap();
3120        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3121            .await
3122            .unwrap();
3123
3124        // Create a channel that includes all the users.
3125        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3126        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3127            .await
3128            .unwrap();
3129        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3130            .await
3131            .unwrap();
3132        db.create_channel_message(
3133            channel_id,
3134            client_b.current_user_id(&cx_b),
3135            "hello A, it's B.",
3136            OffsetDateTime::now_utc(),
3137            2,
3138        )
3139        .await
3140        .unwrap();
3141
3142        let channels_a = cx_a
3143            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3144        channels_a
3145            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3146            .await;
3147
3148        channels_a.read_with(&cx_a, |list, _| {
3149            assert_eq!(
3150                list.available_channels().unwrap(),
3151                &[ChannelDetails {
3152                    id: channel_id.to_proto(),
3153                    name: "test-channel".to_string()
3154                }]
3155            )
3156        });
3157        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3158            this.get_channel(channel_id.to_proto(), cx).unwrap()
3159        });
3160        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3161        channel_a
3162            .condition(&cx_a, |channel, _| {
3163                channel_messages(channel)
3164                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3165            })
3166            .await;
3167
3168        let channels_b = cx_b
3169            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3170        channels_b
3171            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3172            .await;
3173        channels_b.read_with(&cx_b, |list, _| {
3174            assert_eq!(
3175                list.available_channels().unwrap(),
3176                &[ChannelDetails {
3177                    id: channel_id.to_proto(),
3178                    name: "test-channel".to_string()
3179                }]
3180            )
3181        });
3182
3183        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3184            this.get_channel(channel_id.to_proto(), cx).unwrap()
3185        });
3186        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3187        channel_b
3188            .condition(&cx_b, |channel, _| {
3189                channel_messages(channel)
3190                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3191            })
3192            .await;
3193
3194        // Disconnect client B, ensuring we can still access its cached channel data.
3195        server.forbid_connections();
3196        server.disconnect_client(client_b.current_user_id(&cx_b));
3197        while !matches!(
3198            status_b.next().await,
3199            Some(client::Status::ReconnectionError { .. })
3200        ) {}
3201
3202        channels_b.read_with(&cx_b, |channels, _| {
3203            assert_eq!(
3204                channels.available_channels().unwrap(),
3205                [ChannelDetails {
3206                    id: channel_id.to_proto(),
3207                    name: "test-channel".to_string()
3208                }]
3209            )
3210        });
3211        channel_b.read_with(&cx_b, |channel, _| {
3212            assert_eq!(
3213                channel_messages(channel),
3214                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3215            )
3216        });
3217
3218        // Send a message from client B while it is disconnected.
3219        channel_b
3220            .update(&mut cx_b, |channel, cx| {
3221                let task = channel
3222                    .send_message("can you see this?".to_string(), cx)
3223                    .unwrap();
3224                assert_eq!(
3225                    channel_messages(channel),
3226                    &[
3227                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3228                        ("user_b".to_string(), "can you see this?".to_string(), true)
3229                    ]
3230                );
3231                task
3232            })
3233            .await
3234            .unwrap_err();
3235
3236        // Send a message from client A while B is disconnected.
3237        channel_a
3238            .update(&mut cx_a, |channel, cx| {
3239                channel
3240                    .send_message("oh, hi B.".to_string(), cx)
3241                    .unwrap()
3242                    .detach();
3243                let task = channel.send_message("sup".to_string(), cx).unwrap();
3244                assert_eq!(
3245                    channel_messages(channel),
3246                    &[
3247                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3248                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3249                        ("user_a".to_string(), "sup".to_string(), true)
3250                    ]
3251                );
3252                task
3253            })
3254            .await
3255            .unwrap();
3256
3257        // Give client B a chance to reconnect.
3258        server.allow_connections();
3259        cx_b.foreground().advance_clock(Duration::from_secs(10));
3260
3261        // Verify that B sees the new messages upon reconnection, as well as the message client B
3262        // sent while offline.
3263        channel_b
3264            .condition(&cx_b, |channel, _| {
3265                channel_messages(channel)
3266                    == [
3267                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3268                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3269                        ("user_a".to_string(), "sup".to_string(), false),
3270                        ("user_b".to_string(), "can you see this?".to_string(), false),
3271                    ]
3272            })
3273            .await;
3274
3275        // Ensure client A and B can communicate normally after reconnection.
3276        channel_a
3277            .update(&mut cx_a, |channel, cx| {
3278                channel.send_message("you online?".to_string(), cx).unwrap()
3279            })
3280            .await
3281            .unwrap();
3282        channel_b
3283            .condition(&cx_b, |channel, _| {
3284                channel_messages(channel)
3285                    == [
3286                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3287                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3288                        ("user_a".to_string(), "sup".to_string(), false),
3289                        ("user_b".to_string(), "can you see this?".to_string(), false),
3290                        ("user_a".to_string(), "you online?".to_string(), false),
3291                    ]
3292            })
3293            .await;
3294
3295        channel_b
3296            .update(&mut cx_b, |channel, cx| {
3297                channel.send_message("yep".to_string(), cx).unwrap()
3298            })
3299            .await
3300            .unwrap();
3301        channel_a
3302            .condition(&cx_a, |channel, _| {
3303                channel_messages(channel)
3304                    == [
3305                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3306                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3307                        ("user_a".to_string(), "sup".to_string(), false),
3308                        ("user_b".to_string(), "can you see this?".to_string(), false),
3309                        ("user_a".to_string(), "you online?".to_string(), false),
3310                        ("user_b".to_string(), "yep".to_string(), false),
3311                    ]
3312            })
3313            .await;
3314    }
3315
3316    #[gpui::test]
3317    async fn test_contacts(
3318        mut cx_a: TestAppContext,
3319        mut cx_b: TestAppContext,
3320        mut cx_c: TestAppContext,
3321    ) {
3322        cx_a.foreground().forbid_parking();
3323        let lang_registry = Arc::new(LanguageRegistry::new());
3324        let fs = Arc::new(FakeFs::new(cx_a.background()));
3325
3326        // Connect to a server as 3 clients.
3327        let mut server = TestServer::start(cx_a.foreground()).await;
3328        let client_a = server.create_client(&mut cx_a, "user_a").await;
3329        let client_b = server.create_client(&mut cx_b, "user_b").await;
3330        let client_c = server.create_client(&mut cx_c, "user_c").await;
3331
3332        // Share a worktree as client A.
3333        fs.insert_tree(
3334            "/a",
3335            json!({
3336                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3337            }),
3338        )
3339        .await;
3340
3341        let project_a = cx_a.update(|cx| {
3342            Project::local(
3343                client_a.clone(),
3344                client_a.user_store.clone(),
3345                lang_registry.clone(),
3346                fs.clone(),
3347                cx,
3348            )
3349        });
3350        let (worktree_a, _) = project_a
3351            .update(&mut cx_a, |p, cx| {
3352                p.find_or_create_local_worktree("/a", false, cx)
3353            })
3354            .await
3355            .unwrap();
3356        worktree_a
3357            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3358            .await;
3359
3360        client_a
3361            .user_store
3362            .condition(&cx_a, |user_store, _| {
3363                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3364            })
3365            .await;
3366        client_b
3367            .user_store
3368            .condition(&cx_b, |user_store, _| {
3369                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3370            })
3371            .await;
3372        client_c
3373            .user_store
3374            .condition(&cx_c, |user_store, _| {
3375                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3376            })
3377            .await;
3378
3379        let project_id = project_a
3380            .update(&mut cx_a, |project, _| project.next_remote_id())
3381            .await;
3382        project_a
3383            .update(&mut cx_a, |project, cx| project.share(cx))
3384            .await
3385            .unwrap();
3386
3387        let _project_b = Project::remote(
3388            project_id,
3389            client_b.clone(),
3390            client_b.user_store.clone(),
3391            lang_registry.clone(),
3392            fs.clone(),
3393            &mut cx_b.to_async(),
3394        )
3395        .await
3396        .unwrap();
3397
3398        client_a
3399            .user_store
3400            .condition(&cx_a, |user_store, _| {
3401                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3402            })
3403            .await;
3404        client_b
3405            .user_store
3406            .condition(&cx_b, |user_store, _| {
3407                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3408            })
3409            .await;
3410        client_c
3411            .user_store
3412            .condition(&cx_c, |user_store, _| {
3413                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3414            })
3415            .await;
3416
3417        project_a
3418            .condition(&cx_a, |project, _| {
3419                project.collaborators().contains_key(&client_b.peer_id)
3420            })
3421            .await;
3422
3423        cx_a.update(move |_| drop(project_a));
3424        client_a
3425            .user_store
3426            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3427            .await;
3428        client_b
3429            .user_store
3430            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3431            .await;
3432        client_c
3433            .user_store
3434            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3435            .await;
3436
3437        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3438            user_store
3439                .contacts()
3440                .iter()
3441                .map(|contact| {
3442                    let worktrees = contact
3443                        .projects
3444                        .iter()
3445                        .map(|p| {
3446                            (
3447                                p.worktree_root_names[0].as_str(),
3448                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3449                            )
3450                        })
3451                        .collect();
3452                    (contact.user.github_login.as_str(), worktrees)
3453                })
3454                .collect()
3455        }
3456    }
3457
/// An in-process `Server` plus the handles tests use to create clients, sever or forbid
/// connections, and wait on changes to the server's `Store`.
struct TestServer {
    /// Peer handed to the `Server`; reset when this `TestServer` is dropped.
    peer: Arc<Peer>,
    /// Application state built by `build_app_state`; its `db` is used to create users.
    app_state: Arc<AppState>,
    /// The server under test.
    server: Arc<Server>,
    /// Foreground executor; `condition` calls `start_waiting`/`finish_waiting` on it around
    /// each wait for a notification.
    foreground: Rc<executor::Foreground>,
    /// Receiving half of the channel whose sender was passed to `Server::new`.
    notifications: mpsc::Receiver<()>,
    /// Per-user senders registered when a client connection is established;
    /// `disconnect_client` removes the entry and sends `Some(())` on it.
    connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
    /// While `true`, connection attempts made by test clients return an error.
    forbid_connections: Arc<AtomicBool>,
    /// Test database the app state was built from. It is never read after construction.
    /// NOTE(review): presumably held so the database outlives the server — confirm against
    /// `TestDb`'s drop behavior.
    _test_db: TestDb,
}
3468
3469    impl TestServer {
3470        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3471            let test_db = TestDb::new();
3472            let app_state = Self::build_app_state(&test_db).await;
3473            let peer = Peer::new();
3474            let notifications = mpsc::channel(128);
3475            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3476            Self {
3477                peer,
3478                app_state,
3479                server,
3480                foreground,
3481                notifications: notifications.1,
3482                connection_killers: Default::default(),
3483                forbid_connections: Default::default(),
3484                _test_db: test_db,
3485            }
3486        }
3487
3488        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3489            let http = FakeHttpClient::with_404_response();
3490            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3491            let client_name = name.to_string();
3492            let mut client = Client::new(http.clone());
3493            let server = self.server.clone();
3494            let connection_killers = self.connection_killers.clone();
3495            let forbid_connections = self.forbid_connections.clone();
3496            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3497
3498            Arc::get_mut(&mut client)
3499                .unwrap()
3500                .override_authenticate(move |cx| {
3501                    cx.spawn(|_| async move {
3502                        let access_token = "the-token".to_string();
3503                        Ok(Credentials {
3504                            user_id: user_id.0 as u64,
3505                            access_token,
3506                        })
3507                    })
3508                })
3509                .override_establish_connection(move |credentials, cx| {
3510                    assert_eq!(credentials.user_id, user_id.0 as u64);
3511                    assert_eq!(credentials.access_token, "the-token");
3512
3513                    let server = server.clone();
3514                    let connection_killers = connection_killers.clone();
3515                    let forbid_connections = forbid_connections.clone();
3516                    let client_name = client_name.clone();
3517                    let connection_id_tx = connection_id_tx.clone();
3518                    cx.spawn(move |cx| async move {
3519                        if forbid_connections.load(SeqCst) {
3520                            Err(EstablishConnectionError::other(anyhow!(
3521                                "server is forbidding connections"
3522                            )))
3523                        } else {
3524                            let (client_conn, server_conn, kill_conn) =
3525                                Connection::in_memory(cx.background());
3526                            connection_killers.lock().insert(user_id, kill_conn);
3527                            cx.background()
3528                                .spawn(server.handle_connection(
3529                                    server_conn,
3530                                    client_name,
3531                                    user_id,
3532                                    Some(connection_id_tx),
3533                                ))
3534                                .detach();
3535                            Ok(client_conn)
3536                        }
3537                    })
3538                });
3539
3540            client
3541                .authenticate_and_connect(&cx.to_async())
3542                .await
3543                .unwrap();
3544
3545            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3546            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3547            let mut authed_user =
3548                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3549            while authed_user.next().await.unwrap().is_none() {}
3550
3551            TestClient {
3552                client,
3553                peer_id,
3554                user_store,
3555            }
3556        }
3557
3558        fn disconnect_client(&self, user_id: UserId) {
3559            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3560                let _ = kill_conn.try_send(Some(()));
3561            }
3562        }
3563
3564        fn forbid_connections(&self) {
3565            self.forbid_connections.store(true, SeqCst);
3566        }
3567
3568        fn allow_connections(&self) {
3569            self.forbid_connections.store(false, SeqCst);
3570        }
3571
3572        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3573            let mut config = Config::default();
3574            config.session_secret = "a".repeat(32);
3575            config.database_url = test_db.url.clone();
3576            let github_client = github::AppClient::test();
3577            Arc::new(AppState {
3578                db: test_db.db().clone(),
3579                handlebars: Default::default(),
3580                auth_client: auth::build_client("", ""),
3581                repo_client: github::RepoClient::test(&github_client),
3582                github_client,
3583                config,
3584            })
3585        }
3586
3587        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3588            self.server.store.read()
3589        }
3590
3591        async fn condition<F>(&mut self, mut predicate: F)
3592        where
3593            F: FnMut(&Store) -> bool,
3594        {
3595            async_std::future::timeout(Duration::from_millis(500), async {
3596                while !(predicate)(&*self.server.store.read()) {
3597                    self.foreground.start_waiting();
3598                    self.notifications.next().await;
3599                    self.foreground.finish_waiting();
3600                }
3601            })
3602            .await
3603            .expect("condition timed out");
3604        }
3605    }
3606
impl Drop for TestServer {
    /// Resets the `Peer` that was shared with the server when the test server is dropped.
    fn drop(&mut self) {
        // NOTE(review): presumably this closes any connections still registered on the peer
        // so they do not outlive the test — confirm against `Peer::reset`.
        self.peer.reset();
    }
}
3612
3613    struct TestClient {
3614        client: Arc<Client>,
3615        pub peer_id: PeerId,
3616        pub user_store: ModelHandle<UserStore>,
3617    }
3618
3619    impl Deref for TestClient {
3620        type Target = Arc<Client>;
3621
3622        fn deref(&self) -> &Self::Target {
3623            &self.client
3624        }
3625    }
3626
3627    impl TestClient {
3628        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3629            UserId::from_proto(
3630                self.user_store
3631                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3632            )
3633        }
3634    }
3635
3636    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3637        channel
3638            .messages()
3639            .cursor::<()>()
3640            .map(|m| {
3641                (
3642                    m.sender.github_login.clone(),
3643                    m.body.clone(),
3644                    m.is_pending(),
3645                )
3646            })
3647            .collect()
3648    }
3649
/// Stateless gpui view whose render output is a single `gpui::elements::Empty` element.
struct EmptyView;

impl gpui::Entity for EmptyView {
    // The unit type is used as the event type.
    type Event = ();
}

impl gpui::View for EmptyView {
    /// Name under which this view identifies itself to gpui.
    fn ui_name() -> &'static str {
        "empty view"
    }

    /// Returns a boxed `gpui::elements::Empty`; the render context is not used.
    fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
        gpui::Element::boxed(gpui::elements::Empty)
    }
}
3665}