rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updating)
  76            .add_handler(Server::disk_based_diagnostics_updated)
  77            .add_handler(Server::get_definition)
  78            .add_handler(Server::open_buffer)
  79            .add_handler(Server::close_buffer)
  80            .add_handler(Server::update_buffer)
  81            .add_handler(Server::update_buffer_file)
  82            .add_handler(Server::buffer_reloaded)
  83            .add_handler(Server::buffer_saved)
  84            .add_handler(Server::save_buffer)
  85            .add_handler(Server::format_buffer)
  86            .add_handler(Server::get_completions)
  87            .add_handler(Server::apply_additional_edits_for_completion)
  88            .add_handler(Server::get_channels)
  89            .add_handler(Server::get_users)
  90            .add_handler(Server::join_channel)
  91            .add_handler(Server::leave_channel)
  92            .add_handler(Server::send_channel_message)
  93            .add_handler(Server::get_channel_messages);
  94
  95        Arc::new(server)
  96    }
  97
  98    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  99    where
 100        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 101        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 102        M: EnvelopedMessage,
 103    {
 104        let prev_handler = self.handlers.insert(
 105            TypeId::of::<M>(),
 106            Box::new(move |server, envelope| {
 107                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 108                (handler)(server, *envelope).boxed()
 109            }),
 110        );
 111        if prev_handler.is_some() {
 112            panic!("registered a handler for the same message twice");
 113        }
 114        self
 115    }
 116
 117    pub fn handle_connection(
 118        self: &Arc<Self>,
 119        connection: Connection,
 120        addr: String,
 121        user_id: UserId,
 122        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 123    ) -> impl Future<Output = ()> {
 124        let mut this = self.clone();
 125        async move {
 126            let (connection_id, handle_io, mut incoming_rx) =
 127                this.peer.add_connection(connection).await;
 128
 129            if let Some(send_connection_id) = send_connection_id.as_mut() {
 130                let _ = send_connection_id.send(connection_id).await;
 131            }
 132
 133            this.state_mut().add_connection(connection_id, user_id);
 134            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
 135                log::error!("error updating contacts for {:?}: {}", user_id, err);
 136            }
 137
 138            let handle_io = handle_io.fuse();
 139            futures::pin_mut!(handle_io);
 140            loop {
 141                let next_message = incoming_rx.next().fuse();
 142                futures::pin_mut!(next_message);
 143                futures::select_biased! {
 144                    message = next_message => {
 145                        if let Some(message) = message {
 146                            let start_time = Instant::now();
 147                            log::info!("RPC message received: {}", message.payload_type_name());
 148                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 149                                if let Err(err) = (handler)(this.clone(), message).await {
 150                                    log::error!("error handling message: {:?}", err);
 151                                } else {
 152                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 153                                }
 154
 155                                if let Some(mut notifications) = this.notifications.clone() {
 156                                    let _ = notifications.send(()).await;
 157                                }
 158                            } else {
 159                                log::warn!("unhandled message: {}", message.payload_type_name());
 160                            }
 161                        } else {
 162                            log::info!("rpc connection closed {:?}", addr);
 163                            break;
 164                        }
 165                    }
 166                    handle_io = handle_io => {
 167                        if let Err(err) = handle_io {
 168                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 169                        }
 170                        break;
 171                    }
 172                }
 173            }
 174
 175            if let Err(err) = this.sign_out(connection_id).await {
 176                log::error!("error signing out connection {:?} - {:?}", addr, err);
 177            }
 178        }
 179    }
 180
 181    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 182        self.peer.disconnect(connection_id);
 183        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 184
 185        for (project_id, project) in removed_connection.hosted_projects {
 186            if let Some(share) = project.share {
 187                broadcast(
 188                    connection_id,
 189                    share.guests.keys().copied().collect(),
 190                    |conn_id| {
 191                        self.peer
 192                            .send(conn_id, proto::UnshareProject { project_id })
 193                    },
 194                )
 195                .await?;
 196            }
 197        }
 198
 199        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 200            broadcast(connection_id, peer_ids, |conn_id| {
 201                self.peer.send(
 202                    conn_id,
 203                    proto::RemoveProjectCollaborator {
 204                        project_id,
 205                        peer_id: connection_id.0,
 206                    },
 207                )
 208            })
 209            .await?;
 210        }
 211
 212        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 213            .await?;
 214
 215        Ok(())
 216    }
 217
 218    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 219        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 220        Ok(())
 221    }
 222
 223    async fn register_project(
 224        mut self: Arc<Server>,
 225        request: TypedEnvelope<proto::RegisterProject>,
 226    ) -> tide::Result<()> {
 227        let project_id = {
 228            let mut state = self.state_mut();
 229            let user_id = state.user_id_for_connection(request.sender_id)?;
 230            state.register_project(request.sender_id, user_id)
 231        };
 232        self.peer
 233            .respond(
 234                request.receipt(),
 235                proto::RegisterProjectResponse { project_id },
 236            )
 237            .await?;
 238        Ok(())
 239    }
 240
 241    async fn unregister_project(
 242        mut self: Arc<Server>,
 243        request: TypedEnvelope<proto::UnregisterProject>,
 244    ) -> tide::Result<()> {
 245        let project = self
 246            .state_mut()
 247            .unregister_project(request.payload.project_id, request.sender_id)
 248            .ok_or_else(|| anyhow!("no such project"))?;
 249        self.update_contacts_for_users(project.authorized_user_ids().iter())
 250            .await?;
 251        Ok(())
 252    }
 253
 254    async fn share_project(
 255        mut self: Arc<Server>,
 256        request: TypedEnvelope<proto::ShareProject>,
 257    ) -> tide::Result<()> {
 258        self.state_mut()
 259            .share_project(request.payload.project_id, request.sender_id);
 260        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 261        Ok(())
 262    }
 263
 264    async fn unshare_project(
 265        mut self: Arc<Server>,
 266        request: TypedEnvelope<proto::UnshareProject>,
 267    ) -> tide::Result<()> {
 268        let project_id = request.payload.project_id;
 269        let project = self
 270            .state_mut()
 271            .unshare_project(project_id, request.sender_id)?;
 272
 273        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 274            self.peer
 275                .send(conn_id, proto::UnshareProject { project_id })
 276        })
 277        .await?;
 278        self.update_contacts_for_users(&project.authorized_user_ids)
 279            .await?;
 280
 281        Ok(())
 282    }
 283
 284    async fn join_project(
 285        mut self: Arc<Server>,
 286        request: TypedEnvelope<proto::JoinProject>,
 287    ) -> tide::Result<()> {
 288        let project_id = request.payload.project_id;
 289
 290        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 291        let response_data = self
 292            .state_mut()
 293            .join_project(request.sender_id, user_id, project_id)
 294            .and_then(|joined| {
 295                let share = joined.project.share()?;
 296                let peer_count = share.guests.len();
 297                let mut collaborators = Vec::with_capacity(peer_count);
 298                collaborators.push(proto::Collaborator {
 299                    peer_id: joined.project.host_connection_id.0,
 300                    replica_id: 0,
 301                    user_id: joined.project.host_user_id.to_proto(),
 302                });
 303                let worktrees = joined
 304                    .project
 305                    .worktrees
 306                    .iter()
 307                    .filter_map(|(id, worktree)| {
 308                        worktree.share.as_ref().map(|share| proto::Worktree {
 309                            id: *id,
 310                            root_name: worktree.root_name.clone(),
 311                            entries: share.entries.values().cloned().collect(),
 312                            diagnostic_summaries: share
 313                                .diagnostic_summaries
 314                                .values()
 315                                .cloned()
 316                                .collect(),
 317                            weak: worktree.weak,
 318                        })
 319                    })
 320                    .collect();
 321                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 322                    if *peer_conn_id != request.sender_id {
 323                        collaborators.push(proto::Collaborator {
 324                            peer_id: peer_conn_id.0,
 325                            replica_id: *peer_replica_id as u32,
 326                            user_id: peer_user_id.to_proto(),
 327                        });
 328                    }
 329                }
 330                let response = proto::JoinProjectResponse {
 331                    worktrees,
 332                    replica_id: joined.replica_id as u32,
 333                    collaborators,
 334                };
 335                let connection_ids = joined.project.connection_ids();
 336                let contact_user_ids = joined.project.authorized_user_ids();
 337                Ok((response, connection_ids, contact_user_ids))
 338            });
 339
 340        match response_data {
 341            Ok((response, connection_ids, contact_user_ids)) => {
 342                broadcast(request.sender_id, connection_ids, |conn_id| {
 343                    self.peer.send(
 344                        conn_id,
 345                        proto::AddProjectCollaborator {
 346                            project_id,
 347                            collaborator: Some(proto::Collaborator {
 348                                peer_id: request.sender_id.0,
 349                                replica_id: response.replica_id,
 350                                user_id: user_id.to_proto(),
 351                            }),
 352                        },
 353                    )
 354                })
 355                .await?;
 356                self.peer.respond(request.receipt(), response).await?;
 357                self.update_contacts_for_users(&contact_user_ids).await?;
 358            }
 359            Err(error) => {
 360                self.peer
 361                    .respond_with_error(
 362                        request.receipt(),
 363                        proto::Error {
 364                            message: error.to_string(),
 365                        },
 366                    )
 367                    .await?;
 368            }
 369        }
 370
 371        Ok(())
 372    }
 373
 374    async fn leave_project(
 375        mut self: Arc<Server>,
 376        request: TypedEnvelope<proto::LeaveProject>,
 377    ) -> tide::Result<()> {
 378        let sender_id = request.sender_id;
 379        let project_id = request.payload.project_id;
 380        let worktree = self.state_mut().leave_project(sender_id, project_id);
 381        if let Some(worktree) = worktree {
 382            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 383                self.peer.send(
 384                    conn_id,
 385                    proto::RemoveProjectCollaborator {
 386                        project_id,
 387                        peer_id: sender_id.0,
 388                    },
 389                )
 390            })
 391            .await?;
 392            self.update_contacts_for_users(&worktree.authorized_user_ids)
 393                .await?;
 394        }
 395        Ok(())
 396    }
 397
 398    async fn register_worktree(
 399        mut self: Arc<Server>,
 400        request: TypedEnvelope<proto::RegisterWorktree>,
 401    ) -> tide::Result<()> {
 402        let receipt = request.receipt();
 403        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 404
 405        let mut contact_user_ids = HashSet::default();
 406        contact_user_ids.insert(host_user_id);
 407        for github_login in request.payload.authorized_logins {
 408            match self.app_state.db.create_user(&github_login, false).await {
 409                Ok(contact_user_id) => {
 410                    contact_user_ids.insert(contact_user_id);
 411                }
 412                Err(err) => {
 413                    let message = err.to_string();
 414                    self.peer
 415                        .respond_with_error(receipt, proto::Error { message })
 416                        .await?;
 417                    return Ok(());
 418                }
 419            }
 420        }
 421
 422        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 423        let ok = self.state_mut().register_worktree(
 424            request.payload.project_id,
 425            request.payload.worktree_id,
 426            Worktree {
 427                authorized_user_ids: contact_user_ids.clone(),
 428                root_name: request.payload.root_name,
 429                share: None,
 430                weak: false,
 431            },
 432        );
 433
 434        if ok {
 435            self.peer.respond(receipt, proto::Ack {}).await?;
 436            self.update_contacts_for_users(&contact_user_ids).await?;
 437        } else {
 438            self.peer
 439                .respond_with_error(
 440                    receipt,
 441                    proto::Error {
 442                        message: NO_SUCH_PROJECT.to_string(),
 443                    },
 444                )
 445                .await?;
 446        }
 447
 448        Ok(())
 449    }
 450
 451    async fn unregister_worktree(
 452        mut self: Arc<Server>,
 453        request: TypedEnvelope<proto::UnregisterWorktree>,
 454    ) -> tide::Result<()> {
 455        let project_id = request.payload.project_id;
 456        let worktree_id = request.payload.worktree_id;
 457        let (worktree, guest_connection_ids) =
 458            self.state_mut()
 459                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 460
 461        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 462            self.peer.send(
 463                conn_id,
 464                proto::UnregisterWorktree {
 465                    project_id,
 466                    worktree_id,
 467                },
 468            )
 469        })
 470        .await?;
 471        self.update_contacts_for_users(&worktree.authorized_user_ids)
 472            .await?;
 473        Ok(())
 474    }
 475
 476    async fn share_worktree(
 477        mut self: Arc<Server>,
 478        mut request: TypedEnvelope<proto::ShareWorktree>,
 479    ) -> tide::Result<()> {
 480        let worktree = request
 481            .payload
 482            .worktree
 483            .as_mut()
 484            .ok_or_else(|| anyhow!("missing worktree"))?;
 485        let entries = worktree
 486            .entries
 487            .iter()
 488            .map(|entry| (entry.id, entry.clone()))
 489            .collect();
 490        let diagnostic_summaries = worktree
 491            .diagnostic_summaries
 492            .iter()
 493            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 494            .collect();
 495
 496        let shared_worktree = self.state_mut().share_worktree(
 497            request.payload.project_id,
 498            worktree.id,
 499            request.sender_id,
 500            entries,
 501            diagnostic_summaries,
 502        );
 503        if let Some(shared_worktree) = shared_worktree {
 504            broadcast(
 505                request.sender_id,
 506                shared_worktree.connection_ids,
 507                |connection_id| {
 508                    self.peer.forward_send(
 509                        request.sender_id,
 510                        connection_id,
 511                        request.payload.clone(),
 512                    )
 513                },
 514            )
 515            .await?;
 516            self.peer.respond(request.receipt(), proto::Ack {}).await?;
 517            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)
 518                .await?;
 519        } else {
 520            self.peer
 521                .respond_with_error(
 522                    request.receipt(),
 523                    proto::Error {
 524                        message: "no such worktree".to_string(),
 525                    },
 526                )
 527                .await?;
 528        }
 529        Ok(())
 530    }
 531
 532    async fn update_worktree(
 533        mut self: Arc<Server>,
 534        request: TypedEnvelope<proto::UpdateWorktree>,
 535    ) -> tide::Result<()> {
 536        let connection_ids = self
 537            .state_mut()
 538            .update_worktree(
 539                request.sender_id,
 540                request.payload.project_id,
 541                request.payload.worktree_id,
 542                &request.payload.removed_entries,
 543                &request.payload.updated_entries,
 544            )
 545            .ok_or_else(|| anyhow!("no such worktree"))?;
 546
 547        broadcast(request.sender_id, connection_ids, |connection_id| {
 548            self.peer
 549                .forward_send(request.sender_id, connection_id, request.payload.clone())
 550        })
 551        .await?;
 552
 553        Ok(())
 554    }
 555
 556    async fn update_diagnostic_summary(
 557        mut self: Arc<Server>,
 558        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 559    ) -> tide::Result<()> {
 560        let receiver_ids = request
 561            .payload
 562            .summary
 563            .clone()
 564            .and_then(|summary| {
 565                self.state_mut().update_diagnostic_summary(
 566                    request.payload.project_id,
 567                    request.payload.worktree_id,
 568                    request.sender_id,
 569                    summary,
 570                )
 571            })
 572            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 573
 574        broadcast(request.sender_id, receiver_ids, |connection_id| {
 575            self.peer
 576                .forward_send(request.sender_id, connection_id, request.payload.clone())
 577        })
 578        .await?;
 579        Ok(())
 580    }
 581
 582    async fn disk_based_diagnostics_updating(
 583        self: Arc<Server>,
 584        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 585    ) -> tide::Result<()> {
 586        let receiver_ids = self
 587            .state()
 588            .project_connection_ids(request.payload.project_id, request.sender_id)
 589            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 590        broadcast(request.sender_id, receiver_ids, |connection_id| {
 591            self.peer
 592                .forward_send(request.sender_id, connection_id, request.payload.clone())
 593        })
 594        .await?;
 595        Ok(())
 596    }
 597
 598    async fn disk_based_diagnostics_updated(
 599        self: Arc<Server>,
 600        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 601    ) -> tide::Result<()> {
 602        let receiver_ids = self
 603            .state()
 604            .project_connection_ids(request.payload.project_id, request.sender_id)
 605            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 606        broadcast(request.sender_id, receiver_ids, |connection_id| {
 607            self.peer
 608                .forward_send(request.sender_id, connection_id, request.payload.clone())
 609        })
 610        .await?;
 611        Ok(())
 612    }
 613
 614    async fn get_definition(
 615        self: Arc<Server>,
 616        request: TypedEnvelope<proto::GetDefinition>,
 617    ) -> tide::Result<()> {
 618        let receipt = request.receipt();
 619        let host_connection_id = self
 620            .state()
 621            .read_project(request.payload.project_id, request.sender_id)
 622            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 623            .host_connection_id;
 624        let response = self
 625            .peer
 626            .forward_request(request.sender_id, host_connection_id, request.payload)
 627            .await?;
 628        self.peer.respond(receipt, response).await?;
 629        Ok(())
 630    }
 631
 632    async fn open_buffer(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::OpenBuffer>,
 635    ) -> tide::Result<()> {
 636        let receipt = request.receipt();
 637        let host_connection_id = self
 638            .state()
 639            .read_project(request.payload.project_id, request.sender_id)
 640            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 641            .host_connection_id;
 642        let response = self
 643            .peer
 644            .forward_request(request.sender_id, host_connection_id, request.payload)
 645            .await?;
 646        self.peer.respond(receipt, response).await?;
 647        Ok(())
 648    }
 649
 650    async fn close_buffer(
 651        self: Arc<Server>,
 652        request: TypedEnvelope<proto::CloseBuffer>,
 653    ) -> tide::Result<()> {
 654        let host_connection_id = self
 655            .state()
 656            .read_project(request.payload.project_id, request.sender_id)
 657            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 658            .host_connection_id;
 659        self.peer
 660            .forward_send(request.sender_id, host_connection_id, request.payload)
 661            .await?;
 662        Ok(())
 663    }
 664
 665    async fn save_buffer(
 666        self: Arc<Server>,
 667        request: TypedEnvelope<proto::SaveBuffer>,
 668    ) -> tide::Result<()> {
 669        let host;
 670        let guests;
 671        {
 672            let state = self.state();
 673            let project = state
 674                .read_project(request.payload.project_id, request.sender_id)
 675                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 676            host = project.host_connection_id;
 677            guests = project.guest_connection_ids()
 678        }
 679
 680        let sender = request.sender_id;
 681        let receipt = request.receipt();
 682        let response = self
 683            .peer
 684            .forward_request(sender, host, request.payload.clone())
 685            .await?;
 686
 687        broadcast(host, guests, |conn_id| {
 688            let response = response.clone();
 689            let peer = &self.peer;
 690            async move {
 691                if conn_id == sender {
 692                    peer.respond(receipt, response).await
 693                } else {
 694                    peer.forward_send(host, conn_id, response).await
 695                }
 696            }
 697        })
 698        .await?;
 699
 700        Ok(())
 701    }
 702
 703    async fn format_buffer(
 704        self: Arc<Server>,
 705        request: TypedEnvelope<proto::FormatBuffer>,
 706    ) -> tide::Result<()> {
 707        let host;
 708        {
 709            let state = self.state();
 710            let project = state
 711                .read_project(request.payload.project_id, request.sender_id)
 712                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 713            host = project.host_connection_id;
 714        }
 715
 716        let sender = request.sender_id;
 717        let receipt = request.receipt();
 718        let response = self
 719            .peer
 720            .forward_request(sender, host, request.payload.clone())
 721            .await?;
 722        self.peer.respond(receipt, response).await?;
 723
 724        Ok(())
 725    }
 726
 727    async fn get_completions(
 728        self: Arc<Server>,
 729        request: TypedEnvelope<proto::GetCompletions>,
 730    ) -> tide::Result<()> {
 731        let host;
 732        {
 733            let state = self.state();
 734            let project = state
 735                .read_project(request.payload.project_id, request.sender_id)
 736                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 737            host = project.host_connection_id;
 738        }
 739
 740        let sender = request.sender_id;
 741        let receipt = request.receipt();
 742        let response = self
 743            .peer
 744            .forward_request(sender, host, request.payload.clone())
 745            .await?;
 746        self.peer.respond(receipt, response).await?;
 747
 748        Ok(())
 749    }
 750
 751    async fn apply_additional_edits_for_completion(
 752        self: Arc<Server>,
 753        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 754    ) -> tide::Result<()> {
 755        let host;
 756        {
 757            let state = self.state();
 758            let project = state
 759                .read_project(request.payload.project_id, request.sender_id)
 760                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 761            host = project.host_connection_id;
 762        }
 763
 764        let sender = request.sender_id;
 765        let receipt = request.receipt();
 766        let response = self
 767            .peer
 768            .forward_request(sender, host, request.payload.clone())
 769            .await?;
 770        self.peer.respond(receipt, response).await?;
 771
 772        Ok(())
 773    }
 774
 775    async fn update_buffer(
 776        self: Arc<Server>,
 777        request: TypedEnvelope<proto::UpdateBuffer>,
 778    ) -> tide::Result<()> {
 779        let receiver_ids = self
 780            .state()
 781            .project_connection_ids(request.payload.project_id, request.sender_id)
 782            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 783        broadcast(request.sender_id, receiver_ids, |connection_id| {
 784            self.peer
 785                .forward_send(request.sender_id, connection_id, request.payload.clone())
 786        })
 787        .await?;
 788        self.peer.respond(request.receipt(), proto::Ack {}).await?;
 789        Ok(())
 790    }
 791
 792    async fn update_buffer_file(
 793        self: Arc<Server>,
 794        request: TypedEnvelope<proto::UpdateBufferFile>,
 795    ) -> tide::Result<()> {
 796        let receiver_ids = self
 797            .state()
 798            .project_connection_ids(request.payload.project_id, request.sender_id)
 799            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 800        broadcast(request.sender_id, receiver_ids, |connection_id| {
 801            self.peer
 802                .forward_send(request.sender_id, connection_id, request.payload.clone())
 803        })
 804        .await?;
 805        Ok(())
 806    }
 807
 808    async fn buffer_reloaded(
 809        self: Arc<Server>,
 810        request: TypedEnvelope<proto::BufferReloaded>,
 811    ) -> tide::Result<()> {
 812        let receiver_ids = self
 813            .state()
 814            .project_connection_ids(request.payload.project_id, request.sender_id)
 815            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 816        broadcast(request.sender_id, receiver_ids, |connection_id| {
 817            self.peer
 818                .forward_send(request.sender_id, connection_id, request.payload.clone())
 819        })
 820        .await?;
 821        Ok(())
 822    }
 823
 824    async fn buffer_saved(
 825        self: Arc<Server>,
 826        request: TypedEnvelope<proto::BufferSaved>,
 827    ) -> tide::Result<()> {
 828        let receiver_ids = self
 829            .state()
 830            .project_connection_ids(request.payload.project_id, request.sender_id)
 831            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 832        broadcast(request.sender_id, receiver_ids, |connection_id| {
 833            self.peer
 834                .forward_send(request.sender_id, connection_id, request.payload.clone())
 835        })
 836        .await?;
 837        Ok(())
 838    }
 839
 840    async fn get_channels(
 841        self: Arc<Server>,
 842        request: TypedEnvelope<proto::GetChannels>,
 843    ) -> tide::Result<()> {
 844        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 845        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 846        self.peer
 847            .respond(
 848                request.receipt(),
 849                proto::GetChannelsResponse {
 850                    channels: channels
 851                        .into_iter()
 852                        .map(|chan| proto::Channel {
 853                            id: chan.id.to_proto(),
 854                            name: chan.name,
 855                        })
 856                        .collect(),
 857                },
 858            )
 859            .await?;
 860        Ok(())
 861    }
 862
 863    async fn get_users(
 864        self: Arc<Server>,
 865        request: TypedEnvelope<proto::GetUsers>,
 866    ) -> tide::Result<()> {
 867        let receipt = request.receipt();
 868        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 869        let users = self
 870            .app_state
 871            .db
 872            .get_users_by_ids(user_ids)
 873            .await?
 874            .into_iter()
 875            .map(|user| proto::User {
 876                id: user.id.to_proto(),
 877                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 878                github_login: user.github_login,
 879            })
 880            .collect();
 881        self.peer
 882            .respond(receipt, proto::GetUsersResponse { users })
 883            .await?;
 884        Ok(())
 885    }
 886
 887    async fn update_contacts_for_users<'a>(
 888        self: &Arc<Server>,
 889        user_ids: impl IntoIterator<Item = &'a UserId>,
 890    ) -> tide::Result<()> {
 891        let mut send_futures = Vec::new();
 892
 893        {
 894            let state = self.state();
 895            for user_id in user_ids {
 896                let contacts = state.contacts_for_user(*user_id);
 897                for connection_id in state.connection_ids_for_user(*user_id) {
 898                    send_futures.push(self.peer.send(
 899                        connection_id,
 900                        proto::UpdateContacts {
 901                            contacts: contacts.clone(),
 902                        },
 903                    ));
 904                }
 905            }
 906        }
 907        futures::future::try_join_all(send_futures).await?;
 908
 909        Ok(())
 910    }
 911
 912    async fn join_channel(
 913        mut self: Arc<Self>,
 914        request: TypedEnvelope<proto::JoinChannel>,
 915    ) -> tide::Result<()> {
 916        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 917        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 918        if !self
 919            .app_state
 920            .db
 921            .can_user_access_channel(user_id, channel_id)
 922            .await?
 923        {
 924            Err(anyhow!("access denied"))?;
 925        }
 926
 927        self.state_mut().join_channel(request.sender_id, channel_id);
 928        let messages = self
 929            .app_state
 930            .db
 931            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 932            .await?
 933            .into_iter()
 934            .map(|msg| proto::ChannelMessage {
 935                id: msg.id.to_proto(),
 936                body: msg.body,
 937                timestamp: msg.sent_at.unix_timestamp() as u64,
 938                sender_id: msg.sender_id.to_proto(),
 939                nonce: Some(msg.nonce.as_u128().into()),
 940            })
 941            .collect::<Vec<_>>();
 942        self.peer
 943            .respond(
 944                request.receipt(),
 945                proto::JoinChannelResponse {
 946                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 947                    messages,
 948                },
 949            )
 950            .await?;
 951        Ok(())
 952    }
 953
 954    async fn leave_channel(
 955        mut self: Arc<Self>,
 956        request: TypedEnvelope<proto::LeaveChannel>,
 957    ) -> tide::Result<()> {
 958        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 959        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 960        if !self
 961            .app_state
 962            .db
 963            .can_user_access_channel(user_id, channel_id)
 964            .await?
 965        {
 966            Err(anyhow!("access denied"))?;
 967        }
 968
 969        self.state_mut()
 970            .leave_channel(request.sender_id, channel_id);
 971
 972        Ok(())
 973    }
 974
 975    async fn send_channel_message(
 976        self: Arc<Self>,
 977        request: TypedEnvelope<proto::SendChannelMessage>,
 978    ) -> tide::Result<()> {
 979        let receipt = request.receipt();
 980        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 981        let user_id;
 982        let connection_ids;
 983        {
 984            let state = self.state();
 985            user_id = state.user_id_for_connection(request.sender_id)?;
 986            if let Some(ids) = state.channel_connection_ids(channel_id) {
 987                connection_ids = ids;
 988            } else {
 989                return Ok(());
 990            }
 991        }
 992
 993        // Validate the message body.
 994        let body = request.payload.body.trim().to_string();
 995        if body.len() > MAX_MESSAGE_LEN {
 996            self.peer
 997                .respond_with_error(
 998                    receipt,
 999                    proto::Error {
1000                        message: "message is too long".to_string(),
1001                    },
1002                )
1003                .await?;
1004            return Ok(());
1005        }
1006        if body.is_empty() {
1007            self.peer
1008                .respond_with_error(
1009                    receipt,
1010                    proto::Error {
1011                        message: "message can't be blank".to_string(),
1012                    },
1013                )
1014                .await?;
1015            return Ok(());
1016        }
1017
1018        let timestamp = OffsetDateTime::now_utc();
1019        let nonce = if let Some(nonce) = request.payload.nonce {
1020            nonce
1021        } else {
1022            self.peer
1023                .respond_with_error(
1024                    receipt,
1025                    proto::Error {
1026                        message: "nonce can't be blank".to_string(),
1027                    },
1028                )
1029                .await?;
1030            return Ok(());
1031        };
1032
1033        let message_id = self
1034            .app_state
1035            .db
1036            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1037            .await?
1038            .to_proto();
1039        let message = proto::ChannelMessage {
1040            sender_id: user_id.to_proto(),
1041            id: message_id,
1042            body,
1043            timestamp: timestamp.unix_timestamp() as u64,
1044            nonce: Some(nonce),
1045        };
1046        broadcast(request.sender_id, connection_ids, |conn_id| {
1047            self.peer.send(
1048                conn_id,
1049                proto::ChannelMessageSent {
1050                    channel_id: channel_id.to_proto(),
1051                    message: Some(message.clone()),
1052                },
1053            )
1054        })
1055        .await?;
1056        self.peer
1057            .respond(
1058                receipt,
1059                proto::SendChannelMessageResponse {
1060                    message: Some(message),
1061                },
1062            )
1063            .await?;
1064        Ok(())
1065    }
1066
1067    async fn get_channel_messages(
1068        self: Arc<Self>,
1069        request: TypedEnvelope<proto::GetChannelMessages>,
1070    ) -> tide::Result<()> {
1071        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1072        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1073        if !self
1074            .app_state
1075            .db
1076            .can_user_access_channel(user_id, channel_id)
1077            .await?
1078        {
1079            Err(anyhow!("access denied"))?;
1080        }
1081
1082        let messages = self
1083            .app_state
1084            .db
1085            .get_channel_messages(
1086                channel_id,
1087                MESSAGE_COUNT_PER_PAGE,
1088                Some(MessageId::from_proto(request.payload.before_message_id)),
1089            )
1090            .await?
1091            .into_iter()
1092            .map(|msg| proto::ChannelMessage {
1093                id: msg.id.to_proto(),
1094                body: msg.body,
1095                timestamp: msg.sent_at.unix_timestamp() as u64,
1096                sender_id: msg.sender_id.to_proto(),
1097                nonce: Some(msg.nonce.as_u128().into()),
1098            })
1099            .collect::<Vec<_>>();
1100        self.peer
1101            .respond(
1102                request.receipt(),
1103                proto::GetChannelMessagesResponse {
1104                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1105                    messages,
1106                },
1107            )
1108            .await?;
1109        Ok(())
1110    }
1111
1112    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1113        self.store.read()
1114    }
1115
1116    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1117        self.store.write()
1118    }
1119}
1120
1121pub async fn broadcast<F, T>(
1122    sender_id: ConnectionId,
1123    receiver_ids: Vec<ConnectionId>,
1124    mut f: F,
1125) -> anyhow::Result<()>
1126where
1127    F: FnMut(ConnectionId) -> T,
1128    T: Future<Output = anyhow::Result<()>>,
1129{
1130    let futures = receiver_ids
1131        .into_iter()
1132        .filter(|id| *id != sender_id)
1133        .map(|id| f(id));
1134    futures::future::try_join_all(futures).await?;
1135    Ok(())
1136}
1137
1138pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1139    let server = Server::new(app.state().clone(), rpc.clone(), None);
1140    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1141        let server = server.clone();
1142        async move {
1143            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1144
1145            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1146            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1147            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1148            let client_protocol_version: Option<u32> = request
1149                .header("X-Zed-Protocol-Version")
1150                .and_then(|v| v.as_str().parse().ok());
1151
1152            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1153                return Ok(Response::new(StatusCode::UpgradeRequired));
1154            }
1155
1156            let header = match request.header("Sec-Websocket-Key") {
1157                Some(h) => h.as_str(),
1158                None => return Err(anyhow!("expected sec-websocket-key"))?,
1159            };
1160
1161            let user_id = process_auth_header(&request).await?;
1162
1163            let mut response = Response::new(StatusCode::SwitchingProtocols);
1164            response.insert_header(UPGRADE, "websocket");
1165            response.insert_header(CONNECTION, "Upgrade");
1166            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1167            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1168            response.insert_header("Sec-Websocket-Version", "13");
1169
1170            let http_res: &mut tide::http::Response = response.as_mut();
1171            let upgrade_receiver = http_res.recv_upgrade().await;
1172            let addr = request.remote().unwrap_or("unknown").to_string();
1173            task::spawn(async move {
1174                if let Some(stream) = upgrade_receiver.await {
1175                    server
1176                        .handle_connection(
1177                            Connection::new(
1178                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1179                            ),
1180                            addr,
1181                            user_id,
1182                            None,
1183                        )
1184                        .await;
1185                }
1186            });
1187
1188            Ok(response)
1189        }
1190    });
1191}
1192
1193fn header_contains_ignore_case<T>(
1194    request: &tide::Request<T>,
1195    header_name: HeaderName,
1196    value: &str,
1197) -> bool {
1198    request
1199        .header(header_name)
1200        .map(|h| {
1201            h.as_str()
1202                .split(',')
1203                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1204        })
1205        .unwrap_or(false)
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210    use super::*;
1211    use crate::{
1212        auth,
1213        db::{tests::TestDb, UserId},
1214        github, AppState, Config,
1215    };
1216    use ::rpc::Peer;
1217    use async_std::task;
1218    use gpui::{executor, ModelHandle, TestAppContext};
1219    use parking_lot::Mutex;
1220    use postage::{mpsc, watch};
1221    use rand::prelude::*;
1222    use rpc::PeerId;
1223    use serde_json::json;
1224    use sqlx::types::time::OffsetDateTime;
1225    use std::{
1226        ops::Deref,
1227        path::Path,
1228        rc::Rc,
1229        sync::{
1230            atomic::{AtomicBool, Ordering::SeqCst},
1231            Arc,
1232        },
1233        time::Duration,
1234    };
1235    use zed::{
1236        client::{
1237            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1238            EstablishConnectionError, UserStore,
1239        },
1240        editor::{ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer},
1241        fs::{FakeFs, Fs as _},
1242        language::{
1243            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1244            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1245        },
1246        lsp,
1247        project::{DiagnosticSummary, Project, ProjectPath},
1248    };
1249
1250    #[gpui::test]
1251    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1252        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1253        let lang_registry = Arc::new(LanguageRegistry::new());
1254        let fs = Arc::new(FakeFs::new(cx_a.background()));
1255        cx_a.foreground().forbid_parking();
1256
1257        // Connect to a server as 2 clients.
1258        let mut server = TestServer::start(cx_a.foreground()).await;
1259        let client_a = server.create_client(&mut cx_a, "user_a").await;
1260        let client_b = server.create_client(&mut cx_b, "user_b").await;
1261
1262        // Share a project as client A
1263        fs.insert_tree(
1264            "/a",
1265            json!({
1266                ".zed.toml": r#"collaborators = ["user_b"]"#,
1267                "a.txt": "a-contents",
1268                "b.txt": "b-contents",
1269            }),
1270        )
1271        .await;
1272        let project_a = cx_a.update(|cx| {
1273            Project::local(
1274                client_a.clone(),
1275                client_a.user_store.clone(),
1276                lang_registry.clone(),
1277                fs.clone(),
1278                cx,
1279            )
1280        });
1281        let (worktree_a, _) = project_a
1282            .update(&mut cx_a, |p, cx| {
1283                p.find_or_create_local_worktree("/a", false, cx)
1284            })
1285            .await
1286            .unwrap();
1287        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1288        worktree_a
1289            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1290            .await;
1291        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1292        project_a
1293            .update(&mut cx_a, |p, cx| p.share(cx))
1294            .await
1295            .unwrap();
1296
1297        // Join that project as client B
1298        let project_b = Project::remote(
1299            project_id,
1300            client_b.clone(),
1301            client_b.user_store.clone(),
1302            lang_registry.clone(),
1303            fs.clone(),
1304            &mut cx_b.to_async(),
1305        )
1306        .await
1307        .unwrap();
1308
1309        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1310            assert_eq!(
1311                project
1312                    .collaborators()
1313                    .get(&client_a.peer_id)
1314                    .unwrap()
1315                    .user
1316                    .github_login,
1317                "user_a"
1318            );
1319            project.replica_id()
1320        });
1321        project_a
1322            .condition(&cx_a, |tree, _| {
1323                tree.collaborators()
1324                    .get(&client_b.peer_id)
1325                    .map_or(false, |collaborator| {
1326                        collaborator.replica_id == replica_id_b
1327                            && collaborator.user.github_login == "user_b"
1328                    })
1329            })
1330            .await;
1331
1332        // Open the same file as client B and client A.
1333        let buffer_b = project_b
1334            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1335            .await
1336            .unwrap();
1337        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1338        buffer_b.read_with(&cx_b, |buf, cx| {
1339            assert_eq!(buf.read(cx).text(), "b-contents")
1340        });
1341        project_a.read_with(&cx_a, |project, cx| {
1342            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1343        });
1344        let buffer_a = project_a
1345            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1346            .await
1347            .unwrap();
1348
1349        let editor_b = cx_b.add_view(window_b, |cx| {
1350            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1351        });
1352
1353        // TODO
1354        // // Create a selection set as client B and see that selection set as client A.
1355        // buffer_a
1356        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1357        //     .await;
1358
1359        // Edit the buffer as client B and see that edit as client A.
1360        editor_b.update(&mut cx_b, |editor, cx| {
1361            editor.handle_input(&Input("ok, ".into()), cx)
1362        });
1363        buffer_a
1364            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1365            .await;
1366
1367        // TODO
1368        // // Remove the selection set as client B, see those selections disappear as client A.
1369        cx_b.update(move |_| drop(editor_b));
1370        // buffer_a
1371        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1372        //     .await;
1373
1374        // Close the buffer as client A, see that the buffer is closed.
1375        cx_a.update(move |_| drop(buffer_a));
1376        project_a
1377            .condition(&cx_a, |project, cx| {
1378                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1379            })
1380            .await;
1381
1382        // Dropping the client B's project removes client B from client A's collaborators.
1383        cx_b.update(move |_| drop(project_b));
1384        project_a
1385            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1386            .await;
1387    }
1388
1389    #[gpui::test]
1390    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1391        let lang_registry = Arc::new(LanguageRegistry::new());
1392        let fs = Arc::new(FakeFs::new(cx_a.background()));
1393        cx_a.foreground().forbid_parking();
1394
1395        // Connect to a server as 2 clients.
1396        let mut server = TestServer::start(cx_a.foreground()).await;
1397        let client_a = server.create_client(&mut cx_a, "user_a").await;
1398        let client_b = server.create_client(&mut cx_b, "user_b").await;
1399
1400        // Share a project as client A
1401        fs.insert_tree(
1402            "/a",
1403            json!({
1404                ".zed.toml": r#"collaborators = ["user_b"]"#,
1405                "a.txt": "a-contents",
1406                "b.txt": "b-contents",
1407            }),
1408        )
1409        .await;
1410        let project_a = cx_a.update(|cx| {
1411            Project::local(
1412                client_a.clone(),
1413                client_a.user_store.clone(),
1414                lang_registry.clone(),
1415                fs.clone(),
1416                cx,
1417            )
1418        });
1419        let (worktree_a, _) = project_a
1420            .update(&mut cx_a, |p, cx| {
1421                p.find_or_create_local_worktree("/a", false, cx)
1422            })
1423            .await
1424            .unwrap();
1425        worktree_a
1426            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1427            .await;
1428        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1429        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1430        project_a
1431            .update(&mut cx_a, |p, cx| p.share(cx))
1432            .await
1433            .unwrap();
1434        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1435
1436        // Join that project as client B
1437        let project_b = Project::remote(
1438            project_id,
1439            client_b.clone(),
1440            client_b.user_store.clone(),
1441            lang_registry.clone(),
1442            fs.clone(),
1443            &mut cx_b.to_async(),
1444        )
1445        .await
1446        .unwrap();
1447        project_b
1448            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1449            .await
1450            .unwrap();
1451
1452        // Unshare the project as client A
1453        project_a
1454            .update(&mut cx_a, |project, cx| project.unshare(cx))
1455            .await
1456            .unwrap();
1457        project_b
1458            .condition(&mut cx_b, |project, _| project.is_read_only())
1459            .await;
1460        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1461        drop(project_b);
1462
1463        // Share the project again and ensure guests can still join.
1464        project_a
1465            .update(&mut cx_a, |project, cx| project.share(cx))
1466            .await
1467            .unwrap();
1468        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1469
1470        let project_c = Project::remote(
1471            project_id,
1472            client_b.clone(),
1473            client_b.user_store.clone(),
1474            lang_registry.clone(),
1475            fs.clone(),
1476            &mut cx_b.to_async(),
1477        )
1478        .await
1479        .unwrap();
1480        project_c
1481            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1482            .await
1483            .unwrap();
1484    }
1485
1486    #[gpui::test]
1487    async fn test_propagate_saves_and_fs_changes(
1488        mut cx_a: TestAppContext,
1489        mut cx_b: TestAppContext,
1490        mut cx_c: TestAppContext,
1491    ) {
1492        let lang_registry = Arc::new(LanguageRegistry::new());
1493        let fs = Arc::new(FakeFs::new(cx_a.background()));
1494        cx_a.foreground().forbid_parking();
1495
1496        // Connect to a server as 3 clients.
1497        let mut server = TestServer::start(cx_a.foreground()).await;
1498        let client_a = server.create_client(&mut cx_a, "user_a").await;
1499        let client_b = server.create_client(&mut cx_b, "user_b").await;
1500        let client_c = server.create_client(&mut cx_c, "user_c").await;
1501
1502        // Share a worktree as client A.
1503        fs.insert_tree(
1504            "/a",
1505            json!({
1506                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1507                "file1": "",
1508                "file2": ""
1509            }),
1510        )
1511        .await;
1512        let project_a = cx_a.update(|cx| {
1513            Project::local(
1514                client_a.clone(),
1515                client_a.user_store.clone(),
1516                lang_registry.clone(),
1517                fs.clone(),
1518                cx,
1519            )
1520        });
1521        let (worktree_a, _) = project_a
1522            .update(&mut cx_a, |p, cx| {
1523                p.find_or_create_local_worktree("/a", false, cx)
1524            })
1525            .await
1526            .unwrap();
1527        worktree_a
1528            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1529            .await;
1530        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1531        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1532        project_a
1533            .update(&mut cx_a, |p, cx| p.share(cx))
1534            .await
1535            .unwrap();
1536
1537        // Join that worktree as clients B and C.
1538        let project_b = Project::remote(
1539            project_id,
1540            client_b.clone(),
1541            client_b.user_store.clone(),
1542            lang_registry.clone(),
1543            fs.clone(),
1544            &mut cx_b.to_async(),
1545        )
1546        .await
1547        .unwrap();
1548        let project_c = Project::remote(
1549            project_id,
1550            client_c.clone(),
1551            client_c.user_store.clone(),
1552            lang_registry.clone(),
1553            fs.clone(),
1554            &mut cx_c.to_async(),
1555        )
1556        .await
1557        .unwrap();
1558        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1559        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1560
1561        // Open and edit a buffer as both guests B and C.
1562        let buffer_b = project_b
1563            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1564            .await
1565            .unwrap();
1566        let buffer_c = project_c
1567            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1568            .await
1569            .unwrap();
1570        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1571        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1572
1573        // Open and edit that buffer as the host.
1574        let buffer_a = project_a
1575            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1576            .await
1577            .unwrap();
1578
1579        buffer_a
1580            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1581            .await;
1582        buffer_a.update(&mut cx_a, |buf, cx| {
1583            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1584        });
1585
1586        // Wait for edits to propagate
1587        buffer_a
1588            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1589            .await;
1590        buffer_b
1591            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1592            .await;
1593        buffer_c
1594            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1595            .await;
1596
1597        // Edit the buffer as the host and concurrently save as guest B.
1598        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1599        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1600        save_b.await.unwrap();
1601        assert_eq!(
1602            fs.load("/a/file1".as_ref()).await.unwrap(),
1603            "hi-a, i-am-c, i-am-b, i-am-a"
1604        );
1605        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1606        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1607        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1608
1609        // Make changes on host's file system, see those changes on guest worktrees.
1610        fs.rename("/a/file1".as_ref(), "/a/file1-renamed".as_ref())
1611            .await
1612            .unwrap();
1613        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1614            .await
1615            .unwrap();
1616        fs.insert_file(Path::new("/a/file4"), "4".into())
1617            .await
1618            .unwrap();
1619
1620        worktree_a
1621            .condition(&cx_a, |tree, _| tree.file_count() == 4)
1622            .await;
1623        worktree_b
1624            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1625            .await;
1626        worktree_c
1627            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1628            .await;
1629        worktree_a.read_with(&cx_a, |tree, _| {
1630            assert_eq!(
1631                tree.paths()
1632                    .map(|p| p.to_string_lossy())
1633                    .collect::<Vec<_>>(),
1634                &[".zed.toml", "file1-renamed", "file3", "file4"]
1635            )
1636        });
1637        worktree_b.read_with(&cx_b, |tree, _| {
1638            assert_eq!(
1639                tree.paths()
1640                    .map(|p| p.to_string_lossy())
1641                    .collect::<Vec<_>>(),
1642                &[".zed.toml", "file1-renamed", "file3", "file4"]
1643            )
1644        });
1645        worktree_c.read_with(&cx_c, |tree, _| {
1646            assert_eq!(
1647                tree.paths()
1648                    .map(|p| p.to_string_lossy())
1649                    .collect::<Vec<_>>(),
1650                &[".zed.toml", "file1-renamed", "file3", "file4"]
1651            )
1652        });
1653
1654        // Ensure buffer files are updated as well.
1655        buffer_a
1656            .condition(&cx_a, |buf, _| {
1657                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1658            })
1659            .await;
1660        buffer_b
1661            .condition(&cx_b, |buf, _| {
1662                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1663            })
1664            .await;
1665        buffer_c
1666            .condition(&cx_c, |buf, _| {
1667                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1668            })
1669            .await;
1670    }
1671
1672    #[gpui::test]
1673    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1674        cx_a.foreground().forbid_parking();
1675        let lang_registry = Arc::new(LanguageRegistry::new());
1676        let fs = Arc::new(FakeFs::new(cx_a.background()));
1677
1678        // Connect to a server as 2 clients.
1679        let mut server = TestServer::start(cx_a.foreground()).await;
1680        let client_a = server.create_client(&mut cx_a, "user_a").await;
1681        let client_b = server.create_client(&mut cx_b, "user_b").await;
1682
1683        // Share a project as client A
1684        fs.insert_tree(
1685            "/dir",
1686            json!({
1687                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1688                "a.txt": "a-contents",
1689            }),
1690        )
1691        .await;
1692
1693        let project_a = cx_a.update(|cx| {
1694            Project::local(
1695                client_a.clone(),
1696                client_a.user_store.clone(),
1697                lang_registry.clone(),
1698                fs.clone(),
1699                cx,
1700            )
1701        });
1702        let (worktree_a, _) = project_a
1703            .update(&mut cx_a, |p, cx| {
1704                p.find_or_create_local_worktree("/dir", false, cx)
1705            })
1706            .await
1707            .unwrap();
1708        worktree_a
1709            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1710            .await;
1711        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1712        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1713        project_a
1714            .update(&mut cx_a, |p, cx| p.share(cx))
1715            .await
1716            .unwrap();
1717
1718        // Join that project as client B
1719        let project_b = Project::remote(
1720            project_id,
1721            client_b.clone(),
1722            client_b.user_store.clone(),
1723            lang_registry.clone(),
1724            fs.clone(),
1725            &mut cx_b.to_async(),
1726        )
1727        .await
1728        .unwrap();
1729        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1730
1731        // Open a buffer as client B
1732        let buffer_b = project_b
1733            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1734            .await
1735            .unwrap();
1736        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1737
1738        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1739        buffer_b.read_with(&cx_b, |buf, _| {
1740            assert!(buf.is_dirty());
1741            assert!(!buf.has_conflict());
1742        });
1743
1744        buffer_b
1745            .update(&mut cx_b, |buf, cx| buf.save(cx))
1746            .await
1747            .unwrap();
1748        worktree_b
1749            .condition(&cx_b, |_, cx| {
1750                buffer_b.read(cx).file().unwrap().mtime() != mtime
1751            })
1752            .await;
1753        buffer_b.read_with(&cx_b, |buf, _| {
1754            assert!(!buf.is_dirty());
1755            assert!(!buf.has_conflict());
1756        });
1757
1758        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1759        buffer_b.read_with(&cx_b, |buf, _| {
1760            assert!(buf.is_dirty());
1761            assert!(!buf.has_conflict());
1762        });
1763    }
1764
1765    #[gpui::test]
1766    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1767        cx_a.foreground().forbid_parking();
1768        let lang_registry = Arc::new(LanguageRegistry::new());
1769        let fs = Arc::new(FakeFs::new(cx_a.background()));
1770
1771        // Connect to a server as 2 clients.
1772        let mut server = TestServer::start(cx_a.foreground()).await;
1773        let client_a = server.create_client(&mut cx_a, "user_a").await;
1774        let client_b = server.create_client(&mut cx_b, "user_b").await;
1775
1776        // Share a project as client A
1777        fs.insert_tree(
1778            "/dir",
1779            json!({
1780                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1781                "a.txt": "a-contents",
1782            }),
1783        )
1784        .await;
1785
1786        let project_a = cx_a.update(|cx| {
1787            Project::local(
1788                client_a.clone(),
1789                client_a.user_store.clone(),
1790                lang_registry.clone(),
1791                fs.clone(),
1792                cx,
1793            )
1794        });
1795        let (worktree_a, _) = project_a
1796            .update(&mut cx_a, |p, cx| {
1797                p.find_or_create_local_worktree("/dir", false, cx)
1798            })
1799            .await
1800            .unwrap();
1801        worktree_a
1802            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1803            .await;
1804        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1805        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1806        project_a
1807            .update(&mut cx_a, |p, cx| p.share(cx))
1808            .await
1809            .unwrap();
1810
1811        // Join that project as client B
1812        let project_b = Project::remote(
1813            project_id,
1814            client_b.clone(),
1815            client_b.user_store.clone(),
1816            lang_registry.clone(),
1817            fs.clone(),
1818            &mut cx_b.to_async(),
1819        )
1820        .await
1821        .unwrap();
1822        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1823
1824        // Open a buffer as client B
1825        let buffer_b = project_b
1826            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1827            .await
1828            .unwrap();
1829        buffer_b.read_with(&cx_b, |buf, _| {
1830            assert!(!buf.is_dirty());
1831            assert!(!buf.has_conflict());
1832        });
1833
1834        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1835            .await
1836            .unwrap();
1837        buffer_b
1838            .condition(&cx_b, |buf, _| {
1839                buf.text() == "new contents" && !buf.is_dirty()
1840            })
1841            .await;
1842        buffer_b.read_with(&cx_b, |buf, _| {
1843            assert!(!buf.has_conflict());
1844        });
1845    }
1846
1847    #[gpui::test(iterations = 100)]
1848    async fn test_editing_while_guest_opens_buffer(
1849        mut cx_a: TestAppContext,
1850        mut cx_b: TestAppContext,
1851    ) {
1852        cx_a.foreground().forbid_parking();
1853        let lang_registry = Arc::new(LanguageRegistry::new());
1854        let fs = Arc::new(FakeFs::new(cx_a.background()));
1855
1856        // Connect to a server as 2 clients.
1857        let mut server = TestServer::start(cx_a.foreground()).await;
1858        let client_a = server.create_client(&mut cx_a, "user_a").await;
1859        let client_b = server.create_client(&mut cx_b, "user_b").await;
1860
1861        // Share a project as client A
1862        fs.insert_tree(
1863            "/dir",
1864            json!({
1865                ".zed.toml": r#"collaborators = ["user_b"]"#,
1866                "a.txt": "a-contents",
1867            }),
1868        )
1869        .await;
1870        let project_a = cx_a.update(|cx| {
1871            Project::local(
1872                client_a.clone(),
1873                client_a.user_store.clone(),
1874                lang_registry.clone(),
1875                fs.clone(),
1876                cx,
1877            )
1878        });
1879        let (worktree_a, _) = project_a
1880            .update(&mut cx_a, |p, cx| {
1881                p.find_or_create_local_worktree("/dir", false, cx)
1882            })
1883            .await
1884            .unwrap();
1885        worktree_a
1886            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1887            .await;
1888        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1889        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1890        project_a
1891            .update(&mut cx_a, |p, cx| p.share(cx))
1892            .await
1893            .unwrap();
1894
1895        // Join that project as client B
1896        let project_b = Project::remote(
1897            project_id,
1898            client_b.clone(),
1899            client_b.user_store.clone(),
1900            lang_registry.clone(),
1901            fs.clone(),
1902            &mut cx_b.to_async(),
1903        )
1904        .await
1905        .unwrap();
1906
1907        // Open a buffer as client A
1908        let buffer_a = project_a
1909            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1910            .await
1911            .unwrap();
1912
1913        // Start opening the same buffer as client B
1914        let buffer_b = cx_b
1915            .background()
1916            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1917        task::yield_now().await;
1918
1919        // Edit the buffer as client A while client B is still opening it.
1920        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1921
1922        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1923        let buffer_b = buffer_b.await.unwrap();
1924        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1925    }
1926
1927    #[gpui::test]
1928    async fn test_leaving_worktree_while_opening_buffer(
1929        mut cx_a: TestAppContext,
1930        mut cx_b: TestAppContext,
1931    ) {
1932        cx_a.foreground().forbid_parking();
1933        let lang_registry = Arc::new(LanguageRegistry::new());
1934        let fs = Arc::new(FakeFs::new(cx_a.background()));
1935
1936        // Connect to a server as 2 clients.
1937        let mut server = TestServer::start(cx_a.foreground()).await;
1938        let client_a = server.create_client(&mut cx_a, "user_a").await;
1939        let client_b = server.create_client(&mut cx_b, "user_b").await;
1940
1941        // Share a project as client A
1942        fs.insert_tree(
1943            "/dir",
1944            json!({
1945                ".zed.toml": r#"collaborators = ["user_b"]"#,
1946                "a.txt": "a-contents",
1947            }),
1948        )
1949        .await;
1950        let project_a = cx_a.update(|cx| {
1951            Project::local(
1952                client_a.clone(),
1953                client_a.user_store.clone(),
1954                lang_registry.clone(),
1955                fs.clone(),
1956                cx,
1957            )
1958        });
1959        let (worktree_a, _) = project_a
1960            .update(&mut cx_a, |p, cx| {
1961                p.find_or_create_local_worktree("/dir", false, cx)
1962            })
1963            .await
1964            .unwrap();
1965        worktree_a
1966            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1967            .await;
1968        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1969        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1970        project_a
1971            .update(&mut cx_a, |p, cx| p.share(cx))
1972            .await
1973            .unwrap();
1974
1975        // Join that project as client B
1976        let project_b = Project::remote(
1977            project_id,
1978            client_b.clone(),
1979            client_b.user_store.clone(),
1980            lang_registry.clone(),
1981            fs.clone(),
1982            &mut cx_b.to_async(),
1983        )
1984        .await
1985        .unwrap();
1986
1987        // See that a guest has joined as client A.
1988        project_a
1989            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1990            .await;
1991
1992        // Begin opening a buffer as client B, but leave the project before the open completes.
1993        let buffer_b = cx_b
1994            .background()
1995            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1996        cx_b.update(|_| drop(project_b));
1997        drop(buffer_b);
1998
1999        // See that the guest has left.
2000        project_a
2001            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2002            .await;
2003    }
2004
2005    #[gpui::test]
2006    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2007        cx_a.foreground().forbid_parking();
2008        let lang_registry = Arc::new(LanguageRegistry::new());
2009        let fs = Arc::new(FakeFs::new(cx_a.background()));
2010
2011        // Connect to a server as 2 clients.
2012        let mut server = TestServer::start(cx_a.foreground()).await;
2013        let client_a = server.create_client(&mut cx_a, "user_a").await;
2014        let client_b = server.create_client(&mut cx_b, "user_b").await;
2015
2016        // Share a project as client A
2017        fs.insert_tree(
2018            "/a",
2019            json!({
2020                ".zed.toml": r#"collaborators = ["user_b"]"#,
2021                "a.txt": "a-contents",
2022                "b.txt": "b-contents",
2023            }),
2024        )
2025        .await;
2026        let project_a = cx_a.update(|cx| {
2027            Project::local(
2028                client_a.clone(),
2029                client_a.user_store.clone(),
2030                lang_registry.clone(),
2031                fs.clone(),
2032                cx,
2033            )
2034        });
2035        let (worktree_a, _) = project_a
2036            .update(&mut cx_a, |p, cx| {
2037                p.find_or_create_local_worktree("/a", false, cx)
2038            })
2039            .await
2040            .unwrap();
2041        worktree_a
2042            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2043            .await;
2044        let project_id = project_a
2045            .update(&mut cx_a, |project, _| project.next_remote_id())
2046            .await;
2047        project_a
2048            .update(&mut cx_a, |project, cx| project.share(cx))
2049            .await
2050            .unwrap();
2051
2052        // Join that project as client B
2053        let _project_b = Project::remote(
2054            project_id,
2055            client_b.clone(),
2056            client_b.user_store.clone(),
2057            lang_registry.clone(),
2058            fs.clone(),
2059            &mut cx_b.to_async(),
2060        )
2061        .await
2062        .unwrap();
2063
2064        // See that a guest has joined as client A.
2065        project_a
2066            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2067            .await;
2068
2069        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2070        client_b.disconnect(&cx_b.to_async()).unwrap();
2071        project_a
2072            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2073            .await;
2074    }
2075
2076    #[gpui::test]
2077    async fn test_collaborating_with_diagnostics(
2078        mut cx_a: TestAppContext,
2079        mut cx_b: TestAppContext,
2080    ) {
2081        cx_a.foreground().forbid_parking();
2082        let mut lang_registry = Arc::new(LanguageRegistry::new());
2083        let fs = Arc::new(FakeFs::new(cx_a.background()));
2084
2085        // Set up a fake language server.
2086        let (language_server_config, mut fake_language_server) =
2087            LanguageServerConfig::fake(cx_a.background()).await;
2088        Arc::get_mut(&mut lang_registry)
2089            .unwrap()
2090            .add(Arc::new(Language::new(
2091                LanguageConfig {
2092                    name: "Rust".to_string(),
2093                    path_suffixes: vec!["rs".to_string()],
2094                    language_server: Some(language_server_config),
2095                    ..Default::default()
2096                },
2097                Some(tree_sitter_rust::language()),
2098            )));
2099
2100        // Connect to a server as 2 clients.
2101        let mut server = TestServer::start(cx_a.foreground()).await;
2102        let client_a = server.create_client(&mut cx_a, "user_a").await;
2103        let client_b = server.create_client(&mut cx_b, "user_b").await;
2104
2105        // Share a project as client A
2106        fs.insert_tree(
2107            "/a",
2108            json!({
2109                ".zed.toml": r#"collaborators = ["user_b"]"#,
2110                "a.rs": "let one = two",
2111                "other.rs": "",
2112            }),
2113        )
2114        .await;
2115        let project_a = cx_a.update(|cx| {
2116            Project::local(
2117                client_a.clone(),
2118                client_a.user_store.clone(),
2119                lang_registry.clone(),
2120                fs.clone(),
2121                cx,
2122            )
2123        });
2124        let (worktree_a, _) = project_a
2125            .update(&mut cx_a, |p, cx| {
2126                p.find_or_create_local_worktree("/a", false, cx)
2127            })
2128            .await
2129            .unwrap();
2130        worktree_a
2131            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2132            .await;
2133        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2134        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2135        project_a
2136            .update(&mut cx_a, |p, cx| p.share(cx))
2137            .await
2138            .unwrap();
2139
2140        // Cause the language server to start.
2141        let _ = cx_a
2142            .background()
2143            .spawn(project_a.update(&mut cx_a, |project, cx| {
2144                project.open_buffer(
2145                    ProjectPath {
2146                        worktree_id,
2147                        path: Path::new("other.rs").into(),
2148                    },
2149                    cx,
2150                )
2151            }))
2152            .await
2153            .unwrap();
2154
2155        // Simulate a language server reporting errors for a file.
2156        fake_language_server
2157            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2158                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2159                version: None,
2160                diagnostics: vec![lsp::Diagnostic {
2161                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2162                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2163                    message: "message 1".to_string(),
2164                    ..Default::default()
2165                }],
2166            })
2167            .await;
2168
2169        // Wait for server to see the diagnostics update.
2170        server
2171            .condition(|store| {
2172                let worktree = store
2173                    .project(project_id)
2174                    .unwrap()
2175                    .worktrees
2176                    .get(&worktree_id.to_proto())
2177                    .unwrap();
2178
2179                !worktree
2180                    .share
2181                    .as_ref()
2182                    .unwrap()
2183                    .diagnostic_summaries
2184                    .is_empty()
2185            })
2186            .await;
2187
2188        // Join the worktree as client B.
2189        let project_b = Project::remote(
2190            project_id,
2191            client_b.clone(),
2192            client_b.user_store.clone(),
2193            lang_registry.clone(),
2194            fs.clone(),
2195            &mut cx_b.to_async(),
2196        )
2197        .await
2198        .unwrap();
2199
2200        project_b.read_with(&cx_b, |project, cx| {
2201            assert_eq!(
2202                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2203                &[(
2204                    ProjectPath {
2205                        worktree_id,
2206                        path: Arc::from(Path::new("a.rs")),
2207                    },
2208                    DiagnosticSummary {
2209                        error_count: 1,
2210                        warning_count: 0,
2211                        ..Default::default()
2212                    },
2213                )]
2214            )
2215        });
2216
2217        // Simulate a language server reporting more errors for a file.
2218        fake_language_server
2219            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2220                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2221                version: None,
2222                diagnostics: vec![
2223                    lsp::Diagnostic {
2224                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2225                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2226                        message: "message 1".to_string(),
2227                        ..Default::default()
2228                    },
2229                    lsp::Diagnostic {
2230                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2231                        range: lsp::Range::new(
2232                            lsp::Position::new(0, 10),
2233                            lsp::Position::new(0, 13),
2234                        ),
2235                        message: "message 2".to_string(),
2236                        ..Default::default()
2237                    },
2238                ],
2239            })
2240            .await;
2241
2242        // Client b gets the updated summaries
2243        project_b
2244            .condition(&cx_b, |project, cx| {
2245                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2246                    == &[(
2247                        ProjectPath {
2248                            worktree_id,
2249                            path: Arc::from(Path::new("a.rs")),
2250                        },
2251                        DiagnosticSummary {
2252                            error_count: 1,
2253                            warning_count: 1,
2254                            ..Default::default()
2255                        },
2256                    )]
2257            })
2258            .await;
2259
2260        // Open the file with the errors on client B. They should be present.
2261        let buffer_b = cx_b
2262            .background()
2263            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2264            .await
2265            .unwrap();
2266
2267        buffer_b.read_with(&cx_b, |buffer, _| {
2268            assert_eq!(
2269                buffer
2270                    .snapshot()
2271                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2272                    .map(|entry| entry)
2273                    .collect::<Vec<_>>(),
2274                &[
2275                    DiagnosticEntry {
2276                        range: Point::new(0, 4)..Point::new(0, 7),
2277                        diagnostic: Diagnostic {
2278                            group_id: 0,
2279                            message: "message 1".to_string(),
2280                            severity: lsp::DiagnosticSeverity::ERROR,
2281                            is_primary: true,
2282                            ..Default::default()
2283                        }
2284                    },
2285                    DiagnosticEntry {
2286                        range: Point::new(0, 10)..Point::new(0, 13),
2287                        diagnostic: Diagnostic {
2288                            group_id: 1,
2289                            severity: lsp::DiagnosticSeverity::WARNING,
2290                            message: "message 2".to_string(),
2291                            is_primary: true,
2292                            ..Default::default()
2293                        }
2294                    }
2295                ]
2296            );
2297        });
2298    }
2299
2300    #[gpui::test]
2301    async fn test_collaborating_with_completion(
2302        mut cx_a: TestAppContext,
2303        mut cx_b: TestAppContext,
2304    ) {
2305        cx_a.foreground().forbid_parking();
2306        let mut lang_registry = Arc::new(LanguageRegistry::new());
2307        let fs = Arc::new(FakeFs::new(cx_a.background()));
2308
2309        // Set up a fake language server.
2310        let (language_server_config, mut fake_language_server) =
2311            LanguageServerConfig::fake_with_capabilities(
2312                lsp::ServerCapabilities {
2313                    completion_provider: Some(lsp::CompletionOptions {
2314                        trigger_characters: Some(vec![".".to_string()]),
2315                        ..Default::default()
2316                    }),
2317                    ..Default::default()
2318                },
2319                cx_a.background(),
2320            )
2321            .await;
2322        Arc::get_mut(&mut lang_registry)
2323            .unwrap()
2324            .add(Arc::new(Language::new(
2325                LanguageConfig {
2326                    name: "Rust".to_string(),
2327                    path_suffixes: vec!["rs".to_string()],
2328                    language_server: Some(language_server_config),
2329                    ..Default::default()
2330                },
2331                Some(tree_sitter_rust::language()),
2332            )));
2333
2334        // Connect to a server as 2 clients.
2335        let mut server = TestServer::start(cx_a.foreground()).await;
2336        let client_a = server.create_client(&mut cx_a, "user_a").await;
2337        let client_b = server.create_client(&mut cx_b, "user_b").await;
2338
2339        // Share a project as client A
2340        fs.insert_tree(
2341            "/a",
2342            json!({
2343                ".zed.toml": r#"collaborators = ["user_b"]"#,
2344                "main.rs": "fn main() { a }",
2345                "other.rs": "",
2346            }),
2347        )
2348        .await;
2349        let project_a = cx_a.update(|cx| {
2350            Project::local(
2351                client_a.clone(),
2352                client_a.user_store.clone(),
2353                lang_registry.clone(),
2354                fs.clone(),
2355                cx,
2356            )
2357        });
2358        let (worktree_a, _) = project_a
2359            .update(&mut cx_a, |p, cx| {
2360                p.find_or_create_local_worktree("/a", false, cx)
2361            })
2362            .await
2363            .unwrap();
2364        worktree_a
2365            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2366            .await;
2367        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2368        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2369        project_a
2370            .update(&mut cx_a, |p, cx| p.share(cx))
2371            .await
2372            .unwrap();
2373
2374        // Join the worktree as client B.
2375        let project_b = Project::remote(
2376            project_id,
2377            client_b.clone(),
2378            client_b.user_store.clone(),
2379            lang_registry.clone(),
2380            fs.clone(),
2381            &mut cx_b.to_async(),
2382        )
2383        .await
2384        .unwrap();
2385
2386        // Open a file in an editor as the guest.
2387        let buffer_b = project_b
2388            .update(&mut cx_b, |p, cx| {
2389                p.open_buffer((worktree_id, "main.rs"), cx)
2390            })
2391            .await
2392            .unwrap();
2393        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2394        let editor_b = cx_b.add_view(window_b, |cx| {
2395            Editor::for_buffer(
2396                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2397                Arc::new(|cx| EditorSettings::test(cx)),
2398                cx,
2399            )
2400        });
2401
2402        // Type a completion trigger character as the guest.
2403        editor_b.update(&mut cx_b, |editor, cx| {
2404            editor.select_ranges([13..13], None, cx);
2405            editor.handle_input(&Input(".".into()), cx);
2406            cx.focus(&editor_b);
2407        });
2408
2409        // Receive a completion request as the host's language server.
2410        let (request_id, params) = fake_language_server
2411            .receive_request::<lsp::request::Completion>()
2412            .await;
2413        assert_eq!(
2414            params.text_document_position.text_document.uri,
2415            lsp::Url::from_file_path("/a/main.rs").unwrap(),
2416        );
2417        assert_eq!(
2418            params.text_document_position.position,
2419            lsp::Position::new(0, 14),
2420        );
2421
2422        // Return some completions from the host's language server.
2423        fake_language_server
2424            .respond(
2425                request_id,
2426                Some(lsp::CompletionResponse::Array(vec![
2427                    lsp::CompletionItem {
2428                        label: "first_method(…)".into(),
2429                        detail: Some("fn(&mut self, B) -> C".into()),
2430                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2431                            new_text: "first_method($1)".to_string(),
2432                            range: lsp::Range::new(
2433                                lsp::Position::new(0, 14),
2434                                lsp::Position::new(0, 14),
2435                            ),
2436                        })),
2437                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2438                        ..Default::default()
2439                    },
2440                    lsp::CompletionItem {
2441                        label: "second_method(…)".into(),
2442                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2443                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2444                            new_text: "second_method()".to_string(),
2445                            range: lsp::Range::new(
2446                                lsp::Position::new(0, 14),
2447                                lsp::Position::new(0, 14),
2448                            ),
2449                        })),
2450                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2451                        ..Default::default()
2452                    },
2453                ])),
2454            )
2455            .await;
2456
2457        // Open the buffer on the host.
2458        let buffer_a = project_a
2459            .update(&mut cx_a, |p, cx| {
2460                p.open_buffer((worktree_id, "main.rs"), cx)
2461            })
2462            .await
2463            .unwrap();
2464        buffer_a
2465            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2466            .await;
2467
2468        // Confirm a completion on the guest.
2469        editor_b.next_notification(&cx_b).await;
2470        editor_b.update(&mut cx_b, |editor, cx| {
2471            assert!(editor.showing_context_menu());
2472            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2473            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2474        });
2475
2476        buffer_a
2477            .condition(&cx_a, |buffer, _| {
2478                buffer.text() == "fn main() { a.first_method() }"
2479            })
2480            .await;
2481
2482        // Receive a request resolve the selected completion on the host's language server.
2483        let (request_id, params) = fake_language_server
2484            .receive_request::<lsp::request::ResolveCompletionItem>()
2485            .await;
2486        assert_eq!(params.label, "first_method(…)");
2487
2488        // Return a resolved completion from the host's language server.
2489        // The resolved completion has an additional text edit.
2490        fake_language_server
2491            .respond(
2492                request_id,
2493                lsp::CompletionItem {
2494                    label: "first_method(…)".into(),
2495                    detail: Some("fn(&mut self, B) -> C".into()),
2496                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2497                        new_text: "first_method($1)".to_string(),
2498                        range: lsp::Range::new(
2499                            lsp::Position::new(0, 14),
2500                            lsp::Position::new(0, 14),
2501                        ),
2502                    })),
2503                    additional_text_edits: Some(vec![lsp::TextEdit {
2504                        new_text: "use d::SomeTrait;\n".to_string(),
2505                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2506                    }]),
2507                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2508                    ..Default::default()
2509                },
2510            )
2511            .await;
2512
2513        // The additional edit is applied.
2514        buffer_b
2515            .condition(&cx_b, |buffer, _| {
2516                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2517            })
2518            .await;
2519        assert_eq!(
2520            buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2521            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2522        );
2523    }
2524
2525    #[gpui::test]
2526    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2527        cx_a.foreground().forbid_parking();
2528        let mut lang_registry = Arc::new(LanguageRegistry::new());
2529        let fs = Arc::new(FakeFs::new(cx_a.background()));
2530
2531        // Set up a fake language server.
2532        let (language_server_config, mut fake_language_server) =
2533            LanguageServerConfig::fake(cx_a.background()).await;
2534        Arc::get_mut(&mut lang_registry)
2535            .unwrap()
2536            .add(Arc::new(Language::new(
2537                LanguageConfig {
2538                    name: "Rust".to_string(),
2539                    path_suffixes: vec!["rs".to_string()],
2540                    language_server: Some(language_server_config),
2541                    ..Default::default()
2542                },
2543                Some(tree_sitter_rust::language()),
2544            )));
2545
2546        // Connect to a server as 2 clients.
2547        let mut server = TestServer::start(cx_a.foreground()).await;
2548        let client_a = server.create_client(&mut cx_a, "user_a").await;
2549        let client_b = server.create_client(&mut cx_b, "user_b").await;
2550
2551        // Share a project as client A
2552        fs.insert_tree(
2553            "/a",
2554            json!({
2555                ".zed.toml": r#"collaborators = ["user_b"]"#,
2556                "a.rs": "let one = two",
2557            }),
2558        )
2559        .await;
2560        let project_a = cx_a.update(|cx| {
2561            Project::local(
2562                client_a.clone(),
2563                client_a.user_store.clone(),
2564                lang_registry.clone(),
2565                fs.clone(),
2566                cx,
2567            )
2568        });
2569        let (worktree_a, _) = project_a
2570            .update(&mut cx_a, |p, cx| {
2571                p.find_or_create_local_worktree("/a", false, cx)
2572            })
2573            .await
2574            .unwrap();
2575        worktree_a
2576            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2577            .await;
2578        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2579        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2580        project_a
2581            .update(&mut cx_a, |p, cx| p.share(cx))
2582            .await
2583            .unwrap();
2584
2585        // Join the worktree as client B.
2586        let project_b = Project::remote(
2587            project_id,
2588            client_b.clone(),
2589            client_b.user_store.clone(),
2590            lang_registry.clone(),
2591            fs.clone(),
2592            &mut cx_b.to_async(),
2593        )
2594        .await
2595        .unwrap();
2596
2597        let buffer_b = cx_b
2598            .background()
2599            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2600            .await
2601            .unwrap();
2602
2603        let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2604        let (request_id, _) = fake_language_server
2605            .receive_request::<lsp::request::Formatting>()
2606            .await;
2607        fake_language_server
2608            .respond(
2609                request_id,
2610                Some(vec![
2611                    lsp::TextEdit {
2612                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2613                        new_text: "h".to_string(),
2614                    },
2615                    lsp::TextEdit {
2616                        range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2617                        new_text: "y".to_string(),
2618                    },
2619                ]),
2620            )
2621            .await;
2622        format.await.unwrap();
2623        assert_eq!(
2624            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2625            "let honey = two"
2626        );
2627    }
2628
2629    #[gpui::test]
2630    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2631        cx_a.foreground().forbid_parking();
2632        let mut lang_registry = Arc::new(LanguageRegistry::new());
2633        let fs = Arc::new(FakeFs::new(cx_a.background()));
2634        fs.insert_tree(
2635            "/root-1",
2636            json!({
2637                ".zed.toml": r#"collaborators = ["user_b"]"#,
2638                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2639            }),
2640        )
2641        .await;
2642        fs.insert_tree(
2643            "/root-2",
2644            json!({
2645                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2646            }),
2647        )
2648        .await;
2649
2650        // Set up a fake language server.
2651        let (language_server_config, mut fake_language_server) =
2652            LanguageServerConfig::fake(cx_a.background()).await;
2653        Arc::get_mut(&mut lang_registry)
2654            .unwrap()
2655            .add(Arc::new(Language::new(
2656                LanguageConfig {
2657                    name: "Rust".to_string(),
2658                    path_suffixes: vec!["rs".to_string()],
2659                    language_server: Some(language_server_config),
2660                    ..Default::default()
2661                },
2662                Some(tree_sitter_rust::language()),
2663            )));
2664
2665        // Connect to a server as 2 clients.
2666        let mut server = TestServer::start(cx_a.foreground()).await;
2667        let client_a = server.create_client(&mut cx_a, "user_a").await;
2668        let client_b = server.create_client(&mut cx_b, "user_b").await;
2669
2670        // Share a project as client A
2671        let project_a = cx_a.update(|cx| {
2672            Project::local(
2673                client_a.clone(),
2674                client_a.user_store.clone(),
2675                lang_registry.clone(),
2676                fs.clone(),
2677                cx,
2678            )
2679        });
2680        let (worktree_a, _) = project_a
2681            .update(&mut cx_a, |p, cx| {
2682                p.find_or_create_local_worktree("/root-1", false, cx)
2683            })
2684            .await
2685            .unwrap();
2686        worktree_a
2687            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2688            .await;
2689        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2690        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2691        project_a
2692            .update(&mut cx_a, |p, cx| p.share(cx))
2693            .await
2694            .unwrap();
2695
2696        // Join the worktree as client B.
2697        let project_b = Project::remote(
2698            project_id,
2699            client_b.clone(),
2700            client_b.user_store.clone(),
2701            lang_registry.clone(),
2702            fs.clone(),
2703            &mut cx_b.to_async(),
2704        )
2705        .await
2706        .unwrap();
2707
2708        // Open the file to be formatted on client B.
2709        let buffer_b = cx_b
2710            .background()
2711            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2712            .await
2713            .unwrap();
2714
2715        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2716        let (request_id, _) = fake_language_server
2717            .receive_request::<lsp::request::GotoDefinition>()
2718            .await;
2719        fake_language_server
2720            .respond(
2721                request_id,
2722                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2723                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2724                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2725                ))),
2726            )
2727            .await;
2728        let definitions_1 = definitions_1.await.unwrap();
2729        cx_b.read(|cx| {
2730            assert_eq!(definitions_1.len(), 1);
2731            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2732            let target_buffer = definitions_1[0].target_buffer.read(cx);
2733            assert_eq!(
2734                target_buffer.text(),
2735                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2736            );
2737            assert_eq!(
2738                definitions_1[0].target_range.to_point(target_buffer),
2739                Point::new(0, 6)..Point::new(0, 9)
2740            );
2741        });
2742
2743        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2744        // the previous call to `definition`.
2745        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2746        let (request_id, _) = fake_language_server
2747            .receive_request::<lsp::request::GotoDefinition>()
2748            .await;
2749        fake_language_server
2750            .respond(
2751                request_id,
2752                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2753                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2754                    lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2755                ))),
2756            )
2757            .await;
2758        let definitions_2 = definitions_2.await.unwrap();
2759        cx_b.read(|cx| {
2760            assert_eq!(definitions_2.len(), 1);
2761            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2762            let target_buffer = definitions_2[0].target_buffer.read(cx);
2763            assert_eq!(
2764                target_buffer.text(),
2765                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2766            );
2767            assert_eq!(
2768                definitions_2[0].target_range.to_point(target_buffer),
2769                Point::new(1, 6)..Point::new(1, 11)
2770            );
2771        });
2772        assert_eq!(
2773            definitions_1[0].target_buffer,
2774            definitions_2[0].target_buffer
2775        );
2776
2777        cx_b.update(|_| {
2778            drop(definitions_1);
2779            drop(definitions_2);
2780        });
2781        project_b
2782            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2783            .await;
2784    }
2785
2786    #[gpui::test]
2787    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2788        mut cx_a: TestAppContext,
2789        mut cx_b: TestAppContext,
2790        mut rng: StdRng,
2791    ) {
2792        cx_a.foreground().forbid_parking();
2793        let mut lang_registry = Arc::new(LanguageRegistry::new());
2794        let fs = Arc::new(FakeFs::new(cx_a.background()));
2795        fs.insert_tree(
2796            "/root",
2797            json!({
2798                ".zed.toml": r#"collaborators = ["user_b"]"#,
2799                "a.rs": "const ONE: usize = b::TWO;",
2800                "b.rs": "const TWO: usize = 2",
2801            }),
2802        )
2803        .await;
2804
2805        // Set up a fake language server.
2806        let (language_server_config, mut fake_language_server) =
2807            LanguageServerConfig::fake(cx_a.background()).await;
2808        Arc::get_mut(&mut lang_registry)
2809            .unwrap()
2810            .add(Arc::new(Language::new(
2811                LanguageConfig {
2812                    name: "Rust".to_string(),
2813                    path_suffixes: vec!["rs".to_string()],
2814                    language_server: Some(language_server_config),
2815                    ..Default::default()
2816                },
2817                Some(tree_sitter_rust::language()),
2818            )));
2819
2820        // Connect to a server as 2 clients.
2821        let mut server = TestServer::start(cx_a.foreground()).await;
2822        let client_a = server.create_client(&mut cx_a, "user_a").await;
2823        let client_b = server.create_client(&mut cx_b, "user_b").await;
2824
2825        // Share a project as client A
2826        let project_a = cx_a.update(|cx| {
2827            Project::local(
2828                client_a.clone(),
2829                client_a.user_store.clone(),
2830                lang_registry.clone(),
2831                fs.clone(),
2832                cx,
2833            )
2834        });
2835        let (worktree_a, _) = project_a
2836            .update(&mut cx_a, |p, cx| {
2837                p.find_or_create_local_worktree("/root", false, cx)
2838            })
2839            .await
2840            .unwrap();
2841        worktree_a
2842            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2843            .await;
2844        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2845        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2846        project_a
2847            .update(&mut cx_a, |p, cx| p.share(cx))
2848            .await
2849            .unwrap();
2850
2851        // Join the worktree as client B.
2852        let project_b = Project::remote(
2853            project_id,
2854            client_b.clone(),
2855            client_b.user_store.clone(),
2856            lang_registry.clone(),
2857            fs.clone(),
2858            &mut cx_b.to_async(),
2859        )
2860        .await
2861        .unwrap();
2862
2863        let buffer_b1 = cx_b
2864            .background()
2865            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2866            .await
2867            .unwrap();
2868
2869        let definitions;
2870        let buffer_b2;
2871        if rng.gen() {
2872            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2873            buffer_b2 =
2874                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2875        } else {
2876            buffer_b2 =
2877                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2878            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2879        }
2880
2881        let (request_id, _) = fake_language_server
2882            .receive_request::<lsp::request::GotoDefinition>()
2883            .await;
2884        fake_language_server
2885            .respond(
2886                request_id,
2887                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2888                    lsp::Url::from_file_path("/root/b.rs").unwrap(),
2889                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2890                ))),
2891            )
2892            .await;
2893
2894        let buffer_b2 = buffer_b2.await.unwrap();
2895        let definitions = definitions.await.unwrap();
2896        assert_eq!(definitions.len(), 1);
2897        assert_eq!(definitions[0].target_buffer, buffer_b2);
2898    }
2899
2900    #[gpui::test]
2901    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2902        cx_a.foreground().forbid_parking();
2903
2904        // Connect to a server as 2 clients.
2905        let mut server = TestServer::start(cx_a.foreground()).await;
2906        let client_a = server.create_client(&mut cx_a, "user_a").await;
2907        let client_b = server.create_client(&mut cx_b, "user_b").await;
2908
2909        // Create an org that includes these 2 users.
2910        let db = &server.app_state.db;
2911        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2912        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2913            .await
2914            .unwrap();
2915        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2916            .await
2917            .unwrap();
2918
2919        // Create a channel that includes all the users.
2920        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2921        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2922            .await
2923            .unwrap();
2924        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2925            .await
2926            .unwrap();
2927        db.create_channel_message(
2928            channel_id,
2929            client_b.current_user_id(&cx_b),
2930            "hello A, it's B.",
2931            OffsetDateTime::now_utc(),
2932            1,
2933        )
2934        .await
2935        .unwrap();
2936
2937        let channels_a = cx_a
2938            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2939        channels_a
2940            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2941            .await;
2942        channels_a.read_with(&cx_a, |list, _| {
2943            assert_eq!(
2944                list.available_channels().unwrap(),
2945                &[ChannelDetails {
2946                    id: channel_id.to_proto(),
2947                    name: "test-channel".to_string()
2948                }]
2949            )
2950        });
2951        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2952            this.get_channel(channel_id.to_proto(), cx).unwrap()
2953        });
2954        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2955        channel_a
2956            .condition(&cx_a, |channel, _| {
2957                channel_messages(channel)
2958                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2959            })
2960            .await;
2961
2962        let channels_b = cx_b
2963            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2964        channels_b
2965            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2966            .await;
2967        channels_b.read_with(&cx_b, |list, _| {
2968            assert_eq!(
2969                list.available_channels().unwrap(),
2970                &[ChannelDetails {
2971                    id: channel_id.to_proto(),
2972                    name: "test-channel".to_string()
2973                }]
2974            )
2975        });
2976
2977        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2978            this.get_channel(channel_id.to_proto(), cx).unwrap()
2979        });
2980        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2981        channel_b
2982            .condition(&cx_b, |channel, _| {
2983                channel_messages(channel)
2984                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2985            })
2986            .await;
2987
2988        channel_a
2989            .update(&mut cx_a, |channel, cx| {
2990                channel
2991                    .send_message("oh, hi B.".to_string(), cx)
2992                    .unwrap()
2993                    .detach();
2994                let task = channel.send_message("sup".to_string(), cx).unwrap();
2995                assert_eq!(
2996                    channel_messages(channel),
2997                    &[
2998                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2999                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3000                        ("user_a".to_string(), "sup".to_string(), true)
3001                    ]
3002                );
3003                task
3004            })
3005            .await
3006            .unwrap();
3007
3008        channel_b
3009            .condition(&cx_b, |channel, _| {
3010                channel_messages(channel)
3011                    == [
3012                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3013                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3014                        ("user_a".to_string(), "sup".to_string(), false),
3015                    ]
3016            })
3017            .await;
3018
3019        assert_eq!(
3020            server
3021                .state()
3022                .await
3023                .channel(channel_id)
3024                .unwrap()
3025                .connection_ids
3026                .len(),
3027            2
3028        );
3029        cx_b.update(|_| drop(channel_b));
3030        server
3031            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3032            .await;
3033
3034        cx_a.update(|_| drop(channel_a));
3035        server
3036            .condition(|state| state.channel(channel_id).is_none())
3037            .await;
3038    }
3039
3040    #[gpui::test]
3041    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3042        cx_a.foreground().forbid_parking();
3043
3044        let mut server = TestServer::start(cx_a.foreground()).await;
3045        let client_a = server.create_client(&mut cx_a, "user_a").await;
3046
3047        let db = &server.app_state.db;
3048        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3049        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3050        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3051            .await
3052            .unwrap();
3053        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3054            .await
3055            .unwrap();
3056
3057        let channels_a = cx_a
3058            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3059        channels_a
3060            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3061            .await;
3062        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3063            this.get_channel(channel_id.to_proto(), cx).unwrap()
3064        });
3065
3066        // Messages aren't allowed to be too long.
3067        channel_a
3068            .update(&mut cx_a, |channel, cx| {
3069                let long_body = "this is long.\n".repeat(1024);
3070                channel.send_message(long_body, cx).unwrap()
3071            })
3072            .await
3073            .unwrap_err();
3074
3075        // Messages aren't allowed to be blank.
3076        channel_a.update(&mut cx_a, |channel, cx| {
3077            channel.send_message(String::new(), cx).unwrap_err()
3078        });
3079
3080        // Leading and trailing whitespace are trimmed.
3081        channel_a
3082            .update(&mut cx_a, |channel, cx| {
3083                channel
3084                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3085                    .unwrap()
3086            })
3087            .await
3088            .unwrap();
3089        assert_eq!(
3090            db.get_channel_messages(channel_id, 10, None)
3091                .await
3092                .unwrap()
3093                .iter()
3094                .map(|m| &m.body)
3095                .collect::<Vec<_>>(),
3096            &["surrounded by whitespace"]
3097        );
3098    }
3099
3100    #[gpui::test]
3101    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3102        cx_a.foreground().forbid_parking();
3103
3104        // Connect to a server as 2 clients.
3105        let mut server = TestServer::start(cx_a.foreground()).await;
3106        let client_a = server.create_client(&mut cx_a, "user_a").await;
3107        let client_b = server.create_client(&mut cx_b, "user_b").await;
3108        let mut status_b = client_b.status();
3109
3110        // Create an org that includes these 2 users.
3111        let db = &server.app_state.db;
3112        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3113        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3114            .await
3115            .unwrap();
3116        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3117            .await
3118            .unwrap();
3119
3120        // Create a channel that includes all the users.
3121        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3122        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3123            .await
3124            .unwrap();
3125        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3126            .await
3127            .unwrap();
3128        db.create_channel_message(
3129            channel_id,
3130            client_b.current_user_id(&cx_b),
3131            "hello A, it's B.",
3132            OffsetDateTime::now_utc(),
3133            2,
3134        )
3135        .await
3136        .unwrap();
3137
3138        let channels_a = cx_a
3139            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3140        channels_a
3141            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3142            .await;
3143
3144        channels_a.read_with(&cx_a, |list, _| {
3145            assert_eq!(
3146                list.available_channels().unwrap(),
3147                &[ChannelDetails {
3148                    id: channel_id.to_proto(),
3149                    name: "test-channel".to_string()
3150                }]
3151            )
3152        });
3153        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3154            this.get_channel(channel_id.to_proto(), cx).unwrap()
3155        });
3156        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3157        channel_a
3158            .condition(&cx_a, |channel, _| {
3159                channel_messages(channel)
3160                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3161            })
3162            .await;
3163
3164        let channels_b = cx_b
3165            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3166        channels_b
3167            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3168            .await;
3169        channels_b.read_with(&cx_b, |list, _| {
3170            assert_eq!(
3171                list.available_channels().unwrap(),
3172                &[ChannelDetails {
3173                    id: channel_id.to_proto(),
3174                    name: "test-channel".to_string()
3175                }]
3176            )
3177        });
3178
3179        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3180            this.get_channel(channel_id.to_proto(), cx).unwrap()
3181        });
3182        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3183        channel_b
3184            .condition(&cx_b, |channel, _| {
3185                channel_messages(channel)
3186                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3187            })
3188            .await;
3189
3190        // Disconnect client B, ensuring we can still access its cached channel data.
3191        server.forbid_connections();
3192        server.disconnect_client(client_b.current_user_id(&cx_b));
3193        while !matches!(
3194            status_b.next().await,
3195            Some(client::Status::ReconnectionError { .. })
3196        ) {}
3197
3198        channels_b.read_with(&cx_b, |channels, _| {
3199            assert_eq!(
3200                channels.available_channels().unwrap(),
3201                [ChannelDetails {
3202                    id: channel_id.to_proto(),
3203                    name: "test-channel".to_string()
3204                }]
3205            )
3206        });
3207        channel_b.read_with(&cx_b, |channel, _| {
3208            assert_eq!(
3209                channel_messages(channel),
3210                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3211            )
3212        });
3213
3214        // Send a message from client B while it is disconnected.
3215        channel_b
3216            .update(&mut cx_b, |channel, cx| {
3217                let task = channel
3218                    .send_message("can you see this?".to_string(), cx)
3219                    .unwrap();
3220                assert_eq!(
3221                    channel_messages(channel),
3222                    &[
3223                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3224                        ("user_b".to_string(), "can you see this?".to_string(), true)
3225                    ]
3226                );
3227                task
3228            })
3229            .await
3230            .unwrap_err();
3231
3232        // Send a message from client A while B is disconnected.
3233        channel_a
3234            .update(&mut cx_a, |channel, cx| {
3235                channel
3236                    .send_message("oh, hi B.".to_string(), cx)
3237                    .unwrap()
3238                    .detach();
3239                let task = channel.send_message("sup".to_string(), cx).unwrap();
3240                assert_eq!(
3241                    channel_messages(channel),
3242                    &[
3243                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3244                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3245                        ("user_a".to_string(), "sup".to_string(), true)
3246                    ]
3247                );
3248                task
3249            })
3250            .await
3251            .unwrap();
3252
3253        // Give client B a chance to reconnect.
3254        server.allow_connections();
3255        cx_b.foreground().advance_clock(Duration::from_secs(10));
3256
3257        // Verify that B sees the new messages upon reconnection, as well as the message client B
3258        // sent while offline.
3259        channel_b
3260            .condition(&cx_b, |channel, _| {
3261                channel_messages(channel)
3262                    == [
3263                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3264                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3265                        ("user_a".to_string(), "sup".to_string(), false),
3266                        ("user_b".to_string(), "can you see this?".to_string(), false),
3267                    ]
3268            })
3269            .await;
3270
3271        // Ensure client A and B can communicate normally after reconnection.
3272        channel_a
3273            .update(&mut cx_a, |channel, cx| {
3274                channel.send_message("you online?".to_string(), cx).unwrap()
3275            })
3276            .await
3277            .unwrap();
3278        channel_b
3279            .condition(&cx_b, |channel, _| {
3280                channel_messages(channel)
3281                    == [
3282                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3283                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3284                        ("user_a".to_string(), "sup".to_string(), false),
3285                        ("user_b".to_string(), "can you see this?".to_string(), false),
3286                        ("user_a".to_string(), "you online?".to_string(), false),
3287                    ]
3288            })
3289            .await;
3290
3291        channel_b
3292            .update(&mut cx_b, |channel, cx| {
3293                channel.send_message("yep".to_string(), cx).unwrap()
3294            })
3295            .await
3296            .unwrap();
3297        channel_a
3298            .condition(&cx_a, |channel, _| {
3299                channel_messages(channel)
3300                    == [
3301                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3302                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3303                        ("user_a".to_string(), "sup".to_string(), false),
3304                        ("user_b".to_string(), "can you see this?".to_string(), false),
3305                        ("user_a".to_string(), "you online?".to_string(), false),
3306                        ("user_b".to_string(), "yep".to_string(), false),
3307                    ]
3308            })
3309            .await;
3310    }
3311
3312    #[gpui::test]
3313    async fn test_contacts(
3314        mut cx_a: TestAppContext,
3315        mut cx_b: TestAppContext,
3316        mut cx_c: TestAppContext,
3317    ) {
3318        cx_a.foreground().forbid_parking();
3319        let lang_registry = Arc::new(LanguageRegistry::new());
3320        let fs = Arc::new(FakeFs::new(cx_a.background()));
3321
3322        // Connect to a server as 3 clients.
3323        let mut server = TestServer::start(cx_a.foreground()).await;
3324        let client_a = server.create_client(&mut cx_a, "user_a").await;
3325        let client_b = server.create_client(&mut cx_b, "user_b").await;
3326        let client_c = server.create_client(&mut cx_c, "user_c").await;
3327
3328        // Share a worktree as client A.
3329        fs.insert_tree(
3330            "/a",
3331            json!({
3332                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3333            }),
3334        )
3335        .await;
3336
3337        let project_a = cx_a.update(|cx| {
3338            Project::local(
3339                client_a.clone(),
3340                client_a.user_store.clone(),
3341                lang_registry.clone(),
3342                fs.clone(),
3343                cx,
3344            )
3345        });
3346        let (worktree_a, _) = project_a
3347            .update(&mut cx_a, |p, cx| {
3348                p.find_or_create_local_worktree("/a", false, cx)
3349            })
3350            .await
3351            .unwrap();
3352        worktree_a
3353            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3354            .await;
3355
3356        client_a
3357            .user_store
3358            .condition(&cx_a, |user_store, _| {
3359                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3360            })
3361            .await;
3362        client_b
3363            .user_store
3364            .condition(&cx_b, |user_store, _| {
3365                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3366            })
3367            .await;
3368        client_c
3369            .user_store
3370            .condition(&cx_c, |user_store, _| {
3371                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3372            })
3373            .await;
3374
3375        let project_id = project_a
3376            .update(&mut cx_a, |project, _| project.next_remote_id())
3377            .await;
3378        project_a
3379            .update(&mut cx_a, |project, cx| project.share(cx))
3380            .await
3381            .unwrap();
3382
3383        let _project_b = Project::remote(
3384            project_id,
3385            client_b.clone(),
3386            client_b.user_store.clone(),
3387            lang_registry.clone(),
3388            fs.clone(),
3389            &mut cx_b.to_async(),
3390        )
3391        .await
3392        .unwrap();
3393
3394        client_a
3395            .user_store
3396            .condition(&cx_a, |user_store, _| {
3397                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3398            })
3399            .await;
3400        client_b
3401            .user_store
3402            .condition(&cx_b, |user_store, _| {
3403                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3404            })
3405            .await;
3406        client_c
3407            .user_store
3408            .condition(&cx_c, |user_store, _| {
3409                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3410            })
3411            .await;
3412
3413        project_a
3414            .condition(&cx_a, |project, _| {
3415                project.collaborators().contains_key(&client_b.peer_id)
3416            })
3417            .await;
3418
3419        cx_a.update(move |_| drop(project_a));
3420        client_a
3421            .user_store
3422            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3423            .await;
3424        client_b
3425            .user_store
3426            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3427            .await;
3428        client_c
3429            .user_store
3430            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3431            .await;
3432
3433        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3434            user_store
3435                .contacts()
3436                .iter()
3437                .map(|contact| {
3438                    let worktrees = contact
3439                        .projects
3440                        .iter()
3441                        .map(|p| {
3442                            (
3443                                p.worktree_root_names[0].as_str(),
3444                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3445                            )
3446                        })
3447                        .collect();
3448                    (contact.user.github_login.as_str(), worktrees)
3449                })
3450                .collect()
3451        }
3452    }
3453
3454    struct TestServer {
3455        peer: Arc<Peer>,
3456        app_state: Arc<AppState>,
3457        server: Arc<Server>,
3458        foreground: Rc<executor::Foreground>,
3459        notifications: mpsc::Receiver<()>,
3460        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3461        forbid_connections: Arc<AtomicBool>,
3462        _test_db: TestDb,
3463    }
3464
3465    impl TestServer {
3466        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3467            let test_db = TestDb::new();
3468            let app_state = Self::build_app_state(&test_db).await;
3469            let peer = Peer::new();
3470            let notifications = mpsc::channel(128);
3471            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3472            Self {
3473                peer,
3474                app_state,
3475                server,
3476                foreground,
3477                notifications: notifications.1,
3478                connection_killers: Default::default(),
3479                forbid_connections: Default::default(),
3480                _test_db: test_db,
3481            }
3482        }
3483
3484        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3485            let http = FakeHttpClient::with_404_response();
3486            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3487            let client_name = name.to_string();
3488            let mut client = Client::new(http.clone());
3489            let server = self.server.clone();
3490            let connection_killers = self.connection_killers.clone();
3491            let forbid_connections = self.forbid_connections.clone();
3492            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3493
3494            Arc::get_mut(&mut client)
3495                .unwrap()
3496                .override_authenticate(move |cx| {
3497                    cx.spawn(|_| async move {
3498                        let access_token = "the-token".to_string();
3499                        Ok(Credentials {
3500                            user_id: user_id.0 as u64,
3501                            access_token,
3502                        })
3503                    })
3504                })
3505                .override_establish_connection(move |credentials, cx| {
3506                    assert_eq!(credentials.user_id, user_id.0 as u64);
3507                    assert_eq!(credentials.access_token, "the-token");
3508
3509                    let server = server.clone();
3510                    let connection_killers = connection_killers.clone();
3511                    let forbid_connections = forbid_connections.clone();
3512                    let client_name = client_name.clone();
3513                    let connection_id_tx = connection_id_tx.clone();
3514                    cx.spawn(move |cx| async move {
3515                        if forbid_connections.load(SeqCst) {
3516                            Err(EstablishConnectionError::other(anyhow!(
3517                                "server is forbidding connections"
3518                            )))
3519                        } else {
3520                            let (client_conn, server_conn, kill_conn) =
3521                                Connection::in_memory(cx.background());
3522                            connection_killers.lock().insert(user_id, kill_conn);
3523                            cx.background()
3524                                .spawn(server.handle_connection(
3525                                    server_conn,
3526                                    client_name,
3527                                    user_id,
3528                                    Some(connection_id_tx),
3529                                ))
3530                                .detach();
3531                            Ok(client_conn)
3532                        }
3533                    })
3534                });
3535
3536            client
3537                .authenticate_and_connect(&cx.to_async())
3538                .await
3539                .unwrap();
3540
3541            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3542            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3543            let mut authed_user =
3544                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3545            while authed_user.next().await.unwrap().is_none() {}
3546
3547            TestClient {
3548                client,
3549                peer_id,
3550                user_store,
3551            }
3552        }
3553
3554        fn disconnect_client(&self, user_id: UserId) {
3555            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3556                let _ = kill_conn.try_send(Some(()));
3557            }
3558        }
3559
3560        fn forbid_connections(&self) {
3561            self.forbid_connections.store(true, SeqCst);
3562        }
3563
3564        fn allow_connections(&self) {
3565            self.forbid_connections.store(false, SeqCst);
3566        }
3567
3568        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3569            let mut config = Config::default();
3570            config.session_secret = "a".repeat(32);
3571            config.database_url = test_db.url.clone();
3572            let github_client = github::AppClient::test();
3573            Arc::new(AppState {
3574                db: test_db.db().clone(),
3575                handlebars: Default::default(),
3576                auth_client: auth::build_client("", ""),
3577                repo_client: github::RepoClient::test(&github_client),
3578                github_client,
3579                config,
3580            })
3581        }
3582
3583        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3584            self.server.store.read()
3585        }
3586
3587        async fn condition<F>(&mut self, mut predicate: F)
3588        where
3589            F: FnMut(&Store) -> bool,
3590        {
3591            async_std::future::timeout(Duration::from_millis(500), async {
3592                while !(predicate)(&*self.server.store.read()) {
3593                    self.foreground.start_waiting();
3594                    self.notifications.next().await;
3595                    self.foreground.finish_waiting();
3596                }
3597            })
3598            .await
3599            .expect("condition timed out");
3600        }
3601    }
3602
3603    impl Drop for TestServer {
3604        fn drop(&mut self) {
3605            self.peer.reset();
3606        }
3607    }
3608
3609    struct TestClient {
3610        client: Arc<Client>,
3611        pub peer_id: PeerId,
3612        pub user_store: ModelHandle<UserStore>,
3613    }
3614
3615    impl Deref for TestClient {
3616        type Target = Arc<Client>;
3617
3618        fn deref(&self) -> &Self::Target {
3619            &self.client
3620        }
3621    }
3622
3623    impl TestClient {
3624        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3625            UserId::from_proto(
3626                self.user_store
3627                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3628            )
3629        }
3630    }
3631
3632    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3633        channel
3634            .messages()
3635            .cursor::<()>()
3636            .map(|m| {
3637                (
3638                    m.sender.github_login.clone(),
3639                    m.body.clone(),
3640                    m.is_pending(),
3641                )
3642            })
3643            .collect()
3644    }
3645
3646    struct EmptyView;
3647
3648    impl gpui::Entity for EmptyView {
3649        type Event = ();
3650    }
3651
3652    impl gpui::View for EmptyView {
3653        fn ui_name() -> &'static str {
3654            "empty view"
3655        }
3656
3657        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3658            gpui::Element::boxed(gpui::elements::Empty)
3659        }
3660    }
3661}