diff --git a/Cargo.lock b/Cargo.lock index 16b2742d436e9b8cfb9bcb035f5510e73b4fadd9..c5b1023687a261f2b3fffdfd21a313583a05b7fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -823,6 +823,7 @@ dependencies = [ "futures 0.3.25", "gpui", "live_kit_client", + "log", "media", "postage", "project", @@ -1130,7 +1131,7 @@ dependencies = [ [[package]] name = "collab" -version = "0.3.4" +version = "0.3.14" dependencies = [ "anyhow", "async-tungstenite", diff --git a/crates/call/Cargo.toml b/crates/call/Cargo.toml index a7a3331d20be6b93701999de33d0922e184e38af..c0a6cedc622d6db2d6bec4a3ff3895182c541f85 100644 --- a/crates/call/Cargo.toml +++ b/crates/call/Cargo.toml @@ -21,6 +21,7 @@ test-support = [ client = { path = "../client" } collections = { path = "../collections" } gpui = { path = "../gpui" } +log = "0.4" live_kit_client = { path = "../live_kit_client" } media = { path = "../media" } project = { path = "../project" } diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 8deb4341180afa02c39dc096f78b95714d93784a..1d22fe50f1a8ff4e9ac530ff991f5f9010a2b4e6 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -3,7 +3,7 @@ use crate::{ IncomingCall, }; use anyhow::{anyhow, Result}; -use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore}; +use client::{proto, Client, TypedEnvelope, User, UserStore}; use collections::{BTreeMap, HashSet}; use futures::{FutureExt, StreamExt}; use gpui::{ @@ -13,17 +13,17 @@ use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUp use postage::stream::Stream; use project::Project; use std::{mem, sync::Arc, time::Duration}; -use util::{post_inc, ResultExt}; +use util::{post_inc, ResultExt, TryFutureExt}; pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT; #[derive(Clone, Debug, PartialEq, Eq)] pub enum Event { ParticipantLocationChanged { - participant_id: PeerId, + participant_id: proto::PeerId, }, RemoteVideoTracksChanged { - participant_id: PeerId, + participant_id: proto::PeerId, }, RemoteProjectShared { owner: Arc, @@ -41,7 +41,7 @@ pub struct Room { live_kit: Option, status: RoomStatus, local_participant: LocalParticipant, - remote_participants: BTreeMap, + remote_participants: BTreeMap, pending_participants: Vec>, participant_user_ids: HashSet, pending_call_count: usize, @@ -50,7 +50,7 @@ pub struct Room { user_store: ModelHandle, subscriptions: Vec, pending_room_update: Option>, - maintain_connection: Option>>, + maintain_connection: Option>>, } impl Entity for Room { @@ -58,6 +58,7 @@ impl Entity for Room { fn release(&mut self, _: &mut MutableAppContext) { if self.status.is_online() { + log::info!("room was released, sending leave message"); self.client.send(proto::LeaveRoom {}).log_err(); } } @@ -122,7 +123,7 @@ impl Room { }; let maintain_connection = - cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx)); + cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()); Self { id, @@ -229,6 +230,7 @@ impl Room { cx.notify(); cx.emit(Event::Left); + log::info!("leaving room"); self.status = RoomStatus::Offline; self.remote_participants.clear(); self.pending_participants.clear(); @@ -254,6 +256,7 @@ impl Room { .map_or(false, |s| s.is_connected()); // Even if we're initially connected, any future change of the status means we momentarily disconnected. if !is_connected || client_status.next().await.is_some() { + log::info!("detected client disconnection"); let room_id = this .upgrade(&cx) .ok_or_else(|| anyhow!("room was dropped"))? @@ -269,8 +272,13 @@ impl Room { let client_reconnection = async { let mut remaining_attempts = 3; while remaining_attempts > 0 { + log::info!( + "waiting for client status change, remaining attempts {}", + remaining_attempts + ); if let Some(status) = client_status.next().await { if status.is_connected() { + log::info!("client reconnected, attempting to rejoin room"); let rejoin_room = async { let response = client.request(proto::JoinRoom { id: room_id }).await?; @@ -285,7 +293,7 @@ impl Room { anyhow::Ok(()) }; - if rejoin_room.await.is_ok() { + if rejoin_room.await.log_err().is_some() { return true; } else { remaining_attempts -= 1; @@ -303,12 +311,15 @@ impl Room { futures::select_biased! { reconnected = client_reconnection => { if reconnected { + log::info!("successfully reconnected to room"); // If we successfully joined the room, go back around the loop // waiting for future connection status changes. continue; } } - _ = reconnection_timeout => {} + _ = reconnection_timeout => { + log::info!("room reconnection timeout expired"); + } } } @@ -316,6 +327,7 @@ impl Room { // or an error occurred while trying to re-join the room. Either way // we leave the room and return an error. if let Some(this) = this.upgrade(&cx) { + log::info!("reconnection failed, leaving room"); let _ = this.update(&mut cx, |this, cx| this.leave(cx)); } return Err(anyhow!( @@ -337,7 +349,7 @@ impl Room { &self.local_participant } - pub fn remote_participants(&self) -> &BTreeMap { + pub fn remote_participants(&self) -> &BTreeMap { &self.remote_participants } @@ -407,7 +419,7 @@ impl Room { if let Some(participants) = remote_participants.log_err() { let mut participant_peer_ids = HashSet::default(); for (participant, user) in room.participants.into_iter().zip(participants) { - let peer_id = PeerId(participant.peer_id); + let Some(peer_id) = participant.peer_id else { continue }; this.participant_user_ids.insert(participant.user_id); participant_peer_ids.insert(peer_id); @@ -464,7 +476,7 @@ impl Room { if let Some(live_kit) = this.live_kit.as_ref() { let tracks = - live_kit.room.remote_video_tracks(&peer_id.0.to_string()); + live_kit.room.remote_video_tracks(&peer_id.to_string()); for track in tracks { this.remote_video_track_updated( RemoteVideoTrackUpdate::Subscribed(track), @@ -499,6 +511,7 @@ impl Room { this.pending_room_update.take(); if this.should_leave() { + log::info!("room is empty, leaving"); let _ = this.leave(cx); } @@ -518,7 +531,7 @@ impl Room { ) -> Result<()> { match change { RemoteVideoTrackUpdate::Subscribed(track) => { - let peer_id = PeerId(track.publisher_id().parse()?); + let peer_id = track.publisher_id().parse()?; let track_id = track.sid().to_string(); let participant = self .remote_participants @@ -538,7 +551,7 @@ impl Room { publisher_id, track_id, } => { - let peer_id = PeerId(publisher_id.parse()?); + let peer_id = publisher_id.parse()?; let participant = self .remote_participants .get_mut(&peer_id) diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 5e10f9ea8f4374d3e0607be2a0f68fc5259bc431..6d9ec305b697981a6c1c4f8492655db4d9eb6757 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -23,7 +23,7 @@ use lazy_static::lazy_static; use parking_lot::RwLock; use postage::watch; use rand::prelude::*; -use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage}; +use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, PeerId, RequestMessage}; use serde::Deserialize; use std::{ any::TypeId, @@ -140,7 +140,7 @@ impl EstablishConnectionError { } } -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug, PartialEq)] pub enum Status { SignedOut, UpgradeRequired, @@ -306,7 +306,7 @@ impl Client { pub fn new(http: Arc, cx: &AppContext) -> Arc { Arc::new(Self { id: 0, - peer: Peer::new(), + peer: Peer::new(0), telemetry: Telemetry::new(http.clone(), cx), http, state: Default::default(), @@ -333,14 +333,14 @@ impl Client { } #[cfg(any(test, feature = "test-support"))] - pub fn tear_down(&self) { + pub fn teardown(&self) { let mut state = self.state.write(); state._reconnect_task.take(); state.message_handlers.clear(); state.models_by_message_type.clear(); state.entities_by_type_and_remote_id.clear(); state.entity_id_extractors.clear(); - self.peer.reset(); + self.peer.teardown(); } #[cfg(any(test, feature = "test-support"))] @@ -810,7 +810,11 @@ impl Client { hello_message_type_name ) })?; - Ok(PeerId(hello.payload.peer_id)) + let peer_id = hello + .payload + .peer_id + .ok_or_else(|| anyhow!("invalid peer id"))?; + Ok(peer_id) }; let peer_id = match peer_id.await { @@ -822,7 +826,7 @@ impl Client { }; log::info!( - "set status to connected (connection id: {}, peer id: {})", + "set status to connected (connection id: {:?}, peer id: {:?})", connection_id, peer_id ); @@ -853,7 +857,7 @@ impl Client { .spawn(async move { match handle_io.await { Ok(()) => { - if *this.status().borrow() + if this.status().borrow().clone() == (Status::Connected { connection_id, peer_id, @@ -1194,7 +1198,7 @@ impl Client { let mut state = self.state.write(); let type_name = message.payload_type_name(); let payload_type_id = message.payload_type_id(); - let sender_id = message.original_sender_id().map(|id| id.0); + let sender_id = message.original_sender_id(); let mut subscriber = None; diff --git a/crates/client/src/test.rs b/crates/client/src/test.rs index 3cfba3b1847c4af4655ad625840492db59249974..db9e0d8c487b27a7474373af9d2c25a29e04b9d7 100644 --- a/crates/client/src/test.rs +++ b/crates/client/src/test.rs @@ -35,7 +35,7 @@ impl FakeServer { cx: &TestAppContext, ) -> Self { let server = Self { - peer: Peer::new(), + peer: Peer::new(0), state: Default::default(), user_id: client_user_id, executor: cx.foreground(), @@ -92,7 +92,7 @@ impl FakeServer { peer.send( connection_id, proto::Hello { - peer_id: connection_id.0, + peer_id: Some(connection_id.into()), }, ) .unwrap(); diff --git a/crates/collab/.env.toml b/crates/collab/.env.toml index 1945d9cb66b33ee36a8e17f4ebbc8af54f346c5c..b4a6694e5e6578e771406d6947c2c4ad8d2efb0a 100644 --- a/crates/collab/.env.toml +++ b/crates/collab/.env.toml @@ -2,6 +2,7 @@ DATABASE_URL = "postgres://postgres@localhost/zed" HTTP_PORT = 8080 API_TOKEN = "secret" INVITE_LINK_PREFIX = "http://localhost:3000/invites/" +ZED_ENVIRONMENT = "development" LIVE_KIT_SERVER = "http://localhost:7880" LIVE_KIT_KEY = "devkey" LIVE_KIT_SECRET = "secret" diff --git a/crates/collab/Cargo.toml b/crates/collab/Cargo.toml index c40ae9cd83e411209f2ff270f8f2f44073fabd24..4f2b83d7ecc4ecc01b9962ac58fea2b1c34d751a 100644 --- a/crates/collab/Cargo.toml +++ b/crates/collab/Cargo.toml @@ -3,7 +3,7 @@ authors = ["Nathan Sobo "] default-run = "collab" edition = "2021" name = "collab" -version = "0.3.4" +version = "0.3.14" [[bin]] name = "collab" diff --git a/crates/collab/k8s/manifest.template.yml b/crates/collab/k8s/manifest.template.yml index 1f0fafb170256b7a75268d6349c38d89cf7f9d69..339d02892ef2cf24f40815c3fd32dc0fb72c317a 100644 --- a/crates/collab/k8s/manifest.template.yml +++ b/crates/collab/k8s/manifest.template.yml @@ -59,6 +59,12 @@ spec: ports: - containerPort: 8080 protocol: TCP + readinessProbe: + httpGet: + path: / + port: 8080 + initialDelaySeconds: 1 + periodSeconds: 1 env: - name: HTTP_PORT value: "8080" @@ -93,6 +99,8 @@ spec: value: ${RUST_LOG} - name: LOG_JSON value: "true" + - name: ZED_ENVIRONMENT + value: ${ZED_ENVIRONMENT} securityContext: capabilities: # FIXME - Switch to the more restrictive `PERFMON` capability. diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index c0cc5b3457e5b279a6f211ec7e9f2b9d30823868..d002c8a135caec2590be6cf76efda340bcfd04f3 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -43,11 +43,12 @@ CREATE TABLE "projects" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, "room_id" INTEGER REFERENCES rooms (id) NOT NULL, "host_user_id" INTEGER REFERENCES users (id) NOT NULL, - "host_connection_id" INTEGER NOT NULL, - "host_connection_epoch" TEXT NOT NULL, + "host_connection_id" INTEGER, + "host_connection_server_id" INTEGER REFERENCES servers (id) ON DELETE CASCADE, "unregistered" BOOLEAN NOT NULL DEFAULT FALSE ); -CREATE INDEX "index_projects_on_host_connection_epoch" ON "projects" ("host_connection_epoch"); +CREATE INDEX "index_projects_on_host_connection_server_id" ON "projects" ("host_connection_server_id"); +CREATE INDEX "index_projects_on_host_connection_id_and_host_connection_server_id" ON "projects" ("host_connection_id", "host_connection_server_id"); CREATE TABLE "worktrees" ( "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, @@ -103,34 +104,39 @@ CREATE TABLE "project_collaborators" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, "connection_id" INTEGER NOT NULL, - "connection_epoch" TEXT NOT NULL, + "connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE, "user_id" INTEGER NOT NULL, "replica_id" INTEGER NOT NULL, "is_host" BOOLEAN NOT NULL ); CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id"); CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id"); -CREATE INDEX "index_project_collaborators_on_connection_epoch" ON "project_collaborators" ("connection_epoch"); +CREATE INDEX "index_project_collaborators_on_connection_server_id" ON "project_collaborators" ("connection_server_id"); CREATE INDEX "index_project_collaborators_on_connection_id" ON "project_collaborators" ("connection_id"); -CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_epoch" ON "project_collaborators" ("project_id", "connection_id", "connection_epoch"); +CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_server_id" ON "project_collaborators" ("project_id", "connection_id", "connection_server_id"); CREATE TABLE "room_participants" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, "room_id" INTEGER NOT NULL REFERENCES rooms (id), "user_id" INTEGER NOT NULL REFERENCES users (id), "answering_connection_id" INTEGER, - "answering_connection_epoch" TEXT, + "answering_connection_server_id" INTEGER REFERENCES servers (id) ON DELETE CASCADE, "answering_connection_lost" BOOLEAN NOT NULL, "location_kind" INTEGER, "location_project_id" INTEGER, "initial_project_id" INTEGER, "calling_user_id" INTEGER NOT NULL REFERENCES users (id), "calling_connection_id" INTEGER NOT NULL, - "calling_connection_epoch" TEXT NOT NULL + "calling_connection_server_id" INTEGER REFERENCES servers (id) ON DELETE SET NULL ); CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id"); CREATE INDEX "index_room_participants_on_room_id" ON "room_participants" ("room_id"); -CREATE INDEX "index_room_participants_on_answering_connection_epoch" ON "room_participants" ("answering_connection_epoch"); -CREATE INDEX "index_room_participants_on_calling_connection_epoch" ON "room_participants" ("calling_connection_epoch"); +CREATE INDEX "index_room_participants_on_answering_connection_server_id" ON "room_participants" ("answering_connection_server_id"); +CREATE INDEX "index_room_participants_on_calling_connection_server_id" ON "room_participants" ("calling_connection_server_id"); CREATE INDEX "index_room_participants_on_answering_connection_id" ON "room_participants" ("answering_connection_id"); -CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_epoch" ON "room_participants" ("answering_connection_id", "answering_connection_epoch"); +CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_server_id" ON "room_participants" ("answering_connection_id", "answering_connection_server_id"); + +CREATE TABLE "servers" ( + "id" INTEGER PRIMARY KEY AUTOINCREMENT, + "environment" VARCHAR NOT NULL +); diff --git a/crates/collab/migrations/20221214144346_change_epoch_from_uuid_to_integer.sql b/crates/collab/migrations/20221214144346_change_epoch_from_uuid_to_integer.sql new file mode 100644 index 0000000000000000000000000000000000000000..5e02f76ce25d59d799d5e5d9719e4e038d1bac02 --- /dev/null +++ b/crates/collab/migrations/20221214144346_change_epoch_from_uuid_to_integer.sql @@ -0,0 +1,30 @@ +CREATE TABLE servers ( + id SERIAL PRIMARY KEY, + environment VARCHAR NOT NULL +); + +DROP TABLE worktree_extensions; +DROP TABLE project_activity_periods; +DELETE from projects; +ALTER TABLE projects + DROP COLUMN host_connection_epoch, + ADD COLUMN host_connection_server_id INTEGER REFERENCES servers (id) ON DELETE CASCADE; +CREATE INDEX "index_projects_on_host_connection_server_id" ON "projects" ("host_connection_server_id"); +CREATE INDEX "index_projects_on_host_connection_id_and_host_connection_server_id" ON "projects" ("host_connection_id", "host_connection_server_id"); + +DELETE FROM project_collaborators; +ALTER TABLE project_collaborators + DROP COLUMN connection_epoch, + ADD COLUMN connection_server_id INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE; +CREATE INDEX "index_project_collaborators_on_connection_server_id" ON "project_collaborators" ("connection_server_id"); +CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_server_id" ON "project_collaborators" ("project_id", "connection_id", "connection_server_id"); + +DELETE FROM room_participants; +ALTER TABLE room_participants + DROP COLUMN answering_connection_epoch, + DROP COLUMN calling_connection_epoch, + ADD COLUMN answering_connection_server_id INTEGER REFERENCES servers (id) ON DELETE CASCADE, + ADD COLUMN calling_connection_server_id INTEGER REFERENCES servers (id) ON DELETE SET NULL; +CREATE INDEX "index_room_participants_on_answering_connection_server_id" ON "room_participants" ("answering_connection_server_id"); +CREATE INDEX "index_room_participants_on_calling_connection_server_id" ON "room_participants" ("calling_connection_server_id"); +CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_server_id" ON "room_participants" ("answering_connection_id", "answering_connection_server_id"); diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 892ea4ccd658faab2ccea8b685110a6d2bb6057a..b1cbddc77ea398b26d5a9b00bb0216b097a45adc 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -5,6 +5,7 @@ mod project; mod project_collaborator; mod room; mod room_participant; +mod server; mod signup; #[cfg(test)] mod tests; @@ -48,7 +49,6 @@ pub struct Database { background: Option>, #[cfg(test)] runtime: Option, - epoch: parking_lot::RwLock, } impl Database { @@ -61,18 +61,12 @@ impl Database { background: None, #[cfg(test)] runtime: None, - epoch: parking_lot::RwLock::new(Uuid::new_v4()), }) } #[cfg(test)] pub fn reset(&self) { self.rooms.clear(); - *self.epoch.write() = Uuid::new_v4(); - } - - fn epoch(&self) -> Uuid { - *self.epoch.read() } pub async fn migrate( @@ -116,14 +110,40 @@ impl Database { Ok(new_migrations) } - pub async fn delete_stale_projects(&self) -> Result<()> { + pub async fn create_server(&self, environment: &str) -> Result { + self.transaction(|tx| async move { + let server = server::ActiveModel { + environment: ActiveValue::set(environment.into()), + ..Default::default() + } + .insert(&*tx) + .await?; + Ok(server.id) + }) + .await + } + + pub async fn delete_stale_projects( + &self, + environment: &str, + new_server_id: ServerId, + ) -> Result<()> { self.transaction(|tx| async move { + let stale_server_epochs = self + .stale_server_ids(environment, new_server_id, &tx) + .await?; project_collaborator::Entity::delete_many() - .filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch())) + .filter( + project_collaborator::Column::ConnectionServerId + .is_in(stale_server_epochs.iter().copied()), + ) .exec(&*tx) .await?; project::Entity::delete_many() - .filter(project::Column::HostConnectionEpoch.ne(self.epoch())) + .filter( + project::Column::HostConnectionServerId + .is_in(stale_server_epochs.iter().copied()), + ) .exec(&*tx) .await?; Ok(()) @@ -131,18 +151,28 @@ impl Database { .await } - pub async fn stale_room_ids(&self) -> Result> { + pub async fn stale_room_ids( + &self, + environment: &str, + new_server_id: ServerId, + ) -> Result> { self.transaction(|tx| async move { #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] enum QueryAs { RoomId, } + let stale_server_epochs = self + .stale_server_ids(environment, new_server_id, &tx) + .await?; Ok(room_participant::Entity::find() .select_only() .column(room_participant::Column::RoomId) .distinct() - .filter(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch())) + .filter( + room_participant::Column::AnsweringConnectionServerId + .is_in(stale_server_epochs), + ) .into_values::<_, QueryAs>() .all(&*tx) .await?) @@ -150,12 +180,16 @@ impl Database { .await } - pub async fn refresh_room(&self, room_id: RoomId) -> Result> { + pub async fn refresh_room( + &self, + room_id: RoomId, + new_server_id: ServerId, + ) -> Result> { self.room_transaction(|tx| async move { let stale_participant_filter = Condition::all() .add(room_participant::Column::RoomId.eq(room_id)) .add(room_participant::Column::AnsweringConnectionId.is_not_null()) - .add(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch())); + .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id)); let stale_participant_user_ids = room_participant::Entity::find() .filter(stale_participant_filter.clone()) @@ -199,6 +233,42 @@ impl Database { .await } + pub async fn delete_stale_servers( + &self, + new_server_id: ServerId, + environment: &str, + ) -> Result<()> { + self.transaction(|tx| async move { + server::Entity::delete_many() + .filter( + Condition::all() + .add(server::Column::Environment.eq(environment)) + .add(server::Column::Id.ne(new_server_id)), + ) + .exec(&*tx) + .await?; + Ok(()) + }) + .await + } + + async fn stale_server_ids( + &self, + environment: &str, + new_server_id: ServerId, + tx: &DatabaseTransaction, + ) -> Result> { + let stale_servers = server::Entity::find() + .filter( + Condition::all() + .add(server::Column::Environment.eq(environment)) + .add(server::Column::Id.ne(new_server_id)), + ) + .all(&*tx) + .await?; + Ok(stale_servers.into_iter().map(|server| server.id).collect()) + } + // users pub async fn create_user( @@ -1076,7 +1146,7 @@ impl Database { pub async fn create_room( &self, user_id: UserId, - connection_id: ConnectionId, + connection: ConnectionId, live_kit_room: &str, ) -> Result> { self.room_transaction(|tx| async move { @@ -1091,12 +1161,16 @@ impl Database { room_participant::ActiveModel { room_id: ActiveValue::set(room_id), user_id: ActiveValue::set(user_id), - answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)), - answering_connection_epoch: ActiveValue::set(Some(self.epoch())), + answering_connection_id: ActiveValue::set(Some(connection.id as i32)), + answering_connection_server_id: ActiveValue::set(Some(ServerId( + connection.owner_id as i32, + ))), answering_connection_lost: ActiveValue::set(false), calling_user_id: ActiveValue::set(user_id), - calling_connection_id: ActiveValue::set(connection_id.0 as i32), - calling_connection_epoch: ActiveValue::set(self.epoch()), + calling_connection_id: ActiveValue::set(connection.id as i32), + calling_connection_server_id: ActiveValue::set(Some(ServerId( + connection.owner_id as i32, + ))), ..Default::default() } .insert(&*tx) @@ -1112,7 +1186,7 @@ impl Database { &self, room_id: RoomId, calling_user_id: UserId, - calling_connection_id: ConnectionId, + calling_connection: ConnectionId, called_user_id: UserId, initial_project_id: Option, ) -> Result> { @@ -1122,8 +1196,10 @@ impl Database { user_id: ActiveValue::set(called_user_id), answering_connection_lost: ActiveValue::set(false), calling_user_id: ActiveValue::set(calling_user_id), - calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32), - calling_connection_epoch: ActiveValue::set(self.epoch()), + calling_connection_id: ActiveValue::set(calling_connection.id as i32), + calling_connection_server_id: ActiveValue::set(Some(ServerId( + calling_connection.owner_id as i32, + ))), initial_project_id: ActiveValue::set(initial_project_id), ..Default::default() } @@ -1162,57 +1238,64 @@ impl Database { &self, expected_room_id: Option, user_id: UserId, - ) -> Result> { - self.room_transaction(|tx| async move { + ) -> Result>> { + self.optional_room_transaction(|tx| async move { + let mut filter = Condition::all() + .add(room_participant::Column::UserId.eq(user_id)) + .add(room_participant::Column::AnsweringConnectionId.is_null()); + if let Some(room_id) = expected_room_id { + filter = filter.add(room_participant::Column::RoomId.eq(room_id)); + } let participant = room_participant::Entity::find() - .filter( - room_participant::Column::UserId - .eq(user_id) - .and(room_participant::Column::AnsweringConnectionId.is_null()), - ) + .filter(filter) .one(&*tx) - .await? - .ok_or_else(|| anyhow!("could not decline call"))?; - let room_id = participant.room_id; + .await?; - if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) { - return Err(anyhow!("declining call on unexpected room"))?; - } + let participant = if let Some(participant) = participant { + participant + } else if expected_room_id.is_some() { + return Err(anyhow!("could not find call to decline"))?; + } else { + return Ok(None); + }; + let room_id = participant.room_id; room_participant::Entity::delete(participant.into_active_model()) .exec(&*tx) .await?; let room = self.get_room(room_id, &tx).await?; - Ok((room_id, room)) + Ok(Some((room_id, room))) }) .await } pub async fn cancel_call( &self, - expected_room_id: Option, - calling_connection_id: ConnectionId, + room_id: RoomId, + calling_connection: ConnectionId, called_user_id: UserId, ) -> Result> { self.room_transaction(|tx| async move { let participant = room_participant::Entity::find() .filter( - room_participant::Column::UserId - .eq(called_user_id) - .and( + Condition::all() + .add(room_participant::Column::UserId.eq(called_user_id)) + .add(room_participant::Column::RoomId.eq(room_id)) + .add( room_participant::Column::CallingConnectionId - .eq(calling_connection_id.0 as i32), + .eq(calling_connection.id as i32), ) - .and(room_participant::Column::AnsweringConnectionId.is_null()), + .add( + room_participant::Column::CallingConnectionServerId + .eq(calling_connection.owner_id as i32), + ) + .add(room_participant::Column::AnsweringConnectionId.is_null()), ) .one(&*tx) .await? - .ok_or_else(|| anyhow!("could not cancel call"))?; + .ok_or_else(|| anyhow!("no call to cancel"))?; let room_id = participant.room_id; - if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) { - return Err(anyhow!("canceling call on unexpected room"))?; - } room_participant::Entity::delete(participant.into_active_model()) .exec(&*tx) @@ -1228,7 +1311,7 @@ impl Database { &self, room_id: RoomId, user_id: UserId, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result> { self.room_transaction(|tx| async move { let result = room_participant::Entity::update_many() @@ -1241,14 +1324,16 @@ impl Database { .add(room_participant::Column::AnsweringConnectionId.is_null()) .add(room_participant::Column::AnsweringConnectionLost.eq(true)) .add( - room_participant::Column::AnsweringConnectionEpoch - .ne(self.epoch()), + room_participant::Column::AnsweringConnectionServerId + .ne(connection.owner_id as i32), ), ), ) .set(room_participant::ActiveModel { - answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)), - answering_connection_epoch: ActiveValue::set(Some(self.epoch())), + answering_connection_id: ActiveValue::set(Some(connection.id as i32)), + answering_connection_server_id: ActiveValue::set(Some(ServerId( + connection.owner_id as i32, + ))), answering_connection_lost: ActiveValue::set(false), ..Default::default() }) @@ -1264,10 +1349,23 @@ impl Database { .await } - pub async fn leave_room(&self, connection_id: ConnectionId) -> Result> { - self.room_transaction(|tx| async move { + pub async fn leave_room( + &self, + connection: ConnectionId, + ) -> Result>> { + self.optional_room_transaction(|tx| async move { let leaving_participant = room_participant::Entity::find() - .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add( + room_participant::Column::AnsweringConnectionId + .eq(connection.id as i32), + ) + .add( + room_participant::Column::AnsweringConnectionServerId + .eq(connection.owner_id as i32), + ), + ) .one(&*tx) .await?; @@ -1281,9 +1379,16 @@ impl Database { // Cancel pending calls initiated by the leaving user. let called_participants = room_participant::Entity::find() .filter( - room_participant::Column::CallingConnectionId - .eq(connection_id.0) - .and(room_participant::Column::AnsweringConnectionId.is_null()), + Condition::all() + .add( + room_participant::Column::CallingConnectionId + .eq(connection.id as i32), + ) + .add( + room_participant::Column::CallingConnectionServerId + .eq(connection.owner_id as i32), + ) + .add(room_participant::Column::AnsweringConnectionId.is_null()), ) .all(&*tx) .await?; @@ -1310,7 +1415,16 @@ impl Database { project_collaborator::Column::ProjectId, QueryProjectIds::ProjectId, ) - .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add( + project_collaborator::Column::ConnectionId.eq(connection.id as i32), + ) + .add( + project_collaborator::Column::ConnectionServerId + .eq(connection.owner_id as i32), + ), + ) .into_values::<_, QueryProjectIds>() .all(&*tx) .await?; @@ -1331,32 +1445,46 @@ impl Database { host_connection_id: Default::default(), }); - let collaborator_connection_id = - ConnectionId(collaborator.connection_id as u32); - if collaborator_connection_id != connection_id { + let collaborator_connection_id = ConnectionId { + owner_id: collaborator.connection_server_id.0 as u32, + id: collaborator.connection_id as u32, + }; + if collaborator_connection_id != connection { left_project.connection_ids.push(collaborator_connection_id); } if collaborator.is_host { left_project.host_user_id = collaborator.user_id; - left_project.host_connection_id = - ConnectionId(collaborator.connection_id as u32); + left_project.host_connection_id = collaborator_connection_id; } } drop(collaborators); // Leave projects. project_collaborator::Entity::delete_many() - .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add( + project_collaborator::Column::ConnectionId.eq(connection.id as i32), + ) + .add( + project_collaborator::Column::ConnectionServerId + .eq(connection.owner_id as i32), + ), + ) .exec(&*tx) .await?; // Unshare projects. project::Entity::delete_many() .filter( - project::Column::RoomId - .eq(room_id) - .and(project::Column::HostConnectionId.eq(connection_id.0 as i32)), + Condition::all() + .add(project::Column::RoomId.eq(room_id)) + .add(project::Column::HostConnectionId.eq(connection.id as i32)) + .add( + project::Column::HostConnectionServerId + .eq(connection.owner_id as i32), + ), ) .exec(&*tx) .await?; @@ -1376,9 +1504,9 @@ impl Database { self.rooms.remove(&room_id); } - Ok((room_id, left_room)) + Ok(Some((room_id, left_room))) } else { - Err(anyhow!("could not leave room"))? + Ok(None) } }) .await @@ -1387,7 +1515,7 @@ impl Database { pub async fn update_room_participant_location( &self, room_id: RoomId, - connection_id: ConnectionId, + connection: ConnectionId, location: proto::ParticipantLocation, ) -> Result> { self.room_transaction(|tx| async { @@ -1414,9 +1542,18 @@ impl Database { } let result = room_participant::Entity::update_many() - .filter(room_participant::Column::RoomId.eq(room_id).and( - room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32), - )) + .filter( + Condition::all() + .add(room_participant::Column::RoomId.eq(room_id)) + .add( + room_participant::Column::AnsweringConnectionId + .eq(connection.id as i32), + ) + .add( + room_participant::Column::AnsweringConnectionServerId + .eq(connection.owner_id as i32), + ), + ) .set(room_participant::ActiveModel { location_kind: ActiveValue::set(Some(location_kind)), location_project_id: ActiveValue::set(location_project_id), @@ -1437,11 +1574,21 @@ impl Database { pub async fn connection_lost( &self, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result>> { self.room_transaction(|tx| async move { let participant = room_participant::Entity::find() - .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add( + room_participant::Column::AnsweringConnectionId + .eq(connection.id as i32), + ) + .add( + room_participant::Column::AnsweringConnectionServerId + .eq(connection.owner_id as i32), + ), + ) .one(&*tx) .await? .ok_or_else(|| anyhow!("not a participant in any room"))?; @@ -1456,11 +1603,25 @@ impl Database { let collaborator_on_projects = project_collaborator::Entity::find() .find_also_related(project::Entity) - .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32)) + .add( + project_collaborator::Column::ConnectionServerId + .eq(connection.owner_id as i32), + ), + ) .all(&*tx) .await?; project_collaborator::Entity::delete_many() - .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32)) + .add( + project_collaborator::Column::ConnectionServerId + .eq(connection.owner_id as i32), + ), + ) .exec(&*tx) .await?; @@ -1473,20 +1634,29 @@ impl Database { .await?; let connection_ids = collaborators .into_iter() - .map(|collaborator| ConnectionId(collaborator.connection_id as u32)) + .map(|collaborator| ConnectionId { + id: collaborator.connection_id as u32, + owner_id: collaborator.connection_server_id.0 as u32, + }) .collect(); left_projects.push(LeftProject { id: project.id, host_user_id: project.host_user_id, - host_connection_id: ConnectionId(project.host_connection_id as u32), + host_connection_id: project.host_connection()?, connection_ids, }); } } project::Entity::delete_many() - .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add(project::Column::HostConnectionId.eq(connection.id as i32)) + .add( + project::Column::HostConnectionServerId.eq(connection.owner_id as i32), + ), + ) .exec(&*tx) .await?; @@ -1537,7 +1707,10 @@ impl Database { let mut pending_participants = Vec::new(); while let Some(db_participant) = db_participants.next().await { let db_participant = db_participant?; - if let Some(answering_connection_id) = db_participant.answering_connection_id { + if let Some((answering_connection_id, answering_connection_server_id)) = db_participant + .answering_connection_id + .zip(db_participant.answering_connection_server_id) + { let location = match ( db_participant.location_kind, db_participant.location_project_id, @@ -1556,11 +1729,16 @@ impl Database { Default::default(), )), }; + + let answering_connection = ConnectionId { + owner_id: answering_connection_server_id.0 as u32, + id: answering_connection_id as u32, + }; participants.insert( - answering_connection_id, + answering_connection, proto::Participant { user_id: db_participant.user_id.to_proto(), - peer_id: answering_connection_id as u32, + peer_id: Some(answering_connection.into()), projects: Default::default(), location: Some(proto::ParticipantLocation { variant: location }), }, @@ -1583,7 +1761,8 @@ impl Database { while let Some(row) = db_projects.next().await { let (db_project, db_worktree) = row?; - if let Some(participant) = participants.get_mut(&db_project.host_connection_id) { + let host_connection = db_project.host_connection()?; + if let Some(participant) = participants.get_mut(&host_connection) { let project = if let Some(project) = participant .projects .iter_mut() @@ -1637,12 +1816,22 @@ impl Database { pub async fn share_project( &self, room_id: RoomId, - connection_id: ConnectionId, + connection: ConnectionId, worktrees: &[proto::WorktreeMetadata], ) -> Result> { self.room_transaction(|tx| async move { let participant = room_participant::Entity::find() - .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add( + room_participant::Column::AnsweringConnectionId + .eq(connection.id as i32), + ) + .add( + room_participant::Column::AnsweringConnectionServerId + .eq(connection.owner_id as i32), + ), + ) .one(&*tx) .await? .ok_or_else(|| anyhow!("could not find participant"))?; @@ -1653,8 +1842,10 @@ impl Database { let project = project::ActiveModel { room_id: ActiveValue::set(participant.room_id), host_user_id: ActiveValue::set(participant.user_id), - host_connection_id: ActiveValue::set(connection_id.0 as i32), - host_connection_epoch: ActiveValue::set(self.epoch()), + host_connection_id: ActiveValue::set(Some(connection.id as i32)), + host_connection_server_id: ActiveValue::set(Some(ServerId( + connection.owner_id as i32, + ))), ..Default::default() } .insert(&*tx) @@ -1678,8 +1869,8 @@ impl Database { project_collaborator::ActiveModel { project_id: ActiveValue::set(project.id), - connection_id: ActiveValue::set(connection_id.0 as i32), - connection_epoch: ActiveValue::set(self.epoch()), + connection_id: ActiveValue::set(connection.id as i32), + connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)), user_id: ActiveValue::set(participant.user_id), replica_id: ActiveValue::set(ReplicaId(0)), is_host: ActiveValue::set(true), @@ -1697,7 +1888,7 @@ impl Database { pub async fn unshare_project( &self, project_id: ProjectId, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result)>> { self.room_transaction(|tx| async move { let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?; @@ -1706,7 +1897,7 @@ impl Database { .one(&*tx) .await? .ok_or_else(|| anyhow!("project not found"))?; - if project.host_connection_id == connection_id.0 as i32 { + if project.host_connection()? == connection { let room_id = project.room_id; project::Entity::delete(project.into_active_model()) .exec(&*tx) @@ -1723,12 +1914,18 @@ impl Database { pub async fn update_project( &self, project_id: ProjectId, - connection_id: ConnectionId, + connection: ConnectionId, worktrees: &[proto::WorktreeMetadata], ) -> Result)>> { self.room_transaction(|tx| async move { let project = project::Entity::find_by_id(project_id) - .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add(project::Column::HostConnectionId.eq(connection.id as i32)) + .add( + project::Column::HostConnectionServerId.eq(connection.owner_id as i32), + ), + ) .one(&*tx) .await? .ok_or_else(|| anyhow!("no such project"))?; @@ -1774,7 +1971,7 @@ impl Database { pub async fn update_worktree( &self, update: &proto::UpdateWorktree, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result>> { self.room_transaction(|tx| async move { let project_id = ProjectId::from_proto(update.project_id); @@ -1782,7 +1979,13 @@ impl Database { // Ensure the update comes from the host. let project = project::Entity::find_by_id(project_id) - .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add(project::Column::HostConnectionId.eq(connection.id as i32)) + .add( + project::Column::HostConnectionServerId.eq(connection.owner_id as i32), + ), + ) .one(&*tx) .await? .ok_or_else(|| anyhow!("no such project"))?; @@ -1862,7 +2065,7 @@ impl Database { pub async fn update_diagnostic_summary( &self, update: &proto::UpdateDiagnosticSummary, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result>> { self.room_transaction(|tx| async move { let project_id = ProjectId::from_proto(update.project_id); @@ -1877,7 +2080,7 @@ impl Database { .one(&*tx) .await? .ok_or_else(|| anyhow!("no such project"))?; - if project.host_connection_id != connection_id.0 as i32 { + if project.host_connection()? != connection { return Err(anyhow!("can't update a project hosted by someone else"))?; } @@ -1916,7 +2119,7 @@ impl Database { pub async fn start_language_server( &self, update: &proto::StartLanguageServer, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result>> { self.room_transaction(|tx| async move { let project_id = ProjectId::from_proto(update.project_id); @@ -1930,7 +2133,7 @@ impl Database { .one(&*tx) .await? .ok_or_else(|| anyhow!("no such project"))?; - if project.host_connection_id != connection_id.0 as i32 { + if project.host_connection()? != connection { return Err(anyhow!("can't update a project hosted by someone else"))?; } @@ -1961,11 +2164,21 @@ impl Database { pub async fn join_project( &self, project_id: ProjectId, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result> { self.room_transaction(|tx| async move { let participant = room_participant::Entity::find() - .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add( + room_participant::Column::AnsweringConnectionId + .eq(connection.id as i32), + ) + .add( + room_participant::Column::AnsweringConnectionServerId + .eq(connection.owner_id as i32), + ), + ) .one(&*tx) .await? .ok_or_else(|| anyhow!("must join a room first"))?; @@ -1992,8 +2205,8 @@ impl Database { } let new_collaborator = project_collaborator::ActiveModel { project_id: ActiveValue::set(project_id), - connection_id: ActiveValue::set(connection_id.0 as i32), - connection_epoch: ActiveValue::set(self.epoch()), + connection_id: ActiveValue::set(connection.id as i32), + connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)), user_id: ActiveValue::set(participant.user_id), replica_id: ActiveValue::set(replica_id), is_host: ActiveValue::set(false), @@ -2095,14 +2308,18 @@ impl Database { pub async fn leave_project( &self, project_id: ProjectId, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result> { self.room_transaction(|tx| async move { let result = project_collaborator::Entity::delete_many() .filter( - project_collaborator::Column::ProjectId - .eq(project_id) - .and(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)), + Condition::all() + .add(project_collaborator::Column::ProjectId.eq(project_id)) + .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32)) + .add( + project_collaborator::Column::ConnectionServerId + .eq(connection.owner_id as i32), + ), ) .exec(&*tx) .await?; @@ -2120,13 +2337,16 @@ impl Database { .await?; let connection_ids = collaborators .into_iter() - .map(|collaborator| ConnectionId(collaborator.connection_id as u32)) + .map(|collaborator| ConnectionId { + owner_id: collaborator.connection_server_id.0 as u32, + id: collaborator.connection_id as u32, + }) .collect(); let left_project = LeftProject { id: project_id, host_user_id: project.host_user_id, - host_connection_id: ConnectionId(project.host_connection_id as u32), + host_connection_id: project.host_connection()?, connection_ids, }; Ok((project.room_id, left_project)) @@ -2137,7 +2357,7 @@ impl Database { pub async fn project_collaborators( &self, project_id: ProjectId, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result>> { self.room_transaction(|tx| async move { let project = project::Entity::find_by_id(project_id) @@ -2149,10 +2369,13 @@ impl Database { .all(&*tx) .await?; - if collaborators - .iter() - .any(|collaborator| collaborator.connection_id == connection_id.0 as i32) - { + if collaborators.iter().any(|collaborator| { + let collaborator_connection = ConnectionId { + owner_id: collaborator.connection_server_id.0 as u32, + id: collaborator.connection_id as u32, + }; + collaborator_connection == connection + }) { Ok((project.room_id, collaborators)) } else { Err(anyhow!("no such project"))? @@ -2167,29 +2390,22 @@ impl Database { connection_id: ConnectionId, ) -> Result>> { self.room_transaction(|tx| async move { - #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] - enum QueryAs { - ConnectionId, - } - let project = project::Entity::find_by_id(project_id) .one(&*tx) .await? .ok_or_else(|| anyhow!("no such project"))?; - let mut db_connection_ids = project_collaborator::Entity::find() - .select_only() - .column_as( - project_collaborator::Column::ConnectionId, - QueryAs::ConnectionId, - ) + let mut participants = project_collaborator::Entity::find() .filter(project_collaborator::Column::ProjectId.eq(project_id)) - .into_values::() .stream(&*tx) .await?; let mut connection_ids = HashSet::default(); - while let Some(connection_id) = db_connection_ids.next().await { - connection_ids.insert(ConnectionId(connection_id? as u32)); + while let Some(participant) = participants.next().await { + let participant = participant?; + connection_ids.insert(ConnectionId { + owner_id: participant.connection_server_id.0 as u32, + id: participant.connection_id as u32, + }); } if connection_ids.contains(&connection_id) { @@ -2206,29 +2422,22 @@ impl Database { project_id: ProjectId, tx: &DatabaseTransaction, ) -> Result> { - #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] - enum QueryAs { - ConnectionId, - } - - let mut db_guest_connection_ids = project_collaborator::Entity::find() - .select_only() - .column_as( - project_collaborator::Column::ConnectionId, - QueryAs::ConnectionId, - ) + let mut participants = project_collaborator::Entity::find() .filter( project_collaborator::Column::ProjectId .eq(project_id) .and(project_collaborator::Column::IsHost.eq(false)), ) - .into_values::() .stream(tx) .await?; let mut guest_connection_ids = Vec::new(); - while let Some(connection_id) = db_guest_connection_ids.next().await { - guest_connection_ids.push(ConnectionId(connection_id? as u32)); + while let Some(participant) = participants.next().await { + let participant = participant?; + guest_connection_ids.push(ConnectionId { + owner_id: participant.connection_server_id.0 as u32, + id: participant.connection_id as u32, + }); } Ok(guest_connection_ids) } @@ -2327,25 +2536,25 @@ impl Database { self.run(body).await } - async fn room_transaction(&self, f: F) -> Result> + async fn optional_room_transaction(&self, f: F) -> Result>> where F: Send + Fn(TransactionHandle) -> Fut, - Fut: Send + Future>, + Fut: Send + Future>>, { let body = async { loop { let (tx, result) = self.with_transaction(&f).await?; match result { - Ok((room_id, data)) => { + Ok(Some((room_id, data))) => { let lock = self.rooms.entry(room_id).or_default().clone(); let _guard = lock.lock_owned().await; match tx.commit().await.map_err(Into::into) { Ok(()) => { - return Ok(RoomGuard { + return Ok(Some(RoomGuard { data, _guard, _not_send: PhantomData, - }); + })); } Err(error) => { if is_serialization_error(&error) { @@ -2356,6 +2565,18 @@ impl Database { } } } + Ok(None) => { + match tx.commit().await.map_err(Into::into) { + Ok(()) => return Ok(None), + Err(error) => { + if is_serialization_error(&error) { + // Retry (don't break the loop) + } else { + return Err(error); + } + } + } + } Err(error) => { tx.rollback().await?; if is_serialization_error(&error) { @@ -2371,6 +2592,23 @@ impl Database { self.run(body).await } + async fn room_transaction(&self, f: F) -> Result> + where + F: Send + Fn(TransactionHandle) -> Fut, + Fut: Send + Future>, + { + let data = self + .optional_room_transaction(move |tx| { + let future = f(tx); + async { + let data = future.await?; + Ok(Some(data)) + } + }) + .await?; + Ok(data.unwrap()) + } + async fn with_transaction(&self, f: &F) -> Result<(DatabaseTransaction, Result)> where F: Send + Fn(TransactionHandle) -> Fut, @@ -2607,6 +2845,7 @@ id_type!(RoomParticipantId); id_type!(ProjectId); id_type!(ProjectCollaboratorId); id_type!(ReplicaId); +id_type!(ServerId); id_type!(SignupId); id_type!(UserId); diff --git a/crates/collab/src/db/project.rs b/crates/collab/src/db/project.rs index 971a8fcefb465114c9703003e2a74f6f38d8c397..5b1f9f8467853344e1c57004e875ba434117195b 100644 --- a/crates/collab/src/db/project.rs +++ b/crates/collab/src/db/project.rs @@ -1,4 +1,6 @@ -use super::{ProjectId, RoomId, UserId}; +use super::{ProjectId, Result, RoomId, ServerId, UserId}; +use anyhow::anyhow; +use rpc::ConnectionId; use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] @@ -8,8 +10,23 @@ pub struct Model { pub id: ProjectId, pub room_id: RoomId, pub host_user_id: UserId, - pub host_connection_id: i32, - pub host_connection_epoch: Uuid, + pub host_connection_id: Option, + pub host_connection_server_id: Option, +} + +impl Model { + pub fn host_connection(&self) -> Result { + let host_connection_server_id = self + .host_connection_server_id + .ok_or_else(|| anyhow!("empty host_connection_server_id"))?; + let host_connection_id = self + .host_connection_id + .ok_or_else(|| anyhow!("empty host_connection_id"))?; + Ok(ConnectionId { + owner_id: host_connection_server_id.0 as u32, + id: host_connection_id as u32, + }) + } } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/collab/src/db/project_collaborator.rs b/crates/collab/src/db/project_collaborator.rs index 5db307f5df27ec07f282207eb253ddae95c44970..a1a99d1170ae5cfb254fb1c85ce50034cf03d13b 100644 --- a/crates/collab/src/db/project_collaborator.rs +++ b/crates/collab/src/db/project_collaborator.rs @@ -1,4 +1,4 @@ -use super::{ProjectCollaboratorId, ProjectId, ReplicaId, UserId}; +use super::{ProjectCollaboratorId, ProjectId, ReplicaId, ServerId, UserId}; use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] @@ -8,7 +8,7 @@ pub struct Model { pub id: ProjectCollaboratorId, pub project_id: ProjectId, pub connection_id: i32, - pub connection_epoch: Uuid, + pub connection_server_id: ServerId, pub user_id: UserId, pub replica_id: ReplicaId, pub is_host: bool, diff --git a/crates/collab/src/db/room_participant.rs b/crates/collab/src/db/room_participant.rs index c80c10c1bae25cc1b79a499a4ed5b310f2209527..f939a3bfb8709132b1aa138ea43e21a25b0f61af 100644 --- a/crates/collab/src/db/room_participant.rs +++ b/crates/collab/src/db/room_participant.rs @@ -1,4 +1,4 @@ -use super::{ProjectId, RoomId, RoomParticipantId, UserId}; +use super::{ProjectId, RoomId, RoomParticipantId, ServerId, UserId}; use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] @@ -9,14 +9,14 @@ pub struct Model { pub room_id: RoomId, pub user_id: UserId, pub answering_connection_id: Option, - pub answering_connection_epoch: Option, + pub answering_connection_server_id: Option, pub answering_connection_lost: bool, pub location_kind: Option, pub location_project_id: Option, pub initial_project_id: Option, pub calling_user_id: UserId, pub calling_connection_id: i32, - pub calling_connection_epoch: Uuid, + pub calling_connection_server_id: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/collab/src/db/server.rs b/crates/collab/src/db/server.rs new file mode 100644 index 0000000000000000000000000000000000000000..e3905f244892e8befd97f52afc119341106cb566 --- /dev/null +++ b/crates/collab/src/db/server.rs @@ -0,0 +1,15 @@ +use super::ServerId; +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] +#[sea_orm(table_name = "servers")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: ServerId, + pub environment: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/collab/src/db/tests.rs b/crates/collab/src/db/tests.rs index 2d254c2e3702d48b4451301977a988fd1686d91f..9d42c11f8bba88bd164eaab443857d910b97230d 100644 --- a/crates/collab/src/db/tests.rs +++ b/crates/collab/src/db/tests.rs @@ -410,6 +410,8 @@ test_both_dbs!( test_project_count_sqlite, db, { + let owner_id = db.create_server("test").await.unwrap().0 as u32; + let user1 = db .create_user( &format!("admin@example.com"), @@ -436,36 +438,44 @@ test_both_dbs!( .unwrap(); let room_id = RoomId::from_proto( - db.create_room(user1.user_id, ConnectionId(0), "") + db.create_room(user1.user_id, ConnectionId { owner_id, id: 0 }, "") .await .unwrap() .id, ); - db.call(room_id, user1.user_id, ConnectionId(0), user2.user_id, None) - .await - .unwrap(); - db.join_room(room_id, user2.user_id, ConnectionId(1)) + db.call( + room_id, + user1.user_id, + ConnectionId { owner_id, id: 0 }, + user2.user_id, + None, + ) + .await + .unwrap(); + db.join_room(room_id, user2.user_id, ConnectionId { owner_id, id: 1 }) .await .unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 0); - db.share_project(room_id, ConnectionId(1), &[]) + db.share_project(room_id, ConnectionId { owner_id, id: 1 }, &[]) .await .unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 1); - db.share_project(room_id, ConnectionId(1), &[]) + db.share_project(room_id, ConnectionId { owner_id, id: 1 }, &[]) .await .unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 2); // Projects shared by admins aren't counted. - db.share_project(room_id, ConnectionId(0), &[]) + db.share_project(room_id, ConnectionId { owner_id, id: 0 }, &[]) .await .unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 2); - db.leave_room(ConnectionId(1)).await.unwrap(); + db.leave_room(ConnectionId { owner_id, id: 1 }) + .await + .unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 0); } ); diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 61e914ad87145a6c22bc3beb7694ed92a476795b..2629d340dac0e745e1027d3cd17913023fcb45d3 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -7,8 +7,8 @@ use crate::{ use anyhow::anyhow; use call::{room, ActiveCall, ParticipantLocation, Room}; use client::{ - self, test::FakeHttpClient, Client, Connection, Credentials, EstablishConnectionError, PeerId, - User, UserStore, RECEIVE_TIMEOUT, + self, proto::PeerId, test::FakeHttpClient, Client, Connection, Credentials, + EstablishConnectionError, User, UserStore, RECEIVE_TIMEOUT, }; use collections::{BTreeMap, HashMap, HashSet}; use editor::{ @@ -608,7 +608,7 @@ async fn test_server_restarts( ); // The server is torn down. - server.teardown(); + server.reset().await; // Users A and B reconnect to the call. User C has troubles reconnecting, so it leaves the room. client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending())); @@ -778,7 +778,7 @@ async fn test_server_restarts( ); // The server is torn down. - server.teardown(); + server.reset().await; // Users A and B have troubles reconnecting, so they leave the room. client_a.override_establish_connection(|_, cx| cx.spawn(|_| future::pending())); @@ -6125,7 +6125,7 @@ async fn test_random_collaboration( .user_connection_ids(removed_guest_id) .collect::>(); assert_eq!(user_connection_ids.len(), 1); - let removed_peer_id = PeerId(user_connection_ids[0].0); + let removed_peer_id = user_connection_ids[0].into(); let guest = clients.remove(guest_ix); op_start_signals.remove(guest_ix); server.forbid_connections(); @@ -6174,17 +6174,25 @@ async fn test_random_collaboration( .user_connection_ids(user_id) .collect::>(); assert_eq!(user_connection_ids.len(), 1); - let peer_id = PeerId(user_connection_ids[0].0); + let peer_id = user_connection_ids[0].into(); server.disconnect_client(peer_id); deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); operations += 1; } 30..=34 => { log::info!("Simulating server restart"); - server.teardown(); - deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); + server.reset().await; + deterministic.advance_clock(RECEIVE_TIMEOUT); server.start().await.unwrap(); deterministic.advance_clock(CLEANUP_TIMEOUT); + let environment = &server.app_state.config.zed_environment; + let stale_room_ids = server + .app_state + .db + .stale_room_ids(environment, server.id()) + .await + .unwrap(); + assert_eq!(stale_room_ids, vec![]); } _ if !op_start_signals.is_empty() => { while operations < max_operations && rng.lock().gen_bool(0.7) { @@ -6379,7 +6387,13 @@ impl TestServer { ) .unwrap(); let app_state = Self::build_app_state(&test_db, &live_kit_server).await; + let epoch = app_state + .db + .create_server(&app_state.config.zed_environment) + .await + .unwrap(); let server = Server::new( + epoch, app_state.clone(), Executor::Deterministic(deterministic.build_background()), ); @@ -6396,9 +6410,15 @@ impl TestServer { } } - fn teardown(&self) { - self.server.teardown(); + async fn reset(&self) { self.app_state.db.reset(); + let epoch = self + .app_state + .db + .create_server(&self.app_state.config.zed_environment) + .await + .unwrap(); + self.server.reset(epoch); } async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient { @@ -6488,7 +6508,7 @@ impl TestServer { let connection_id = connection_id_rx.await.unwrap(); connection_killers .lock() - .insert(PeerId(connection_id.0), killed); + .insert(connection_id.into(), killed); Ok(client_conn) } }) @@ -7310,7 +7330,7 @@ impl TestClient { impl Drop for TestClient { fn drop(&mut self) { - self.client.tear_down(); + self.client.teardown(); } } diff --git a/crates/collab/src/lib.rs b/crates/collab/src/lib.rs index 7e0f23f5d4096cbf33aaf8ea6275381b96f9208a..27f49f5b1e1fb6a0ba7d6fba7f72efd88723d83d 100644 --- a/crates/collab/src/lib.rs +++ b/crates/collab/src/lib.rs @@ -97,6 +97,7 @@ pub struct Config { pub live_kit_secret: Option, pub rust_log: Option, pub log_json: Option, + pub zed_environment: String, } #[derive(Default, Deserialize)] diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index 8ad1313d1ea8bff15b54793aa40aa97405d20c8a..0f783c13e58d3b64c0e034c18ea6173d63be4036 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -7,6 +7,7 @@ use std::{ net::{SocketAddr, TcpListener}, path::Path, }; +use tokio::signal::unix::SignalKind; use tracing_log::LogTracer; use tracing_subscriber::{filter::EnvFilter, fmt::format::JsonFields, Layer}; use util::ResultExt; @@ -56,7 +57,11 @@ async fn main() -> Result<()> { let listener = TcpListener::bind(&format!("0.0.0.0:{}", state.config.http_port)) .expect("failed to bind TCP listener"); - let rpc_server = collab::rpc::Server::new(state.clone(), Executor::Production); + let epoch = state + .db + .create_server(&state.config.zed_environment) + .await?; + let rpc_server = collab::rpc::Server::new(epoch, state.clone(), Executor::Production); rpc_server.start().await?; let app = collab::api::routes(rpc_server.clone(), state.clone()) @@ -66,9 +71,15 @@ async fn main() -> Result<()> { axum::Server::from_tcp(listener)? .serve(app.into_make_service_with_connect_info::()) .with_graceful_shutdown(async move { - tokio::signal::ctrl_c() - .await + let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()) + .expect("failed to listen for interrupt signal"); + let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()) .expect("failed to listen for interrupt signal"); + let sigterm = sigterm.recv(); + let sigint = sigint.recv(); + futures::pin_mut!(sigterm, sigint); + futures::future::select(sigterm, sigint).await; + tracing::info!("Received interrupt signal"); rpc_server.teardown(); }) .await?; diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 69794ab35aec448c1bec30a5e8f0a37af72f10da..6a8ae61ed0b7bff3ad44741a2a21528dbb32a5b7 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -2,7 +2,7 @@ mod connection_pool; use crate::{ auth, - db::{self, Database, ProjectId, RoomId, User, UserId}, + db::{self, Database, ProjectId, RoomId, ServerId, User, UserId}, executor::Executor, AppState, Result, }; @@ -138,6 +138,7 @@ impl Deref for DbHandle { } pub struct Server { + id: parking_lot::Mutex, peer: Arc, pub(crate) connection_pool: Arc>, app_state: Arc, @@ -168,9 +169,10 @@ where } impl Server { - pub fn new(app_state: Arc, executor: Executor) -> Arc { + pub fn new(id: ServerId, app_state: Arc, executor: Executor) -> Arc { let mut server = Self { - peer: Peer::new(), + id: parking_lot::Mutex::new(id), + peer: Peer::new(id.0 as u32), app_state, executor, connection_pool: Default::default(), @@ -239,97 +241,146 @@ impl Server { } pub async fn start(&self) -> Result<()> { - self.app_state.db.delete_stale_projects().await?; - let db = self.app_state.db.clone(); + let server_id = *self.id.lock(); + let app_state = self.app_state.clone(); let peer = self.peer.clone(); let timeout = self.executor.sleep(CLEANUP_TIMEOUT); let pool = self.connection_pool.clone(); let live_kit_client = self.app_state.live_kit_client.clone(); - self.executor.spawn_detached(async move { - timeout.await; - if let Some(room_ids) = db.stale_room_ids().await.trace_err() { - for room_id in room_ids { - let mut contacts_to_update = HashSet::default(); - let mut canceled_calls_to_user_ids = Vec::new(); - let mut live_kit_room = String::new(); - let mut delete_live_kit_room = false; - - if let Ok(mut refreshed_room) = db.refresh_room(room_id).await { - room_updated(&refreshed_room.room, &peer); - contacts_to_update - .extend(refreshed_room.stale_participant_user_ids.iter().copied()); - contacts_to_update - .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied()); - canceled_calls_to_user_ids = - mem::take(&mut refreshed_room.canceled_calls_to_user_ids); - live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room); - delete_live_kit_room = refreshed_room.room.participants.is_empty(); - } - { - let pool = pool.lock(); - for canceled_user_id in canceled_calls_to_user_ids { - for connection_id in pool.user_connection_ids(canceled_user_id) { - peer.send( - connection_id, - proto::CallCanceled { - room_id: room_id.to_proto(), - }, - ) - .trace_err(); - } + let span = info_span!("start server"); + let span_enter = span.enter(); + + tracing::info!("begin deleting stale projects"); + app_state + .db + .delete_stale_projects(&app_state.config.zed_environment, server_id) + .await?; + tracing::info!("finish deleting stale projects"); + + drop(span_enter); + self.executor.spawn_detached( + async move { + tracing::info!("waiting for cleanup timeout"); + timeout.await; + tracing::info!("cleanup timeout expired, retrieving stale rooms"); + if let Some(room_ids) = app_state + .db + .stale_room_ids(&app_state.config.zed_environment, server_id) + .await + .trace_err() + { + tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms"); + for room_id in room_ids { + let mut contacts_to_update = HashSet::default(); + let mut canceled_calls_to_user_ids = Vec::new(); + let mut live_kit_room = String::new(); + let mut delete_live_kit_room = false; + + if let Ok(mut refreshed_room) = + app_state.db.refresh_room(room_id, server_id).await + { + tracing::info!( + room_id = room_id.0, + new_participant_count = refreshed_room.room.participants.len(), + "refreshed room" + ); + room_updated(&refreshed_room.room, &peer); + contacts_to_update + .extend(refreshed_room.stale_participant_user_ids.iter().copied()); + contacts_to_update + .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied()); + canceled_calls_to_user_ids = + mem::take(&mut refreshed_room.canceled_calls_to_user_ids); + live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room); + delete_live_kit_room = refreshed_room.room.participants.is_empty(); } - } - for user_id in contacts_to_update { - let busy = db.is_user_busy(user_id).await.trace_err(); - let contacts = db.get_contacts(user_id).await.trace_err(); - if let Some((busy, contacts)) = busy.zip(contacts) { + { let pool = pool.lock(); - let updated_contact = contact_for_user(user_id, false, busy, &pool); - for contact in contacts { - if let db::Contact::Accepted { - user_id: contact_user_id, - .. - } = contact - { - for contact_conn_id in pool.user_connection_ids(contact_user_id) + for canceled_user_id in canceled_calls_to_user_ids { + for connection_id in pool.user_connection_ids(canceled_user_id) { + peer.send( + connection_id, + proto::CallCanceled { + room_id: room_id.to_proto(), + }, + ) + .trace_err(); + } + } + } + + for user_id in contacts_to_update { + let busy = app_state.db.is_user_busy(user_id).await.trace_err(); + let contacts = app_state.db.get_contacts(user_id).await.trace_err(); + if let Some((busy, contacts)) = busy.zip(contacts) { + let pool = pool.lock(); + let updated_contact = contact_for_user(user_id, false, busy, &pool); + for contact in contacts { + if let db::Contact::Accepted { + user_id: contact_user_id, + .. + } = contact { - peer.send( - contact_conn_id, - proto::UpdateContacts { - contacts: vec![updated_contact.clone()], - remove_contacts: Default::default(), - incoming_requests: Default::default(), - remove_incoming_requests: Default::default(), - outgoing_requests: Default::default(), - remove_outgoing_requests: Default::default(), - }, - ) - .trace_err(); + for contact_conn_id in + pool.user_connection_ids(contact_user_id) + { + peer.send( + contact_conn_id, + proto::UpdateContacts { + contacts: vec![updated_contact.clone()], + remove_contacts: Default::default(), + incoming_requests: Default::default(), + remove_incoming_requests: Default::default(), + outgoing_requests: Default::default(), + remove_outgoing_requests: Default::default(), + }, + ) + .trace_err(); + } } } } } - } - if let Some(live_kit) = live_kit_client.as_ref() { - if delete_live_kit_room { - live_kit.delete_room(live_kit_room).await.trace_err(); + if let Some(live_kit) = live_kit_client.as_ref() { + if delete_live_kit_room { + live_kit.delete_room(live_kit_room).await.trace_err(); + } } } } + + app_state + .db + .delete_stale_servers(server_id, &app_state.config.zed_environment) + .await + .trace_err(); } - }); + .instrument(span), + ); Ok(()) } pub fn teardown(&self) { - self.peer.reset(); + self.peer.teardown(); self.connection_pool.lock().reset(); let _ = self.teardown.send(()); } + #[cfg(test)] + pub fn reset(&self, id: ServerId) { + self.teardown(); + *self.id.lock() = id; + self.peer.reset(id.0 as u32); + } + + #[cfg(test)] + pub fn id(&self) -> ServerId { + *self.id.lock() + } + fn add_handler(&mut self, handler: F) -> &mut Self where F: 'static + Send + Sync + Fn(TypedEnvelope, Session) -> Fut, @@ -438,7 +489,7 @@ impl Server { }); tracing::info!(%user_id, %login, %connection_id, %address, "connection opened"); - this.peer.send(connection_id, proto::Hello { peer_id: connection_id.0 })?; + this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?; tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message"); if let Some(send_connection_id) = send_connection_id.take() { @@ -769,7 +820,7 @@ async fn sign_out( .is_user_online(session.user_id) { let db = session.db().await; - if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() { + if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() { room_updated(&room, &session.peer); } } @@ -973,7 +1024,7 @@ async fn cancel_call( let room = session .db() .await - .cancel_call(Some(room_id), session.connection_id, called_user_id) + .cancel_call(room_id, session.connection_id, called_user_id) .await?; room_updated(&room, &session.peer); } @@ -1006,7 +1057,8 @@ async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<( .db() .await .decline_call(Some(room_id), session.user_id) - .await?; + .await? + .ok_or_else(|| anyhow!("failed to decline call"))?; room_updated(&room, &session.peer); } @@ -1108,12 +1160,18 @@ async fn join_project( let collaborators = project .collaborators .iter() - .filter(|collaborator| collaborator.connection_id != session.connection_id.0 as i32) - .map(|collaborator| proto::Collaborator { - peer_id: collaborator.connection_id as u32, - replica_id: collaborator.replica_id.0 as u32, - user_id: collaborator.user_id.to_proto(), + .map(|collaborator| { + let peer_id = proto::PeerId { + owner_id: collaborator.connection_server_id.0 as u32, + id: collaborator.connection_id as u32, + }; + proto::Collaborator { + peer_id: Some(peer_id), + replica_id: collaborator.replica_id.0 as u32, + user_id: collaborator.user_id.to_proto(), + } }) + .filter(|collaborator| collaborator.peer_id != Some(session.connection_id.into())) .collect::>(); let worktrees = project .worktrees @@ -1130,11 +1188,11 @@ async fn join_project( session .peer .send( - ConnectionId(collaborator.peer_id), + collaborator.peer_id.unwrap().into(), proto::AddProjectCollaborator { project_id: project_id.to_proto(), collaborator: Some(proto::Collaborator { - peer_id: session.connection_id.0, + peer_id: Some(session.connection_id.into()), replica_id: replica_id.0 as u32, user_id: guest_user_id.to_proto(), }), @@ -1355,13 +1413,14 @@ where .await .project_collaborators(project_id, session.connection_id) .await?; - ConnectionId( - collaborators - .iter() - .find(|collaborator| collaborator.is_host) - .ok_or_else(|| anyhow!("host not found"))? - .connection_id as u32, - ) + let host = collaborators + .iter() + .find(|collaborator| collaborator.is_host) + .ok_or_else(|| anyhow!("host not found"))?; + ConnectionId { + owner_id: host.connection_server_id.0 as u32, + id: host.connection_id as u32, + } }; let payload = session @@ -1389,7 +1448,10 @@ async fn save_buffer( .iter() .find(|collaborator| collaborator.is_host) .ok_or_else(|| anyhow!("host not found"))?; - ConnectionId(host.connection_id as u32) + ConnectionId { + owner_id: host.connection_server_id.0 as u32, + id: host.connection_id as u32, + } }; let response_payload = session .peer @@ -1401,11 +1463,17 @@ async fn save_buffer( .await .project_collaborators(project_id, session.connection_id) .await?; - collaborators - .retain(|collaborator| collaborator.connection_id != session.connection_id.0 as i32); - let project_connection_ids = collaborators - .iter() - .map(|collaborator| ConnectionId(collaborator.connection_id as u32)); + collaborators.retain(|collaborator| { + let collaborator_connection = ConnectionId { + owner_id: collaborator.connection_server_id.0 as u32, + id: collaborator.connection_id as u32, + }; + collaborator_connection != session.connection_id + }); + let project_connection_ids = collaborators.iter().map(|collaborator| ConnectionId { + owner_id: collaborator.connection_server_id.0 as u32, + id: collaborator.connection_id as u32, + }); broadcast(host_connection_id, project_connection_ids, |conn_id| { session .peer @@ -1419,11 +1487,10 @@ async fn create_buffer_for_peer( request: proto::CreateBufferForPeer, session: Session, ) -> Result<()> { - session.peer.forward_send( - session.connection_id, - ConnectionId(request.peer_id), - request, - )?; + let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?; + session + .peer + .forward_send(session.connection_id, peer_id.into(), request)?; Ok(()) } @@ -1516,7 +1583,10 @@ async fn follow( session: Session, ) -> Result<()> { let project_id = ProjectId::from_proto(request.project_id); - let leader_id = ConnectionId(request.leader_id); + let leader_id = request + .leader_id + .ok_or_else(|| anyhow!("invalid leader id"))? + .into(); let follower_id = session.connection_id; { let project_connection_ids = session @@ -1536,14 +1606,17 @@ async fn follow( .await?; response_payload .views - .retain(|view| view.leader_id != Some(follower_id.0)); + .retain(|view| view.leader_id != Some(follower_id.into())); response.send(response_payload)?; Ok(()) } async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> { let project_id = ProjectId::from_proto(request.project_id); - let leader_id = ConnectionId(request.leader_id); + let leader_id = request + .leader_id + .ok_or_else(|| anyhow!("invalid leader id"))? + .into(); let project_connection_ids = session .db() .await @@ -1572,12 +1645,16 @@ async fn update_followers(request: proto::UpdateFollowers, session: Session) -> proto::update_followers::Variant::UpdateView(payload) => payload.leader_id, proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id, }); - for follower_id in &request.follower_ids { - let follower_id = ConnectionId(*follower_id); - if project_connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id { - session - .peer - .forward_send(session.connection_id, follower_id, request.clone())?; + for follower_peer_id in request.follower_ids.iter().copied() { + let follower_connection_id = follower_peer_id.into(); + if project_connection_ids.contains(&follower_connection_id) + && Some(follower_peer_id) != leader_id + { + session.peer.forward_send( + session.connection_id, + follower_connection_id, + request.clone(), + )?; } } Ok(()) @@ -1892,13 +1969,19 @@ fn contact_for_user( fn room_updated(room: &proto::Room, peer: &Peer) { for participant in &room.participants { - peer.send( - ConnectionId(participant.peer_id), - proto::RoomUpdated { - room: Some(room.clone()), - }, - ) - .trace_err(); + if let Some(peer_id) = participant + .peer_id + .ok_or_else(|| anyhow!("invalid participant peer id")) + .trace_err() + { + peer.send( + peer_id.into(), + proto::RoomUpdated { + room: Some(room.clone()), + }, + ) + .trace_err(); + } } } @@ -1943,8 +2026,7 @@ async fn leave_room_for_session(session: &Session) -> Result<()> { let canceled_calls_to_user_ids; let live_kit_room; let delete_live_kit_room; - { - let mut left_room = session.db().await.leave_room(session.connection_id).await?; + if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? { contacts_to_update.insert(session.user_id); for project in left_room.left_projects.values() { @@ -1956,6 +2038,8 @@ async fn leave_room_for_session(session: &Session) -> Result<()> { canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids); live_kit_room = mem::take(&mut left_room.room.live_kit_room); delete_live_kit_room = left_room.room.participants.is_empty(); + } else { + return Ok(()); } { @@ -2013,7 +2097,7 @@ fn project_left(project: &db::LeftProject, session: &Session) { *connection_id, proto::RemoveProjectCollaborator { project_id: project.id.to_proto(), - peer_id: session.connection_id.0, + peer_id: Some(session.connection_id.into()), }, ) .trace_err(); diff --git a/crates/collab_ui/src/collab_titlebar_item.rs b/crates/collab_ui/src/collab_titlebar_item.rs index ab414e051bcfeb2ec77ab6bac751853fe41b0ab8..2288f77cd358996f5e0ecba98d09d56a42a3dd76 100644 --- a/crates/collab_ui/src/collab_titlebar_item.rs +++ b/crates/collab_ui/src/collab_titlebar_item.rs @@ -1,6 +1,6 @@ use crate::{contact_notification::ContactNotification, contacts_popover}; use call::{ActiveCall, ParticipantLocation}; -use client::{Authenticate, ContactEventKind, PeerId, User, UserStore}; +use client::{proto::PeerId, Authenticate, ContactEventKind, User, UserStore}; use clock::ReplicaId; use contacts_popover::ContactsPopover; use gpui::{ @@ -474,7 +474,7 @@ impl CollabTitlebarItem { cx.dispatch_action(ToggleFollow(peer_id)) }) .with_tooltip::( - peer_id.0 as usize, + peer_id.as_u64() as usize, if is_followed { format!("Unfollow {}", peer_github_login) } else { @@ -487,22 +487,24 @@ impl CollabTitlebarItem { .boxed() } else if let ParticipantLocation::SharedProject { project_id } = location { let user_id = user.id; - MouseEventHandler::::new(peer_id.0 as usize, cx, move |_, _| content) - .with_cursor_style(CursorStyle::PointingHand) - .on_click(MouseButton::Left, move |_, cx| { - cx.dispatch_action(JoinProject { - project_id, - follow_user_id: user_id, - }) + MouseEventHandler::::new(peer_id.as_u64() as usize, cx, move |_, _| { + content + }) + .with_cursor_style(CursorStyle::PointingHand) + .on_click(MouseButton::Left, move |_, cx| { + cx.dispatch_action(JoinProject { + project_id, + follow_user_id: user_id, }) - .with_tooltip::( - peer_id.0 as usize, - format!("Follow {} into external project", peer_github_login), - Some(Box::new(FollowNextCollaborator)), - theme.tooltip.clone(), - cx, - ) - .boxed() + }) + .with_tooltip::( + peer_id.as_u64() as usize, + format!("Follow {} into external project", peer_github_login), + Some(Box::new(FollowNextCollaborator)), + theme.tooltip.clone(), + cx, + ) + .boxed() } else { content } diff --git a/crates/collab_ui/src/contact_list.rs b/crates/collab_ui/src/contact_list.rs index bc8b2947c4b278fa05d6e78c091a6517b971e890..48a4d1a2b541d69f7fa8c41ab21da1e766103301 100644 --- a/crates/collab_ui/src/contact_list.rs +++ b/crates/collab_ui/src/contact_list.rs @@ -2,7 +2,7 @@ use std::{mem, sync::Arc}; use crate::contacts_popover; use call::ActiveCall; -use client::{Contact, PeerId, User, UserStore}; +use client::{proto::PeerId, Contact, User, UserStore}; use editor::{Cancel, Editor}; use fuzzy::{match_strings, StringMatchCandidate}; use gpui::{ @@ -465,7 +465,7 @@ impl ContactList { room.remote_participants() .iter() .map(|(peer_id, participant)| StringMatchCandidate { - id: peer_id.0 as usize, + id: peer_id.as_u64() as usize, string: participant.user.github_login.clone(), char_bag: participant.user.github_login.chars().collect(), }), @@ -479,7 +479,7 @@ impl ContactList { executor.clone(), )); for mat in matches { - let peer_id = PeerId(mat.candidate_id as u32); + let peer_id = PeerId::from_u64(mat.candidate_id as u64); let participant = &room.remote_participants()[&peer_id]; participant_entries.push(ContactEntry::CallParticipant { user: participant.user.clone(), @@ -881,75 +881,80 @@ impl ContactList { let baseline_offset = row.name.text.baseline_offset(font_cache) + (theme.row_height - line_height) / 2.; - MouseEventHandler::::new(peer_id.0 as usize, cx, |mouse_state, _| { - let tree_branch = *tree_branch.style_for(mouse_state, is_selected); - let row = theme.project_row.style_for(mouse_state, is_selected); - - Flex::row() - .with_child( - Stack::new() - .with_child( - Canvas::new(move |bounds, _, cx| { - let start_x = bounds.min_x() + (bounds.width() / 2.) - - (tree_branch.width / 2.); - let end_x = bounds.max_x(); - let start_y = bounds.min_y(); - let end_y = bounds.min_y() + baseline_offset - (cap_height / 2.); + MouseEventHandler::::new( + peer_id.as_u64() as usize, + cx, + |mouse_state, _| { + let tree_branch = *tree_branch.style_for(mouse_state, is_selected); + let row = theme.project_row.style_for(mouse_state, is_selected); - cx.scene.push_quad(gpui::Quad { - bounds: RectF::from_points( - vec2f(start_x, start_y), - vec2f( - start_x + tree_branch.width, - if is_last { end_y } else { bounds.max_y() }, + Flex::row() + .with_child( + Stack::new() + .with_child( + Canvas::new(move |bounds, _, cx| { + let start_x = bounds.min_x() + (bounds.width() / 2.) + - (tree_branch.width / 2.); + let end_x = bounds.max_x(); + let start_y = bounds.min_y(); + let end_y = + bounds.min_y() + baseline_offset - (cap_height / 2.); + + cx.scene.push_quad(gpui::Quad { + bounds: RectF::from_points( + vec2f(start_x, start_y), + vec2f( + start_x + tree_branch.width, + if is_last { end_y } else { bounds.max_y() }, + ), ), - ), - background: Some(tree_branch.color), - border: gpui::Border::default(), - corner_radius: 0., - }); - cx.scene.push_quad(gpui::Quad { - bounds: RectF::from_points( - vec2f(start_x, end_y), - vec2f(end_x, end_y + tree_branch.width), - ), - background: Some(tree_branch.color), - border: gpui::Border::default(), - corner_radius: 0., - }); - }) + background: Some(tree_branch.color), + border: gpui::Border::default(), + corner_radius: 0., + }); + cx.scene.push_quad(gpui::Quad { + bounds: RectF::from_points( + vec2f(start_x, end_y), + vec2f(end_x, end_y + tree_branch.width), + ), + background: Some(tree_branch.color), + border: gpui::Border::default(), + corner_radius: 0., + }); + }) + .boxed(), + ) + .constrained() + .with_width(host_avatar_height) .boxed(), - ) - .constrained() - .with_width(host_avatar_height) - .boxed(), - ) - .with_child( - Svg::new("icons/disable_screen_sharing_12.svg") - .with_color(row.icon.color) - .constrained() - .with_width(row.icon.width) - .aligned() - .left() - .contained() - .with_style(row.icon.container) - .boxed(), - ) - .with_child( - Label::new("Screen".into(), row.name.text.clone()) - .aligned() - .left() - .contained() - .with_style(row.name.container) - .flex(1., false) - .boxed(), - ) - .constrained() - .with_height(theme.row_height) - .contained() - .with_style(row.container) - .boxed() - }) + ) + .with_child( + Svg::new("icons/disable_screen_sharing_12.svg") + .with_color(row.icon.color) + .constrained() + .with_width(row.icon.width) + .aligned() + .left() + .contained() + .with_style(row.icon.container) + .boxed(), + ) + .with_child( + Label::new("Screen".into(), row.name.text.clone()) + .aligned() + .left() + .contained() + .with_style(row.name.container) + .flex(1., false) + .boxed(), + ) + .constrained() + .with_height(theme.row_height) + .contained() + .with_style(row.container) + .boxed() + }, + ) .with_cursor_style(CursorStyle::PointingHand) .on_click(MouseButton::Left, move |_, cx| { cx.dispatch_action(OpenSharedScreen { peer_id }); diff --git a/crates/editor/src/editor_tests.rs b/crates/editor/src/editor_tests.rs index c3c15bb5b4169ee4f995bf52ddd98aed0014e66a..2fcc5f0014ef942a9d12762a6ae424de1aabfcc1 100644 --- a/crates/editor/src/editor_tests.rs +++ b/crates/editor/src/editor_tests.rs @@ -1,9 +1,7 @@ -use std::{cell::RefCell, rc::Rc, time::Instant}; - use drag_and_drop::DragAndDrop; use futures::StreamExt; use indoc::indoc; -use rpc::PeerId; +use std::{cell::RefCell, rc::Rc, time::Instant}; use unindent::Unindent; use super::*; @@ -5128,7 +5126,7 @@ async fn test_following_with_multiple_excerpts(cx: &mut gpui::TestAppContext) { pane.clone(), project.clone(), ViewId { - creator: PeerId(0), + creator: Default::default(), id: 0, }, &mut state_message, @@ -5223,7 +5221,7 @@ async fn test_following_with_multiple_excerpts(cx: &mut gpui::TestAppContext) { pane.clone(), project.clone(), ViewId { - creator: PeerId(0), + creator: Default::default(), id: 0, }, &mut state_message, diff --git a/crates/project/src/lsp_command.rs b/crates/project/src/lsp_command.rs index 3ea12617351ecc1708741ad1a60aef6e73702740..a0eb84558154e8f26c129ce6c7d5f7379b3fd0d6 100644 --- a/crates/project/src/lsp_command.rs +++ b/crates/project/src/lsp_command.rs @@ -3,7 +3,7 @@ use crate::{ }; use anyhow::{anyhow, Result}; use async_trait::async_trait; -use client::{proto, PeerId}; +use client::proto::{self, PeerId}; use gpui::{AppContext, AsyncAppContext, ModelHandle}; use language::{ point_from_lsp, point_to_lsp, diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 9b4a163af4e6f78e397c730f444a4f697ece1e22..7f2fcb516f82bd6102e9abd391c406b2b6e34af8 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -7,7 +7,7 @@ pub mod worktree; mod project_tests; use anyhow::{anyhow, Context, Result}; -use client::{proto, Client, PeerId, TypedEnvelope, UserStore}; +use client::{proto, Client, TypedEnvelope, UserStore}; use clock::ReplicaId; use collections::{hash_map, BTreeMap, HashMap, HashSet}; use futures::{ @@ -15,7 +15,6 @@ use futures::{ future::Shared, AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt, }; - use gpui::{ AnyModelHandle, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, UpgradeModelHandle, WeakModelHandle, @@ -103,11 +102,11 @@ pub struct Project { user_store: ModelHandle, fs: Arc, client_state: Option, - collaborators: HashMap, + collaborators: HashMap, client_subscriptions: Vec, _subscriptions: Vec, opened_buffer: (watch::Sender<()>, watch::Receiver<()>), - shared_buffers: HashMap>, + shared_buffers: HashMap>, #[allow(clippy::type_complexity)] loading_buffers: HashMap< ProjectPath, @@ -164,7 +163,7 @@ enum ProjectClientState { #[derive(Clone, Debug)] pub struct Collaborator { - pub peer_id: PeerId, + pub peer_id: proto::PeerId, pub replica_id: ReplicaId, } @@ -185,7 +184,7 @@ pub enum Event { }, RemoteIdChanged(Option), DisconnectedFromHost, - CollaboratorLeft(PeerId), + CollaboratorLeft(proto::PeerId), } pub enum LanguageServerState { @@ -555,7 +554,7 @@ impl Project { .await?; let mut collaborators = HashMap::default(); for message in response.collaborators { - let collaborator = Collaborator::from_proto(message); + let collaborator = Collaborator::from_proto(message)?; collaborators.insert(collaborator.peer_id, collaborator); } @@ -754,7 +753,7 @@ impl Project { } } - pub fn collaborators(&self) -> &HashMap { + pub fn collaborators(&self) -> &HashMap { &self.collaborators } @@ -4605,7 +4604,7 @@ impl Project { .take() .ok_or_else(|| anyhow!("empty collaborator"))?; - let collaborator = Collaborator::from_proto(collaborator); + let collaborator = Collaborator::from_proto(collaborator)?; this.update(&mut cx, |this, cx| { this.collaborators .insert(collaborator.peer_id, collaborator); @@ -4622,7 +4621,10 @@ impl Project { mut cx: AsyncAppContext, ) -> Result<()> { this.update(&mut cx, |this, cx| { - let peer_id = PeerId(envelope.payload.peer_id); + let peer_id = envelope + .payload + .peer_id + .ok_or_else(|| anyhow!("invalid peer id"))?; let replica_id = this .collaborators .remove(&peer_id) @@ -5489,7 +5491,7 @@ impl Project { fn serialize_project_transaction_for_peer( &mut self, project_transaction: ProjectTransaction, - peer_id: PeerId, + peer_id: proto::PeerId, cx: &AppContext, ) -> proto::ProjectTransaction { let mut serialized_transaction = proto::ProjectTransaction { @@ -5545,7 +5547,7 @@ impl Project { fn create_buffer_for_peer( &mut self, buffer: &ModelHandle, - peer_id: PeerId, + peer_id: proto::PeerId, cx: &AppContext, ) -> u64 { let buffer_id = buffer.read(cx).remote_id(); @@ -5563,7 +5565,7 @@ impl Project { client.send(proto::CreateBufferForPeer { project_id, - peer_id: peer_id.0, + peer_id: Some(peer_id), variant: Some(proto::create_buffer_for_peer::Variant::State(state)), })?; @@ -5580,7 +5582,7 @@ impl Project { let is_last = operations.is_empty(); client.send(proto::CreateBufferForPeer { project_id, - peer_id: peer_id.0, + peer_id: Some(peer_id), variant: Some(proto::create_buffer_for_peer::Variant::Chunk( proto::BufferChunk { buffer_id, @@ -6036,11 +6038,11 @@ impl Entity for Project { } impl Collaborator { - fn from_proto(message: proto::Collaborator) -> Self { - Self { - peer_id: PeerId(message.peer_id), + fn from_proto(message: proto::Collaborator) -> Result { + Ok(Self { + peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?, replica_id: message.replica_id as ReplicaId, - } + }) } } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 84db17a494e0de99695b884b2ddfd2cf29503139..9528bd10b7de96ad1e05063c6c445786fc3d8f5c 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -1,10 +1,15 @@ syntax = "proto3"; package zed.messages; +message PeerId { + uint32 owner_id = 1; + uint32 id = 2; +} + message Envelope { uint32 id = 1; optional uint32 responding_to = 2; - optional uint32 original_sender_id = 3; + optional PeerId original_sender_id = 3; oneof payload { Hello hello = 4; Ack ack = 5; @@ -125,7 +130,7 @@ message Envelope { // Messages message Hello { - uint32 peer_id = 1; + PeerId peer_id = 1; } message Ping {} @@ -167,7 +172,7 @@ message Room { message Participant { uint64 user_id = 1; - uint32 peer_id = 2; + PeerId peer_id = 2; repeated ParticipantProject projects = 3; ParticipantLocation location = 4; } @@ -319,7 +324,7 @@ message AddProjectCollaborator { message RemoveProjectCollaborator { uint64 project_id = 1; - uint32 peer_id = 2; + PeerId peer_id = 2; } message GetDefinition { @@ -438,7 +443,7 @@ message OpenBufferResponse { message CreateBufferForPeer { uint64 project_id = 1; - uint32 peer_id = 2; + PeerId peer_id = 2; oneof variant { BufferState state = 3; BufferChunk chunk = 4; @@ -794,7 +799,7 @@ message UpdateDiagnostics { message Follow { uint64 project_id = 1; - uint32 leader_id = 2; + PeerId leader_id = 2; } message FollowResponse { @@ -804,7 +809,7 @@ message FollowResponse { message UpdateFollowers { uint64 project_id = 1; - repeated uint32 follower_ids = 2; + repeated PeerId follower_ids = 2; oneof variant { UpdateActiveView update_active_view = 3; View create_view = 4; @@ -814,7 +819,7 @@ message UpdateFollowers { message Unfollow { uint64 project_id = 1; - uint32 leader_id = 2; + PeerId leader_id = 2; } message GetPrivateUserInfo {} @@ -827,18 +832,18 @@ message GetPrivateUserInfoResponse { // Entities message ViewId { - uint32 creator = 1; + PeerId creator = 1; uint64 id = 2; } message UpdateActiveView { optional ViewId id = 1; - optional uint32 leader_id = 2; + optional PeerId leader_id = 2; } message UpdateView { ViewId id = 1; - optional uint32 leader_id = 2; + optional PeerId leader_id = 2; oneof variant { Editor editor = 3; @@ -856,7 +861,7 @@ message UpdateView { message View { ViewId id = 1; - optional uint32 leader_id = 2; + optional PeerId leader_id = 2; oneof variant { Editor editor = 3; @@ -874,7 +879,7 @@ message View { } message Collaborator { - uint32 peer_id = 1; + PeerId peer_id = 1; uint32 replica_id = 2; uint64 user_id = 3; } diff --git a/crates/rpc/src/macros.rs b/crates/rpc/src/macros.rs index 38d35893ee70c34615da14aabb561c417f83170b..89e605540da1157f5530ad7236b23358dc127c1a 100644 --- a/crates/rpc/src/macros.rs +++ b/crates/rpc/src/macros.rs @@ -6,7 +6,10 @@ macro_rules! messages { $(Some(envelope::Payload::$name(payload)) => { Some(Box::new(TypedEnvelope { sender_id, - original_sender_id: envelope.original_sender_id.map(PeerId), + original_sender_id: envelope.original_sender_id.map(|original_sender| PeerId { + owner_id: original_sender.owner_id, + id: original_sender.id + }), message_id: envelope.id, payload, })) @@ -24,7 +27,7 @@ macro_rules! messages { self, id: u32, responding_to: Option, - original_sender_id: Option, + original_sender_id: Option, ) -> Envelope { Envelope { id, diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 66ba6a40292d87d13a83943b0e40239ee37b526d..d2a4e6e0804119cbdbcb316861c27caeab57850a 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -1,5 +1,5 @@ use super::{ - proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, RequestMessage}, + proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, PeerId, RequestMessage}, Connection, }; use anyhow::{anyhow, Context, Result}; @@ -11,9 +11,8 @@ use futures::{ }; use parking_lot::{Mutex, RwLock}; use serde::{ser::SerializeStruct, Serialize}; -use std::sync::atomic::Ordering::SeqCst; +use std::{fmt, sync::atomic::Ordering::SeqCst}; use std::{ - fmt, future::Future, marker::PhantomData, sync::{ @@ -25,20 +24,32 @@ use std::{ use tracing::instrument; #[derive(Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Serialize)] -pub struct ConnectionId(pub u32); +pub struct ConnectionId { + pub owner_id: u32, + pub id: u32, +} -impl fmt::Display for ConnectionId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) +impl Into for ConnectionId { + fn into(self) -> PeerId { + PeerId { + owner_id: self.owner_id, + id: self.id, + } } } -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] -pub struct PeerId(pub u32); +impl From for ConnectionId { + fn from(peer_id: PeerId) -> Self { + Self { + owner_id: peer_id.owner_id, + id: peer_id.id, + } + } +} -impl fmt::Display for PeerId { +impl fmt::Display for ConnectionId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) + write!(f, "{}/{}", self.owner_id, self.id) } } @@ -85,6 +96,7 @@ impl TypedEnvelope { } pub struct Peer { + epoch: AtomicU32, pub connections: RwLock>, next_connection_id: AtomicU32, } @@ -105,13 +117,18 @@ const WRITE_TIMEOUT: Duration = Duration::from_secs(2); pub const RECEIVE_TIMEOUT: Duration = Duration::from_secs(5); impl Peer { - pub fn new() -> Arc { + pub fn new(epoch: u32) -> Arc { Arc::new(Self { + epoch: AtomicU32::new(epoch), connections: Default::default(), next_connection_id: Default::default(), }) } + pub fn epoch(&self) -> u32 { + self.epoch.load(SeqCst) + } + #[instrument(skip_all)] pub fn add_connection( self: &Arc, @@ -138,7 +155,10 @@ impl Peer { let (mut incoming_tx, incoming_rx) = mpsc::channel(INCOMING_BUFFER_SIZE); let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded(); - let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst)); + let connection_id = ConnectionId { + owner_id: self.epoch.load(SeqCst), + id: self.next_connection_id.fetch_add(1, SeqCst), + }; let connection_state = ConnectionState { outgoing_tx, next_message_id: Default::default(), @@ -255,11 +275,7 @@ impl Peer { let message_id = incoming.id; tracing::debug!(?incoming, "incoming message future: start"); let _end = util::defer(move || { - tracing::debug!( - %connection_id, - message_id, - "incoming message future: end" - ); + tracing::debug!(%connection_id, message_id, "incoming message future: end"); }); if let Some(responding_to) = incoming.responding_to { @@ -306,11 +322,7 @@ impl Peer { None } else { - tracing::debug!( - %connection_id, - message_id, - "incoming message: received" - ); + tracing::debug!(%connection_id, message_id, "incoming message: received"); proto::build_typed_envelope(connection_id, incoming).or_else(|| { tracing::error!( %connection_id, @@ -343,7 +355,13 @@ impl Peer { self.connections.write().remove(&connection_id); } - pub fn reset(&self) { + pub fn reset(&self, epoch: u32) { + self.teardown(); + self.next_connection_id.store(0, SeqCst); + self.epoch.store(epoch, SeqCst); + } + + pub fn teardown(&self) { self.connections.write().clear(); } @@ -384,7 +402,7 @@ impl Peer { .unbounded_send(proto::Message::Envelope(request.into_envelope( message_id, None, - original_sender_id.map(|id| id.0), + original_sender_id.map(Into::into), ))) .map_err(|_| anyhow!("connection was closed"))?; Ok(()) @@ -433,7 +451,7 @@ impl Peer { .unbounded_send(proto::Message::Envelope(message.into_envelope( message_id, None, - Some(sender_id.0), + Some(sender_id.into()), )))?; Ok(()) } @@ -515,9 +533,9 @@ mod tests { let executor = cx.foreground(); // create 2 clients connected to 1 server - let server = Peer::new(); - let client1 = Peer::new(); - let client2 = Peer::new(); + let server = Peer::new(0); + let client1 = Peer::new(0); + let client2 = Peer::new(0); let (client1_to_server_conn, server_to_client_1_conn, _kill) = Connection::in_memory(cx.background()); @@ -609,8 +627,8 @@ mod tests { #[gpui::test(iterations = 50)] async fn test_order_of_response_and_incoming(cx: &mut TestAppContext) { let executor = cx.foreground(); - let server = Peer::new(); - let client = Peer::new(); + let server = Peer::new(0); + let client = Peer::new(0); let (client_to_server_conn, server_to_client_conn, _kill) = Connection::in_memory(cx.background()); @@ -707,8 +725,8 @@ mod tests { #[gpui::test(iterations = 50)] async fn test_dropping_request_before_completion(cx: &mut TestAppContext) { let executor = cx.foreground(); - let server = Peer::new(); - let client = Peer::new(); + let server = Peer::new(0); + let client = Peer::new(0); let (client_to_server_conn, server_to_client_conn, _kill) = Connection::in_memory(cx.background()); @@ -822,7 +840,7 @@ mod tests { let (client_conn, mut server_conn, _kill) = Connection::in_memory(cx.background()); - let client = Peer::new(); + let client = Peer::new(0); let (connection_id, io_handler, mut incoming) = client.add_test_connection(client_conn, cx.background()); @@ -857,7 +875,7 @@ mod tests { let executor = cx.foreground(); let (client_conn, mut server_conn, _kill) = Connection::in_memory(cx.background()); - let client = Peer::new(); + let client = Peer::new(0); let (connection_id, io_handler, mut incoming) = client.add_test_connection(client_conn, cx.background()); executor.spawn(io_handler).detach(); diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 6d9bc9a0aa348af8c1a14f442323fcf06064688e..385caf3565f639b973007a67669378f59584ea55 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -1,14 +1,16 @@ -use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope}; +use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope}; use anyhow::{anyhow, Result}; use async_tungstenite::tungstenite::Message as WebSocketMessage; use futures::{SinkExt as _, StreamExt as _}; use prost::Message as _; use serde::Serialize; use std::any::{Any, TypeId}; -use std::{cmp, iter, mem}; +use std::fmt; +use std::str::FromStr; use std::{ + cmp, fmt::Debug, - io, + io, iter, mem, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -21,7 +23,7 @@ pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 's self, id: u32, responding_to: Option, - original_sender_id: Option, + original_sender_id: Option, ) -> Envelope; fn from_envelope(envelope: Envelope) -> Option; } @@ -74,6 +76,66 @@ impl AnyTypedEnvelope for TypedEnvelope { } } +impl PeerId { + pub fn from_u64(peer_id: u64) -> Self { + let owner_id = (peer_id >> 32) as u32; + let id = peer_id as u32; + Self { owner_id, id } + } + + pub fn as_u64(self) -> u64 { + ((self.owner_id as u64) << 32) | (self.id as u64) + } +} + +impl Copy for PeerId {} + +impl Eq for PeerId {} + +impl Ord for PeerId { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.owner_id + .cmp(&other.owner_id) + .then_with(|| self.id.cmp(&other.id)) + } +} + +impl PartialOrd for PeerId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl std::hash::Hash for PeerId { + fn hash(&self, state: &mut H) { + self.owner_id.hash(state); + self.id.hash(state); + } +} + +impl fmt::Display for PeerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}/{}", self.owner_id, self.id) + } +} + +impl FromStr for PeerId { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let mut components = s.split('/'); + let owner_id = components + .next() + .ok_or_else(|| anyhow!("invalid peer id {:?}", s))? + .parse()?; + let id = components + .next() + .ok_or_else(|| anyhow!("invalid peer id {:?}", s))? + .parse()?; + Ok(PeerId { owner_id, id }) + } +} + messages!( (Ack, Foreground), (AddProjectCollaborator, Foreground), @@ -477,4 +539,28 @@ mod tests { stream.read().await.unwrap(); assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN); } + + #[gpui::test] + fn test_converting_peer_id_from_and_to_u64() { + let peer_id = PeerId { + owner_id: 10, + id: 3, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + let peer_id = PeerId { + owner_id: u32::MAX, + id: 3, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + let peer_id = PeerId { + owner_id: 10, + id: u32::MAX, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + let peer_id = PeerId { + owner_id: u32::MAX, + id: u32::MAX, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + } } diff --git a/crates/workspace/src/item.rs b/crates/workspace/src/item.rs index 1d6b4a9eb527fb931d961229f2400ce3aaecaccd..0a8311fd5cb3998a392908c522fc7f5e1efbe792 100644 --- a/crates/workspace/src/item.rs +++ b/crates/workspace/src/item.rs @@ -286,7 +286,7 @@ impl ItemHandle for ViewHandle { .remote_id(&workspace.client, cx) .map(|id| id.to_proto()), variant: Some(message), - leader_id: workspace.leader_for_pane(&pane).map(|id| id.0), + leader_id: workspace.leader_for_pane(&pane), }), cx, ); @@ -342,7 +342,7 @@ impl ItemHandle for ViewHandle { .remote_id(&this.client, cx) .map(|id| id.to_proto()), variant: pending_update.borrow_mut().take(), - leader_id: leader_id.map(|id| id.0), + leader_id, }, ), cx, diff --git a/crates/workspace/src/shared_screen.rs b/crates/workspace/src/shared_screen.rs index 7dee642423c805e9581520b523c5182d424c8ccb..d292ece3d5fb821f07e8ed80a0931ebb44bf0e96 100644 --- a/crates/workspace/src/shared_screen.rs +++ b/crates/workspace/src/shared_screen.rs @@ -3,7 +3,7 @@ use crate::{ }; use anyhow::{anyhow, Result}; use call::participant::{Frame, RemoteVideoTrack}; -use client::{PeerId, User}; +use client::{proto::PeerId, User}; use futures::StreamExt; use gpui::{ elements::*, diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index 5cf65568f82b5240ffb5caba6d7e25b9834395bc..c30dc2ea2997ff25a9dfe7287f40cb2d70a85a90 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -16,7 +16,10 @@ mod toolbar; use anyhow::{anyhow, Result}; use call::ActiveCall; -use client::{proto, Client, PeerId, TypedEnvelope, UserStore}; +use client::{ + proto::{self, PeerId}, + Client, TypedEnvelope, UserStore, +}; use collections::{hash_map, HashMap, HashSet}; use dock::{Dock, DockDefaultItemFactory, ToggleDockButton}; use drag_and_drop::DragAndDrop; @@ -1466,7 +1469,7 @@ impl Workspace { .remote_id(&self.client, cx) .map(|id| id.to_proto()) }), - leader_id: self.leader_for_pane(&pane).map(|id| id.0), + leader_id: self.leader_for_pane(&pane), }), cx, ); @@ -1643,7 +1646,7 @@ impl Workspace { let project_id = self.project.read(cx).remote_id()?; let request = self.client.request(proto::Follow { project_id, - leader_id: leader_id.0, + leader_id: Some(leader_id), }); Some(cx.spawn_weak(|this, mut cx| async move { let response = request.await?; @@ -1654,7 +1657,11 @@ impl Workspace { .get_mut(&leader_id) .and_then(|states_by_pane| states_by_pane.get_mut(&pane)) .ok_or_else(|| anyhow!("following interrupted"))?; - state.active_view_id = response.active_view_id.map(ViewId::from_proto); + state.active_view_id = if let Some(active_view_id) = response.active_view_id { + Some(ViewId::from_proto(active_view_id)?) + } else { + None + }; Ok::<_, anyhow::Error>(()) })?; Self::add_views_from_leader( @@ -1720,7 +1727,7 @@ impl Workspace { self.client .send(proto::Unfollow { project_id, - leader_id: leader_id.0, + leader_id: Some(leader_id), }) .log_err(); } @@ -1920,7 +1927,7 @@ impl Workspace { .panes() .iter() .flat_map(|pane| { - let leader_id = this.leader_for_pane(pane).map(|id| id.0); + let leader_id = this.leader_for_pane(pane); pane.read(cx).items().filter_map({ let cx = &cx; move |item| { @@ -1980,10 +1987,15 @@ impl Workspace { if let Some(state) = this.follower_states_by_leader.get_mut(&leader_id) { for state in state.values_mut() { state.active_view_id = - update_active_view.id.clone().map(ViewId::from_proto); + if let Some(active_view_id) = update_active_view.id.clone() { + Some(ViewId::from_proto(active_view_id)?) + } else { + None + }; } } - }); + anyhow::Ok(()) + })?; } proto::update_followers::Variant::UpdateView(update_view) => { let variant = update_view @@ -1997,15 +2009,14 @@ impl Workspace { let project = this.project.clone(); if let Some(state) = this.follower_states_by_leader.get_mut(&leader_id) { for state in state.values_mut() { - if let Some(item) = state - .items_by_leader_view_id - .get(&ViewId::from_proto(id.clone())) - { + let view_id = ViewId::from_proto(id.clone())?; + if let Some(item) = state.items_by_leader_view_id.get(&view_id) { tasks.push(item.apply_update_proto(&project, variant.clone(), cx)); } } } - }); + anyhow::Ok(()) + })?; try_join_all(tasks).await.log_err(); } proto::update_followers::Variant::CreateView(view) => { @@ -2054,7 +2065,7 @@ impl Workspace { let mut leader_view_ids = Vec::new(); for view in &views { let Some(id) = &view.id else { continue }; - let id = ViewId::from_proto(id.clone()); + let id = ViewId::from_proto(id.clone())?; let mut variant = view.variant.clone(); if variant.is_none() { Err(anyhow!("missing variant"))?; @@ -2105,7 +2116,7 @@ impl Workspace { self.client .send(proto::UpdateFollowers { project_id, - follower_ids: self.leader_state.followers.iter().map(|f| f.0).collect(), + follower_ids: self.leader_state.followers.iter().copied().collect(), variant: Some(update), }) .log_err(); @@ -2587,16 +2598,18 @@ impl View for Workspace { } impl ViewId { - pub(crate) fn from_proto(message: proto::ViewId) -> Self { - Self { - creator: PeerId(message.creator), + pub(crate) fn from_proto(message: proto::ViewId) -> Result { + Ok(Self { + creator: message + .creator + .ok_or_else(|| anyhow!("creator is missing"))?, id: message.id, - } + }) } pub(crate) fn to_proto(&self) -> proto::ViewId { proto::ViewId { - creator: self.creator.0, + creator: Some(self.creator), id: self.id, } }