Detailed changes
@@ -823,6 +823,7 @@ dependencies = [
"futures 0.3.25",
"gpui",
"live_kit_client",
+ "log",
"media",
"postage",
"project",
@@ -1130,7 +1131,7 @@ dependencies = [
[[package]]
name = "collab"
-version = "0.3.4"
+version = "0.3.14"
dependencies = [
"anyhow",
"async-tungstenite",
@@ -21,6 +21,7 @@ test-support = [
client = { path = "../client" }
collections = { path = "../collections" }
gpui = { path = "../gpui" }
+log = "0.4"
live_kit_client = { path = "../live_kit_client" }
media = { path = "../media" }
project = { path = "../project" }
@@ -3,7 +3,7 @@ use crate::{
IncomingCall,
};
use anyhow::{anyhow, Result};
-use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
+use client::{proto, Client, TypedEnvelope, User, UserStore};
use collections::{BTreeMap, HashSet};
use futures::{FutureExt, StreamExt};
use gpui::{
@@ -13,17 +13,17 @@ use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUp
use postage::stream::Stream;
use project::Project;
use std::{mem, sync::Arc, time::Duration};
-use util::{post_inc, ResultExt};
+use util::{post_inc, ResultExt, TryFutureExt};
pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
ParticipantLocationChanged {
- participant_id: PeerId,
+ participant_id: proto::PeerId,
},
RemoteVideoTracksChanged {
- participant_id: PeerId,
+ participant_id: proto::PeerId,
},
RemoteProjectShared {
owner: Arc<User>,
@@ -41,7 +41,7 @@ pub struct Room {
live_kit: Option<LiveKitRoom>,
status: RoomStatus,
local_participant: LocalParticipant,
- remote_participants: BTreeMap<PeerId, RemoteParticipant>,
+ remote_participants: BTreeMap<proto::PeerId, RemoteParticipant>,
pending_participants: Vec<Arc<User>>,
participant_user_ids: HashSet<u64>,
pending_call_count: usize,
@@ -50,7 +50,7 @@ pub struct Room {
user_store: ModelHandle<UserStore>,
subscriptions: Vec<client::Subscription>,
pending_room_update: Option<Task<()>>,
- maintain_connection: Option<Task<Result<()>>>,
+ maintain_connection: Option<Task<Option<()>>>,
}
impl Entity for Room {
@@ -58,6 +58,7 @@ impl Entity for Room {
fn release(&mut self, _: &mut MutableAppContext) {
if self.status.is_online() {
+ log::info!("room was released, sending leave message");
self.client.send(proto::LeaveRoom {}).log_err();
}
}
@@ -122,7 +123,7 @@ impl Room {
};
let maintain_connection =
- cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx));
+ cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
Self {
id,
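
Note on the two hunks above: `maintain_connection` changes from `Task<Result<()>>` to `Task<Option<()>>` because the spawned future is now wrapped in `util::TryFutureExt::log_err`, which logs the `Err` case and collapses the output to an `Option`. A minimal standalone sketch of that adapter (illustrative only, not the real `util` crate; `env_logger` is an assumed backend for the example):

```rust
// Minimal sketch of a `log_err`-style adapter (not the real `util` crate):
// it logs the failure and collapses `Result<T, E>` to `Option<T>`, which is
// why the room's background task is now typed `Task<Option<()>>`.
async fn log_err<T, E: std::fmt::Debug>(
    fut: impl std::future::Future<Output = Result<T, E>>,
) -> Option<T> {
    match fut.await {
        Ok(value) => Some(value),
        Err(error) => {
            // The real adapter reports through the `log` facade added above.
            log::error!("{:?}", error);
            None
        }
    }
}

fn main() {
    env_logger::init(); // assumed logging backend, for the example only
    let result: Option<()> =
        futures::executor::block_on(log_err(async { Err::<(), _>("connection lost") }));
    assert!(result.is_none());
}
```
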
@@ -229,6 +230,7 @@ impl Room {
cx.notify();
cx.emit(Event::Left);
+ log::info!("leaving room");
self.status = RoomStatus::Offline;
self.remote_participants.clear();
self.pending_participants.clear();
@@ -254,6 +256,7 @@ impl Room {
.map_or(false, |s| s.is_connected());
// Even if we're initially connected, any future change of the status means we were momentarily disconnected.
if !is_connected || client_status.next().await.is_some() {
+ log::info!("detected client disconnection");
let room_id = this
.upgrade(&cx)
.ok_or_else(|| anyhow!("room was dropped"))?
@@ -269,8 +272,13 @@ impl Room {
let client_reconnection = async {
let mut remaining_attempts = 3;
while remaining_attempts > 0 {
+ log::info!(
+ "waiting for client status change, remaining attempts {}",
+ remaining_attempts
+ );
if let Some(status) = client_status.next().await {
if status.is_connected() {
+ log::info!("client reconnected, attempting to rejoin room");
let rejoin_room = async {
let response =
client.request(proto::JoinRoom { id: room_id }).await?;
@@ -285,7 +293,7 @@ impl Room {
anyhow::Ok(())
};
- if rejoin_room.await.is_ok() {
+ if rejoin_room.await.log_err().is_some() {
return true;
} else {
remaining_attempts -= 1;
@@ -303,12 +311,15 @@ impl Room {
futures::select_biased! {
reconnected = client_reconnection => {
if reconnected {
+ log::info!("successfully reconnected to room");
// If we successfully joined the room, go back around the loop
// waiting for future connection status changes.
continue;
}
}
- _ = reconnection_timeout => {}
+ _ = reconnection_timeout => {
+ log::info!("room reconnection timeout expired");
+ }
}
}
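
The reconnection logic above races a bounded-retry rejoin future against a timeout. A self-contained sketch of that `select_biased!` shape (the stub futures and the `futures-timer` crate are assumptions for the example):

```rust
use futures::{pin_mut, select_biased, FutureExt};
use std::time::Duration;

// Stub for the bounded rejoin loop; the real future retries `proto::JoinRoom`
// up to three times before giving up.
async fn reconnect() -> bool {
    true
}

fn main() {
    futures::executor::block_on(async {
        let reconnected = reconnect().fuse();
        // Stub timeout; the real code sleeps for RECONNECT_TIMEOUT.
        let timeout = futures_timer::Delay::new(Duration::from_millis(50)).fuse();
        pin_mut!(reconnected, timeout);
        select_biased! {
            // Polled first: a reconnect that is already ready beats a timeout
            // that fired in the same tick.
            reconnected = reconnected => {
                if reconnected {
                    println!("rejoined room; keep waiting for status changes");
                } else {
                    println!("attempts exhausted; leave the room");
                }
            }
            _ = timeout => println!("reconnection timeout expired; leave the room"),
        }
    });
}
```
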
@@ -316,6 +327,7 @@ impl Room {
// or an error occurred while trying to re-join the room. Either way
// we leave the room and return an error.
if let Some(this) = this.upgrade(&cx) {
+ log::info!("reconnection failed, leaving room");
let _ = this.update(&mut cx, |this, cx| this.leave(cx));
}
return Err(anyhow!(
@@ -337,7 +349,7 @@ impl Room {
&self.local_participant
}
- pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
+ pub fn remote_participants(&self) -> &BTreeMap<proto::PeerId, RemoteParticipant> {
&self.remote_participants
}
@@ -407,7 +419,7 @@ impl Room {
if let Some(participants) = remote_participants.log_err() {
let mut participant_peer_ids = HashSet::default();
for (participant, user) in room.participants.into_iter().zip(participants) {
- let peer_id = PeerId(participant.peer_id);
+ let Some(peer_id) = participant.peer_id else { continue };
this.participant_user_ids.insert(participant.user_id);
participant_peer_ids.insert(peer_id);
@@ -464,7 +476,7 @@ impl Room {
if let Some(live_kit) = this.live_kit.as_ref() {
let tracks =
- live_kit.room.remote_video_tracks(&peer_id.0.to_string());
+ live_kit.room.remote_video_tracks(&peer_id.to_string());
for track in tracks {
this.remote_video_track_updated(
RemoteVideoTrackUpdate::Subscribed(track),
@@ -499,6 +511,7 @@ impl Room {
this.pending_room_update.take();
if this.should_leave() {
+ log::info!("room is empty, leaving");
let _ = this.leave(cx);
}
@@ -518,7 +531,7 @@ impl Room {
) -> Result<()> {
match change {
RemoteVideoTrackUpdate::Subscribed(track) => {
- let peer_id = PeerId(track.publisher_id().parse()?);
+ let peer_id = track.publisher_id().parse()?;
let track_id = track.sid().to_string();
let participant = self
.remote_participants
@@ -538,7 +551,7 @@ impl Room {
publisher_id,
track_id,
} => {
- let peer_id = PeerId(publisher_id.parse()?);
+ let peer_id = publisher_id.parse()?;
let participant = self
.remote_participants
.get_mut(&peer_id)
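
These hunks rely on `proto::PeerId` round-tripping through a string, since LiveKit identifies publishers by string id: `peer_id.to_string()` on publish and `publisher_id().parse()?` on subscribe. A plausible sketch of what those impls require (the exact encoding in the real crate is an assumption):

```rust
use std::{fmt, str::FromStr};

// Hypothetical two-part peer id that round-trips through a string; the real
// proto::PeerId encoding may differ. `Ord` is required because the room keys
// `remote_participants` by peer id in a BTreeMap.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PeerId {
    pub owner_id: u32,
    pub id: u32,
}

impl fmt::Display for PeerId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}/{}", self.owner_id, self.id)
    }
}

impl FromStr for PeerId {
    type Err = std::num::ParseIntError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Treat a bare number as `owner_id = 0` for illustration.
        let (owner_id, id) = s.split_once('/').unwrap_or(("0", s));
        Ok(PeerId {
            owner_id: owner_id.parse()?,
            id: id.parse()?,
        })
    }
}

fn main() {
    let peer_id = PeerId { owner_id: 3, id: 42 };
    assert_eq!(peer_id.to_string().parse::<PeerId>(), Ok(peer_id));
}
```
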
@@ -23,7 +23,7 @@ use lazy_static::lazy_static;
use parking_lot::RwLock;
use postage::watch;
use rand::prelude::*;
-use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage};
+use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, PeerId, RequestMessage};
use serde::Deserialize;
use std::{
any::TypeId,
@@ -140,7 +140,7 @@ impl EstablishConnectionError {
}
}
-#[derive(Copy, Clone, Debug, Eq, PartialEq)]
+#[derive(Copy, Clone, Debug, PartialEq)]
pub enum Status {
SignedOut,
UpgradeRequired,
@@ -306,7 +306,7 @@ impl Client {
pub fn new(http: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
Arc::new(Self {
id: 0,
- peer: Peer::new(),
+ peer: Peer::new(0),
telemetry: Telemetry::new(http.clone(), cx),
http,
state: Default::default(),
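
`Peer::new` now takes the server's epoch, which becomes the `owner_id` half of every connection id it mints; the rest of the diff constructs `ConnectionId { owner_id, id }` accordingly. A sketch of the shapes this implies (field layout inferred from the `as u32` casts, not quoted from the `rpc` crate):

```rust
// Assumed shapes, inferred from the `as u32` casts and struct literals in
// this diff rather than quoted from the `rpc` crate.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct ConnectionId {
    pub owner_id: u32, // epoch of the server that minted this connection
    pub id: u32,       // per-server connection counter
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct PeerId {
    pub owner_id: u32,
    pub id: u32,
}

impl From<ConnectionId> for PeerId {
    fn from(connection_id: ConnectionId) -> Self {
        PeerId {
            owner_id: connection_id.owner_id,
            id: connection_id.id,
        }
    }
}

fn main() {
    // `Peer::new(epoch)` would mint ids like this one; after a server restart
    // the epoch differs, so ids from the old process can never collide.
    let connection = ConnectionId { owner_id: 1, id: 0 };
    let peer_id: PeerId = connection.into();
    assert_eq!(peer_id.owner_id, 1);
}
```
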
@@ -333,14 +333,14 @@ impl Client {
}
#[cfg(any(test, feature = "test-support"))]
- pub fn tear_down(&self) {
+ pub fn teardown(&self) {
let mut state = self.state.write();
state._reconnect_task.take();
state.message_handlers.clear();
state.models_by_message_type.clear();
state.entities_by_type_and_remote_id.clear();
state.entity_id_extractors.clear();
- self.peer.reset();
+ self.peer.teardown();
}
#[cfg(any(test, feature = "test-support"))]
@@ -810,7 +810,11 @@ impl Client {
hello_message_type_name
)
})?;
- Ok(PeerId(hello.payload.peer_id))
+ let peer_id = hello
+ .payload
+ .peer_id
+ .ok_or_else(|| anyhow!("invalid peer id"))?;
+ Ok(peer_id)
};
let peer_id = match peer_id.await {
@@ -822,7 +826,7 @@ impl Client {
};
log::info!(
- "set status to connected (connection id: {}, peer id: {})",
+ "set status to connected (connection id: {:?}, peer id: {:?})",
connection_id,
peer_id
);
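
`Hello.peer_id` is now a message-typed protobuf field rather than a bare `u32`, so the generated Rust exposes it as an `Option` and it no longer implements `Display` (hence the `{:?}` formatting above). A sketch of the generated shape this hunk guards against (field layout assumed):

```rust
// Hypothetical shape of the prost-generated types this hunk guards against;
// prost exposes message-typed fields as `Option<T>`, so a missing peer id
// must be handled explicitly.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct PeerId {
    pub owner_id: u32,
    pub id: u32,
}

#[derive(Clone, Debug, Default)]
pub struct Hello {
    pub peer_id: Option<PeerId>,
}

fn main() -> anyhow::Result<()> {
    let hello = Hello {
        peer_id: Some(PeerId { owner_id: 1, id: 9 }),
    };
    let peer_id = hello
        .peer_id
        .ok_or_else(|| anyhow::anyhow!("invalid peer id"))?;
    println!("connected as {:?}", peer_id); // Debug, since Display is gone
    Ok(())
}
```
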
@@ -853,7 +857,7 @@ impl Client {
.spawn(async move {
match handle_io.await {
Ok(()) => {
- if *this.status().borrow()
+ if this.status().borrow().clone()
== (Status::Connected {
connection_id,
peer_id,
@@ -1194,7 +1198,7 @@ impl Client {
let mut state = self.state.write();
let type_name = message.payload_type_name();
let payload_type_id = message.payload_type_id();
- let sender_id = message.original_sender_id().map(|id| id.0);
+ let sender_id = message.original_sender_id();
let mut subscriber = None;
@@ -35,7 +35,7 @@ impl FakeServer {
cx: &TestAppContext,
) -> Self {
let server = Self {
- peer: Peer::new(),
+ peer: Peer::new(0),
state: Default::default(),
user_id: client_user_id,
executor: cx.foreground(),
@@ -92,7 +92,7 @@ impl FakeServer {
peer.send(
connection_id,
proto::Hello {
- peer_id: connection_id.0,
+ peer_id: Some(connection_id.into()),
},
)
.unwrap();
@@ -2,6 +2,7 @@ DATABASE_URL = "postgres://postgres@localhost/zed"
HTTP_PORT = 8080
API_TOKEN = "secret"
INVITE_LINK_PREFIX = "http://localhost:3000/invites/"
+ZED_ENVIRONMENT = "development"
LIVE_KIT_SERVER = "http://localhost:7880"
LIVE_KIT_KEY = "devkey"
LIVE_KIT_SECRET = "secret"
@@ -3,7 +3,7 @@ authors = ["Nathan Sobo <nathan@zed.dev>"]
default-run = "collab"
edition = "2021"
name = "collab"
-version = "0.3.4"
+version = "0.3.14"
[[bin]]
name = "collab"
@@ -59,6 +59,12 @@ spec:
ports:
- containerPort: 8080
protocol: TCP
+ readinessProbe:
+ httpGet:
+ path: /
+ port: 8080
+ initialDelaySeconds: 1
+ periodSeconds: 1
env:
- name: HTTP_PORT
value: "8080"
@@ -93,6 +99,8 @@ spec:
value: ${RUST_LOG}
- name: LOG_JSON
value: "true"
+ - name: ZED_ENVIRONMENT
+ value: ${ZED_ENVIRONMENT}
securityContext:
capabilities:
# FIXME - Switch to the more restrictive `PERFMON` capability.
@@ -43,11 +43,12 @@ CREATE TABLE "projects" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"room_id" INTEGER REFERENCES rooms (id) NOT NULL,
"host_user_id" INTEGER REFERENCES users (id) NOT NULL,
- "host_connection_id" INTEGER NOT NULL,
- "host_connection_epoch" TEXT NOT NULL,
+ "host_connection_id" INTEGER,
+ "host_connection_server_id" INTEGER REFERENCES servers (id) ON DELETE CASCADE,
"unregistered" BOOLEAN NOT NULL DEFAULT FALSE
);
-CREATE INDEX "index_projects_on_host_connection_epoch" ON "projects" ("host_connection_epoch");
+CREATE INDEX "index_projects_on_host_connection_server_id" ON "projects" ("host_connection_server_id");
+CREATE INDEX "index_projects_on_host_connection_id_and_host_connection_server_id" ON "projects" ("host_connection_id", "host_connection_server_id");
CREATE TABLE "worktrees" (
"project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
@@ -103,34 +104,39 @@ CREATE TABLE "project_collaborators" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
"connection_id" INTEGER NOT NULL,
- "connection_epoch" TEXT NOT NULL,
+ "connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE,
"user_id" INTEGER NOT NULL,
"replica_id" INTEGER NOT NULL,
"is_host" BOOLEAN NOT NULL
);
CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id");
CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id");
-CREATE INDEX "index_project_collaborators_on_connection_epoch" ON "project_collaborators" ("connection_epoch");
+CREATE INDEX "index_project_collaborators_on_connection_server_id" ON "project_collaborators" ("connection_server_id");
CREATE INDEX "index_project_collaborators_on_connection_id" ON "project_collaborators" ("connection_id");
-CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_epoch" ON "project_collaborators" ("project_id", "connection_id", "connection_epoch");
+CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_server_id" ON "project_collaborators" ("project_id", "connection_id", "connection_server_id");
CREATE TABLE "room_participants" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"room_id" INTEGER NOT NULL REFERENCES rooms (id),
"user_id" INTEGER NOT NULL REFERENCES users (id),
"answering_connection_id" INTEGER,
- "answering_connection_epoch" TEXT,
+ "answering_connection_server_id" INTEGER REFERENCES servers (id) ON DELETE CASCADE,
"answering_connection_lost" BOOLEAN NOT NULL,
"location_kind" INTEGER,
"location_project_id" INTEGER,
"initial_project_id" INTEGER,
"calling_user_id" INTEGER NOT NULL REFERENCES users (id),
"calling_connection_id" INTEGER NOT NULL,
- "calling_connection_epoch" TEXT NOT NULL
+ "calling_connection_server_id" INTEGER REFERENCES servers (id) ON DELETE SET NULL
);
CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id");
CREATE INDEX "index_room_participants_on_room_id" ON "room_participants" ("room_id");
-CREATE INDEX "index_room_participants_on_answering_connection_epoch" ON "room_participants" ("answering_connection_epoch");
-CREATE INDEX "index_room_participants_on_calling_connection_epoch" ON "room_participants" ("calling_connection_epoch");
+CREATE INDEX "index_room_participants_on_answering_connection_server_id" ON "room_participants" ("answering_connection_server_id");
+CREATE INDEX "index_room_participants_on_calling_connection_server_id" ON "room_participants" ("calling_connection_server_id");
CREATE INDEX "index_room_participants_on_answering_connection_id" ON "room_participants" ("answering_connection_id");
-CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_epoch" ON "room_participants" ("answering_connection_id", "answering_connection_epoch");
+CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_server_id" ON "room_participants" ("answering_connection_id", "answering_connection_server_id");
+
+CREATE TABLE "servers" (
+ "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+ "environment" VARCHAR NOT NULL
+);
@@ -0,0 +1,30 @@
+CREATE TABLE servers (
+ id SERIAL PRIMARY KEY,
+ environment VARCHAR NOT NULL
+);
+
+DROP TABLE worktree_extensions;
+DROP TABLE project_activity_periods;
+DELETE from projects;
+ALTER TABLE projects
+ DROP COLUMN host_connection_epoch,
+ ADD COLUMN host_connection_server_id INTEGER REFERENCES servers (id) ON DELETE CASCADE;
+CREATE INDEX "index_projects_on_host_connection_server_id" ON "projects" ("host_connection_server_id");
+CREATE INDEX "index_projects_on_host_connection_id_and_host_connection_server_id" ON "projects" ("host_connection_id", "host_connection_server_id");
+
+DELETE FROM project_collaborators;
+ALTER TABLE project_collaborators
+ DROP COLUMN connection_epoch,
+ ADD COLUMN connection_server_id INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE;
+CREATE INDEX "index_project_collaborators_on_connection_server_id" ON "project_collaborators" ("connection_server_id");
+CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_server_id" ON "project_collaborators" ("project_id", "connection_id", "connection_server_id");
+
+DELETE FROM room_participants;
+ALTER TABLE room_participants
+ DROP COLUMN answering_connection_epoch,
+ DROP COLUMN calling_connection_epoch,
+ ADD COLUMN answering_connection_server_id INTEGER REFERENCES servers (id) ON DELETE CASCADE,
+ ADD COLUMN calling_connection_server_id INTEGER REFERENCES servers (id) ON DELETE SET NULL;
+CREATE INDEX "index_room_participants_on_answering_connection_server_id" ON "room_participants" ("answering_connection_server_id");
+CREATE INDEX "index_room_participants_on_calling_connection_server_id" ON "room_participants" ("calling_connection_server_id");
+CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_server_id" ON "room_participants" ("answering_connection_id", "answering_connection_server_id");
@@ -5,6 +5,7 @@ mod project;
mod project_collaborator;
mod room;
mod room_participant;
+mod server;
mod signup;
#[cfg(test)]
mod tests;
@@ -48,7 +49,6 @@ pub struct Database {
background: Option<std::sync::Arc<gpui::executor::Background>>,
#[cfg(test)]
runtime: Option<tokio::runtime::Runtime>,
- epoch: parking_lot::RwLock<Uuid>,
}
impl Database {
@@ -61,18 +61,12 @@ impl Database {
background: None,
#[cfg(test)]
runtime: None,
- epoch: parking_lot::RwLock::new(Uuid::new_v4()),
})
}
#[cfg(test)]
pub fn reset(&self) {
self.rooms.clear();
- *self.epoch.write() = Uuid::new_v4();
- }
-
- fn epoch(&self) -> Uuid {
- *self.epoch.read()
}
pub async fn migrate(
@@ -116,14 +110,40 @@ impl Database {
Ok(new_migrations)
}
- pub async fn delete_stale_projects(&self) -> Result<()> {
+ pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
+ self.transaction(|tx| async move {
+ let server = server::ActiveModel {
+ environment: ActiveValue::set(environment.into()),
+ ..Default::default()
+ }
+ .insert(&*tx)
+ .await?;
+ Ok(server.id)
+ })
+ .await
+ }
+
+ pub async fn delete_stale_projects(
+ &self,
+ environment: &str,
+ new_server_id: ServerId,
+ ) -> Result<()> {
self.transaction(|tx| async move {
+            let stale_server_ids = self
+                .stale_server_ids(environment, new_server_id, &tx)
+                .await?;
project_collaborator::Entity::delete_many()
- .filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch()))
+ .filter(
+ project_collaborator::Column::ConnectionServerId
+                    .is_in(stale_server_ids.iter().copied()),
+ )
.exec(&*tx)
.await?;
project::Entity::delete_many()
- .filter(project::Column::HostConnectionEpoch.ne(self.epoch()))
+ .filter(
+ project::Column::HostConnectionServerId
+                    .is_in(stale_server_ids.iter().copied()),
+ )
.exec(&*tx)
.await?;
Ok(())
@@ -131,18 +151,28 @@ impl Database {
.await
}
- pub async fn stale_room_ids(&self) -> Result<Vec<RoomId>> {
+ pub async fn stale_room_ids(
+ &self,
+ environment: &str,
+ new_server_id: ServerId,
+ ) -> Result<Vec<RoomId>> {
self.transaction(|tx| async move {
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
RoomId,
}
+            let stale_server_ids = self
+                .stale_server_ids(environment, new_server_id, &tx)
+                .await?;
Ok(room_participant::Entity::find()
.select_only()
.column(room_participant::Column::RoomId)
.distinct()
- .filter(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch()))
+ .filter(
+ room_participant::Column::AnsweringConnectionServerId
+                    .is_in(stale_server_ids),
+ )
.into_values::<_, QueryAs>()
.all(&*tx)
.await?)
@@ -150,12 +180,16 @@ impl Database {
.await
}
- pub async fn refresh_room(&self, room_id: RoomId) -> Result<RoomGuard<RefreshedRoom>> {
+ pub async fn refresh_room(
+ &self,
+ room_id: RoomId,
+ new_server_id: ServerId,
+ ) -> Result<RoomGuard<RefreshedRoom>> {
self.room_transaction(|tx| async move {
let stale_participant_filter = Condition::all()
.add(room_participant::Column::RoomId.eq(room_id))
.add(room_participant::Column::AnsweringConnectionId.is_not_null())
- .add(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch()));
+ .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
let stale_participant_user_ids = room_participant::Entity::find()
.filter(stale_participant_filter.clone())
@@ -199,6 +233,42 @@ impl Database {
.await
}
+ pub async fn delete_stale_servers(
+ &self,
+ new_server_id: ServerId,
+ environment: &str,
+ ) -> Result<()> {
+ self.transaction(|tx| async move {
+ server::Entity::delete_many()
+ .filter(
+ Condition::all()
+ .add(server::Column::Environment.eq(environment))
+ .add(server::Column::Id.ne(new_server_id)),
+ )
+ .exec(&*tx)
+ .await?;
+ Ok(())
+ })
+ .await
+ }
+
+ async fn stale_server_ids(
+ &self,
+ environment: &str,
+ new_server_id: ServerId,
+ tx: &DatabaseTransaction,
+ ) -> Result<Vec<ServerId>> {
+ let stale_servers = server::Entity::find()
+ .filter(
+ Condition::all()
+ .add(server::Column::Environment.eq(environment))
+ .add(server::Column::Id.ne(new_server_id)),
+ )
+ .all(&*tx)
+ .await?;
+ Ok(stale_servers.into_iter().map(|server| server.id).collect())
+ }
+
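
Taken together, these methods give each collab process a boot sequence: register a `servers` row, use the returned id as its connection epoch, and sweep state owned by older servers in the same environment. A hedged sketch of that call order (`Db` is a stub standing in for the sea-orm-backed `Database`; only the sequencing is taken from this diff):

```rust
// `Db` is a stub standing in for the sea-orm-backed `Database`; only the
// call order is taken from this diff.
struct ServerId(i32);

struct Db;

impl Db {
    async fn create_server(&self, _environment: &str) -> ServerId {
        ServerId(1)
    }
    async fn delete_stale_projects(&self, _environment: &str, _id: &ServerId) {}
    async fn delete_stale_servers(&self, _id: &ServerId, _environment: &str) {}
}

fn main() {
    futures::executor::block_on(async {
        let db = Db;
        let environment = "production";
        // 1. Register this process; the row id becomes its connection epoch.
        let server_id = db.create_server(environment).await;
        println!("registered server {}", server_id.0);
        // 2. Sweep projects and collaborators owned by older servers in this
        //    environment (rooms are refreshed later, after CLEANUP_TIMEOUT).
        db.delete_stale_projects(environment, &server_id).await;
        // 3. Drop old server rows; ON DELETE CASCADE / SET NULL clean up any
        //    remaining references.
        db.delete_stale_servers(&server_id, environment).await;
    });
}
```
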
// users
pub async fn create_user(
@@ -1076,7 +1146,7 @@ impl Database {
pub async fn create_room(
&self,
user_id: UserId,
- connection_id: ConnectionId,
+ connection: ConnectionId,
live_kit_room: &str,
) -> Result<RoomGuard<proto::Room>> {
self.room_transaction(|tx| async move {
@@ -1091,12 +1161,16 @@ impl Database {
room_participant::ActiveModel {
room_id: ActiveValue::set(room_id),
user_id: ActiveValue::set(user_id),
- answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
- answering_connection_epoch: ActiveValue::set(Some(self.epoch())),
+ answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
+ answering_connection_server_id: ActiveValue::set(Some(ServerId(
+ connection.owner_id as i32,
+ ))),
answering_connection_lost: ActiveValue::set(false),
calling_user_id: ActiveValue::set(user_id),
- calling_connection_id: ActiveValue::set(connection_id.0 as i32),
- calling_connection_epoch: ActiveValue::set(self.epoch()),
+ calling_connection_id: ActiveValue::set(connection.id as i32),
+ calling_connection_server_id: ActiveValue::set(Some(ServerId(
+ connection.owner_id as i32,
+ ))),
..Default::default()
}
.insert(&*tx)
@@ -1112,7 +1186,7 @@ impl Database {
&self,
room_id: RoomId,
calling_user_id: UserId,
- calling_connection_id: ConnectionId,
+ calling_connection: ConnectionId,
called_user_id: UserId,
initial_project_id: Option<ProjectId>,
) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
@@ -1122,8 +1196,10 @@ impl Database {
user_id: ActiveValue::set(called_user_id),
answering_connection_lost: ActiveValue::set(false),
calling_user_id: ActiveValue::set(calling_user_id),
- calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32),
- calling_connection_epoch: ActiveValue::set(self.epoch()),
+ calling_connection_id: ActiveValue::set(calling_connection.id as i32),
+ calling_connection_server_id: ActiveValue::set(Some(ServerId(
+ calling_connection.owner_id as i32,
+ ))),
initial_project_id: ActiveValue::set(initial_project_id),
..Default::default()
}
@@ -1162,57 +1238,64 @@ impl Database {
&self,
expected_room_id: Option<RoomId>,
user_id: UserId,
- ) -> Result<RoomGuard<proto::Room>> {
- self.room_transaction(|tx| async move {
+ ) -> Result<Option<RoomGuard<proto::Room>>> {
+ self.optional_room_transaction(|tx| async move {
+ let mut filter = Condition::all()
+ .add(room_participant::Column::UserId.eq(user_id))
+ .add(room_participant::Column::AnsweringConnectionId.is_null());
+ if let Some(room_id) = expected_room_id {
+ filter = filter.add(room_participant::Column::RoomId.eq(room_id));
+ }
let participant = room_participant::Entity::find()
- .filter(
- room_participant::Column::UserId
- .eq(user_id)
- .and(room_participant::Column::AnsweringConnectionId.is_null()),
- )
+ .filter(filter)
.one(&*tx)
- .await?
- .ok_or_else(|| anyhow!("could not decline call"))?;
- let room_id = participant.room_id;
+ .await?;
- if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
- return Err(anyhow!("declining call on unexpected room"))?;
- }
+ let participant = if let Some(participant) = participant {
+ participant
+ } else if expected_room_id.is_some() {
+ return Err(anyhow!("could not find call to decline"))?;
+ } else {
+ return Ok(None);
+ };
+ let room_id = participant.room_id;
room_participant::Entity::delete(participant.into_active_model())
.exec(&*tx)
.await?;
let room = self.get_room(room_id, &tx).await?;
- Ok((room_id, room))
+ Ok(Some((room_id, room)))
})
.await
}
pub async fn cancel_call(
&self,
- expected_room_id: Option<RoomId>,
- calling_connection_id: ConnectionId,
+ room_id: RoomId,
+ calling_connection: ConnectionId,
called_user_id: UserId,
) -> Result<RoomGuard<proto::Room>> {
self.room_transaction(|tx| async move {
let participant = room_participant::Entity::find()
.filter(
- room_participant::Column::UserId
- .eq(called_user_id)
- .and(
+ Condition::all()
+ .add(room_participant::Column::UserId.eq(called_user_id))
+ .add(room_participant::Column::RoomId.eq(room_id))
+ .add(
room_participant::Column::CallingConnectionId
- .eq(calling_connection_id.0 as i32),
+ .eq(calling_connection.id as i32),
)
- .and(room_participant::Column::AnsweringConnectionId.is_null()),
+ .add(
+ room_participant::Column::CallingConnectionServerId
+ .eq(calling_connection.owner_id as i32),
+ )
+ .add(room_participant::Column::AnsweringConnectionId.is_null()),
)
.one(&*tx)
.await?
- .ok_or_else(|| anyhow!("could not cancel call"))?;
+ .ok_or_else(|| anyhow!("no call to cancel"))?;
let room_id = participant.room_id;
- if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
- return Err(anyhow!("canceling call on unexpected room"))?;
- }
room_participant::Entity::delete(participant.into_active_model())
.exec(&*tx)
@@ -1228,7 +1311,7 @@ impl Database {
&self,
room_id: RoomId,
user_id: UserId,
- connection_id: ConnectionId,
+ connection: ConnectionId,
) -> Result<RoomGuard<proto::Room>> {
self.room_transaction(|tx| async move {
let result = room_participant::Entity::update_many()
@@ -1241,14 +1324,16 @@ impl Database {
.add(room_participant::Column::AnsweringConnectionId.is_null())
.add(room_participant::Column::AnsweringConnectionLost.eq(true))
.add(
- room_participant::Column::AnsweringConnectionEpoch
- .ne(self.epoch()),
+ room_participant::Column::AnsweringConnectionServerId
+ .ne(connection.owner_id as i32),
),
),
)
.set(room_participant::ActiveModel {
- answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
- answering_connection_epoch: ActiveValue::set(Some(self.epoch())),
+ answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
+ answering_connection_server_id: ActiveValue::set(Some(ServerId(
+ connection.owner_id as i32,
+ ))),
answering_connection_lost: ActiveValue::set(false),
..Default::default()
})
@@ -1264,10 +1349,23 @@ impl Database {
.await
}
- pub async fn leave_room(&self, connection_id: ConnectionId) -> Result<RoomGuard<LeftRoom>> {
- self.room_transaction(|tx| async move {
+ pub async fn leave_room(
+ &self,
+ connection: ConnectionId,
+ ) -> Result<Option<RoomGuard<LeftRoom>>> {
+ self.optional_room_transaction(|tx| async move {
let leaving_participant = room_participant::Entity::find()
- .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
+ .filter(
+ Condition::all()
+ .add(
+ room_participant::Column::AnsweringConnectionId
+ .eq(connection.id as i32),
+ )
+ .add(
+ room_participant::Column::AnsweringConnectionServerId
+ .eq(connection.owner_id as i32),
+ ),
+ )
.one(&*tx)
.await?;
@@ -1281,9 +1379,16 @@ impl Database {
// Cancel pending calls initiated by the leaving user.
let called_participants = room_participant::Entity::find()
.filter(
- room_participant::Column::CallingConnectionId
- .eq(connection_id.0)
- .and(room_participant::Column::AnsweringConnectionId.is_null()),
+ Condition::all()
+ .add(
+ room_participant::Column::CallingConnectionId
+ .eq(connection.id as i32),
+ )
+ .add(
+ room_participant::Column::CallingConnectionServerId
+ .eq(connection.owner_id as i32),
+ )
+ .add(room_participant::Column::AnsweringConnectionId.is_null()),
)
.all(&*tx)
.await?;
@@ -1310,7 +1415,16 @@ impl Database {
project_collaborator::Column::ProjectId,
QueryProjectIds::ProjectId,
)
- .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
+ .filter(
+ Condition::all()
+ .add(
+ project_collaborator::Column::ConnectionId.eq(connection.id as i32),
+ )
+ .add(
+ project_collaborator::Column::ConnectionServerId
+ .eq(connection.owner_id as i32),
+ ),
+ )
.into_values::<_, QueryProjectIds>()
.all(&*tx)
.await?;
@@ -1331,32 +1445,46 @@ impl Database {
host_connection_id: Default::default(),
});
- let collaborator_connection_id =
- ConnectionId(collaborator.connection_id as u32);
- if collaborator_connection_id != connection_id {
+ let collaborator_connection_id = ConnectionId {
+ owner_id: collaborator.connection_server_id.0 as u32,
+ id: collaborator.connection_id as u32,
+ };
+ if collaborator_connection_id != connection {
left_project.connection_ids.push(collaborator_connection_id);
}
if collaborator.is_host {
left_project.host_user_id = collaborator.user_id;
- left_project.host_connection_id =
- ConnectionId(collaborator.connection_id as u32);
+ left_project.host_connection_id = collaborator_connection_id;
}
}
drop(collaborators);
// Leave projects.
project_collaborator::Entity::delete_many()
- .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
+ .filter(
+ Condition::all()
+ .add(
+ project_collaborator::Column::ConnectionId.eq(connection.id as i32),
+ )
+ .add(
+ project_collaborator::Column::ConnectionServerId
+ .eq(connection.owner_id as i32),
+ ),
+ )
.exec(&*tx)
.await?;
// Unshare projects.
project::Entity::delete_many()
.filter(
- project::Column::RoomId
- .eq(room_id)
- .and(project::Column::HostConnectionId.eq(connection_id.0 as i32)),
+ Condition::all()
+ .add(project::Column::RoomId.eq(room_id))
+ .add(project::Column::HostConnectionId.eq(connection.id as i32))
+ .add(
+ project::Column::HostConnectionServerId
+ .eq(connection.owner_id as i32),
+ ),
)
.exec(&*tx)
.await?;
@@ -1376,9 +1504,9 @@ impl Database {
self.rooms.remove(&room_id);
}
- Ok((room_id, left_room))
+ Ok(Some((room_id, left_room)))
} else {
- Err(anyhow!("could not leave room"))?
+ Ok(None)
}
})
.await
@@ -1387,7 +1515,7 @@ impl Database {
pub async fn update_room_participant_location(
&self,
room_id: RoomId,
- connection_id: ConnectionId,
+ connection: ConnectionId,
location: proto::ParticipantLocation,
) -> Result<RoomGuard<proto::Room>> {
self.room_transaction(|tx| async {
@@ -1414,9 +1542,18 @@ impl Database {
}
let result = room_participant::Entity::update_many()
- .filter(room_participant::Column::RoomId.eq(room_id).and(
- room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32),
- ))
+ .filter(
+ Condition::all()
+ .add(room_participant::Column::RoomId.eq(room_id))
+ .add(
+ room_participant::Column::AnsweringConnectionId
+ .eq(connection.id as i32),
+ )
+ .add(
+ room_participant::Column::AnsweringConnectionServerId
+ .eq(connection.owner_id as i32),
+ ),
+ )
.set(room_participant::ActiveModel {
location_kind: ActiveValue::set(Some(location_kind)),
location_project_id: ActiveValue::set(location_project_id),
@@ -1437,11 +1574,21 @@ impl Database {
pub async fn connection_lost(
&self,
- connection_id: ConnectionId,
+ connection: ConnectionId,
) -> Result<RoomGuard<Vec<LeftProject>>> {
self.room_transaction(|tx| async move {
let participant = room_participant::Entity::find()
- .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
+ .filter(
+ Condition::all()
+ .add(
+ room_participant::Column::AnsweringConnectionId
+ .eq(connection.id as i32),
+ )
+ .add(
+ room_participant::Column::AnsweringConnectionServerId
+ .eq(connection.owner_id as i32),
+ ),
+ )
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("not a participant in any room"))?;
@@ -1456,11 +1603,25 @@ impl Database {
let collaborator_on_projects = project_collaborator::Entity::find()
.find_also_related(project::Entity)
- .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
+ .filter(
+ Condition::all()
+ .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
+ .add(
+ project_collaborator::Column::ConnectionServerId
+ .eq(connection.owner_id as i32),
+ ),
+ )
.all(&*tx)
.await?;
project_collaborator::Entity::delete_many()
- .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
+ .filter(
+ Condition::all()
+ .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
+ .add(
+ project_collaborator::Column::ConnectionServerId
+ .eq(connection.owner_id as i32),
+ ),
+ )
.exec(&*tx)
.await?;
@@ -1473,20 +1634,29 @@ impl Database {
.await?;
let connection_ids = collaborators
.into_iter()
- .map(|collaborator| ConnectionId(collaborator.connection_id as u32))
+ .map(|collaborator| ConnectionId {
+ id: collaborator.connection_id as u32,
+ owner_id: collaborator.connection_server_id.0 as u32,
+ })
.collect();
left_projects.push(LeftProject {
id: project.id,
host_user_id: project.host_user_id,
- host_connection_id: ConnectionId(project.host_connection_id as u32),
+ host_connection_id: project.host_connection()?,
connection_ids,
});
}
}
project::Entity::delete_many()
- .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
+ .filter(
+ Condition::all()
+ .add(project::Column::HostConnectionId.eq(connection.id as i32))
+ .add(
+ project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
+ ),
+ )
.exec(&*tx)
.await?;
@@ -1537,7 +1707,10 @@ impl Database {
let mut pending_participants = Vec::new();
while let Some(db_participant) = db_participants.next().await {
let db_participant = db_participant?;
- if let Some(answering_connection_id) = db_participant.answering_connection_id {
+ if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
+ .answering_connection_id
+ .zip(db_participant.answering_connection_server_id)
+ {
let location = match (
db_participant.location_kind,
db_participant.location_project_id,
@@ -1556,11 +1729,16 @@ impl Database {
Default::default(),
)),
};
+
+ let answering_connection = ConnectionId {
+ owner_id: answering_connection_server_id.0 as u32,
+ id: answering_connection_id as u32,
+ };
participants.insert(
- answering_connection_id,
+ answering_connection,
proto::Participant {
user_id: db_participant.user_id.to_proto(),
- peer_id: answering_connection_id as u32,
+ peer_id: Some(answering_connection.into()),
projects: Default::default(),
location: Some(proto::ParticipantLocation { variant: location }),
},
@@ -1583,7 +1761,8 @@ impl Database {
while let Some(row) = db_projects.next().await {
let (db_project, db_worktree) = row?;
- if let Some(participant) = participants.get_mut(&db_project.host_connection_id) {
+ let host_connection = db_project.host_connection()?;
+ if let Some(participant) = participants.get_mut(&host_connection) {
let project = if let Some(project) = participant
.projects
.iter_mut()
@@ -1637,12 +1816,22 @@ impl Database {
pub async fn share_project(
&self,
room_id: RoomId,
- connection_id: ConnectionId,
+ connection: ConnectionId,
worktrees: &[proto::WorktreeMetadata],
) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
self.room_transaction(|tx| async move {
let participant = room_participant::Entity::find()
- .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
+ .filter(
+ Condition::all()
+ .add(
+ room_participant::Column::AnsweringConnectionId
+ .eq(connection.id as i32),
+ )
+ .add(
+ room_participant::Column::AnsweringConnectionServerId
+ .eq(connection.owner_id as i32),
+ ),
+ )
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("could not find participant"))?;
@@ -1653,8 +1842,10 @@ impl Database {
let project = project::ActiveModel {
room_id: ActiveValue::set(participant.room_id),
host_user_id: ActiveValue::set(participant.user_id),
- host_connection_id: ActiveValue::set(connection_id.0 as i32),
- host_connection_epoch: ActiveValue::set(self.epoch()),
+ host_connection_id: ActiveValue::set(Some(connection.id as i32)),
+ host_connection_server_id: ActiveValue::set(Some(ServerId(
+ connection.owner_id as i32,
+ ))),
..Default::default()
}
.insert(&*tx)
@@ -1678,8 +1869,8 @@ impl Database {
project_collaborator::ActiveModel {
project_id: ActiveValue::set(project.id),
- connection_id: ActiveValue::set(connection_id.0 as i32),
- connection_epoch: ActiveValue::set(self.epoch()),
+ connection_id: ActiveValue::set(connection.id as i32),
+ connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
user_id: ActiveValue::set(participant.user_id),
replica_id: ActiveValue::set(ReplicaId(0)),
is_host: ActiveValue::set(true),
@@ -1697,7 +1888,7 @@ impl Database {
pub async fn unshare_project(
&self,
project_id: ProjectId,
- connection_id: ConnectionId,
+ connection: ConnectionId,
) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
self.room_transaction(|tx| async move {
let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
@@ -1706,7 +1897,7 @@ impl Database {
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("project not found"))?;
- if project.host_connection_id == connection_id.0 as i32 {
+ if project.host_connection()? == connection {
let room_id = project.room_id;
project::Entity::delete(project.into_active_model())
.exec(&*tx)
@@ -1723,12 +1914,18 @@ impl Database {
pub async fn update_project(
&self,
project_id: ProjectId,
- connection_id: ConnectionId,
+ connection: ConnectionId,
worktrees: &[proto::WorktreeMetadata],
) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
self.room_transaction(|tx| async move {
let project = project::Entity::find_by_id(project_id)
- .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
+ .filter(
+ Condition::all()
+ .add(project::Column::HostConnectionId.eq(connection.id as i32))
+ .add(
+ project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
+ ),
+ )
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such project"))?;
@@ -1774,7 +1971,7 @@ impl Database {
pub async fn update_worktree(
&self,
update: &proto::UpdateWorktree,
- connection_id: ConnectionId,
+ connection: ConnectionId,
) -> Result<RoomGuard<Vec<ConnectionId>>> {
self.room_transaction(|tx| async move {
let project_id = ProjectId::from_proto(update.project_id);
@@ -1782,7 +1979,13 @@ impl Database {
// Ensure the update comes from the host.
let project = project::Entity::find_by_id(project_id)
- .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
+ .filter(
+ Condition::all()
+ .add(project::Column::HostConnectionId.eq(connection.id as i32))
+ .add(
+ project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
+ ),
+ )
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such project"))?;
@@ -1862,7 +2065,7 @@ impl Database {
pub async fn update_diagnostic_summary(
&self,
update: &proto::UpdateDiagnosticSummary,
- connection_id: ConnectionId,
+ connection: ConnectionId,
) -> Result<RoomGuard<Vec<ConnectionId>>> {
self.room_transaction(|tx| async move {
let project_id = ProjectId::from_proto(update.project_id);
@@ -1877,7 +2080,7 @@ impl Database {
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such project"))?;
- if project.host_connection_id != connection_id.0 as i32 {
+ if project.host_connection()? != connection {
return Err(anyhow!("can't update a project hosted by someone else"))?;
}
@@ -1916,7 +2119,7 @@ impl Database {
pub async fn start_language_server(
&self,
update: &proto::StartLanguageServer,
- connection_id: ConnectionId,
+ connection: ConnectionId,
) -> Result<RoomGuard<Vec<ConnectionId>>> {
self.room_transaction(|tx| async move {
let project_id = ProjectId::from_proto(update.project_id);
@@ -1930,7 +2133,7 @@ impl Database {
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such project"))?;
- if project.host_connection_id != connection_id.0 as i32 {
+ if project.host_connection()? != connection {
return Err(anyhow!("can't update a project hosted by someone else"))?;
}
@@ -1961,11 +2164,21 @@ impl Database {
pub async fn join_project(
&self,
project_id: ProjectId,
- connection_id: ConnectionId,
+ connection: ConnectionId,
) -> Result<RoomGuard<(Project, ReplicaId)>> {
self.room_transaction(|tx| async move {
let participant = room_participant::Entity::find()
- .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
+ .filter(
+ Condition::all()
+ .add(
+ room_participant::Column::AnsweringConnectionId
+ .eq(connection.id as i32),
+ )
+ .add(
+ room_participant::Column::AnsweringConnectionServerId
+ .eq(connection.owner_id as i32),
+ ),
+ )
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("must join a room first"))?;
@@ -1992,8 +2205,8 @@ impl Database {
}
let new_collaborator = project_collaborator::ActiveModel {
project_id: ActiveValue::set(project_id),
- connection_id: ActiveValue::set(connection_id.0 as i32),
- connection_epoch: ActiveValue::set(self.epoch()),
+ connection_id: ActiveValue::set(connection.id as i32),
+ connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
user_id: ActiveValue::set(participant.user_id),
replica_id: ActiveValue::set(replica_id),
is_host: ActiveValue::set(false),
@@ -2095,14 +2308,18 @@ impl Database {
pub async fn leave_project(
&self,
project_id: ProjectId,
- connection_id: ConnectionId,
+ connection: ConnectionId,
) -> Result<RoomGuard<LeftProject>> {
self.room_transaction(|tx| async move {
let result = project_collaborator::Entity::delete_many()
.filter(
- project_collaborator::Column::ProjectId
- .eq(project_id)
- .and(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)),
+ Condition::all()
+ .add(project_collaborator::Column::ProjectId.eq(project_id))
+ .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
+ .add(
+ project_collaborator::Column::ConnectionServerId
+ .eq(connection.owner_id as i32),
+ ),
)
.exec(&*tx)
.await?;
@@ -2120,13 +2337,16 @@ impl Database {
.await?;
let connection_ids = collaborators
.into_iter()
- .map(|collaborator| ConnectionId(collaborator.connection_id as u32))
+ .map(|collaborator| ConnectionId {
+ owner_id: collaborator.connection_server_id.0 as u32,
+ id: collaborator.connection_id as u32,
+ })
.collect();
let left_project = LeftProject {
id: project_id,
host_user_id: project.host_user_id,
- host_connection_id: ConnectionId(project.host_connection_id as u32),
+ host_connection_id: project.host_connection()?,
connection_ids,
};
Ok((project.room_id, left_project))
@@ -2137,7 +2357,7 @@ impl Database {
pub async fn project_collaborators(
&self,
project_id: ProjectId,
- connection_id: ConnectionId,
+ connection: ConnectionId,
) -> Result<RoomGuard<Vec<project_collaborator::Model>>> {
self.room_transaction(|tx| async move {
let project = project::Entity::find_by_id(project_id)
@@ -2149,10 +2369,13 @@ impl Database {
.all(&*tx)
.await?;
- if collaborators
- .iter()
- .any(|collaborator| collaborator.connection_id == connection_id.0 as i32)
- {
+ if collaborators.iter().any(|collaborator| {
+ let collaborator_connection = ConnectionId {
+ owner_id: collaborator.connection_server_id.0 as u32,
+ id: collaborator.connection_id as u32,
+ };
+ collaborator_connection == connection
+ }) {
Ok((project.room_id, collaborators))
} else {
Err(anyhow!("no such project"))?
@@ -2167,29 +2390,22 @@ impl Database {
connection_id: ConnectionId,
) -> Result<RoomGuard<HashSet<ConnectionId>>> {
self.room_transaction(|tx| async move {
- #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
- enum QueryAs {
- ConnectionId,
- }
-
let project = project::Entity::find_by_id(project_id)
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such project"))?;
- let mut db_connection_ids = project_collaborator::Entity::find()
- .select_only()
- .column_as(
- project_collaborator::Column::ConnectionId,
- QueryAs::ConnectionId,
- )
+ let mut participants = project_collaborator::Entity::find()
.filter(project_collaborator::Column::ProjectId.eq(project_id))
- .into_values::<i32, QueryAs>()
.stream(&*tx)
.await?;
let mut connection_ids = HashSet::default();
- while let Some(connection_id) = db_connection_ids.next().await {
- connection_ids.insert(ConnectionId(connection_id? as u32));
+ while let Some(participant) = participants.next().await {
+ let participant = participant?;
+ connection_ids.insert(ConnectionId {
+ owner_id: participant.connection_server_id.0 as u32,
+ id: participant.connection_id as u32,
+ });
}
if connection_ids.contains(&connection_id) {
@@ -2206,29 +2422,22 @@ impl Database {
project_id: ProjectId,
tx: &DatabaseTransaction,
) -> Result<Vec<ConnectionId>> {
- #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
- enum QueryAs {
- ConnectionId,
- }
-
- let mut db_guest_connection_ids = project_collaborator::Entity::find()
- .select_only()
- .column_as(
- project_collaborator::Column::ConnectionId,
- QueryAs::ConnectionId,
- )
+ let mut participants = project_collaborator::Entity::find()
.filter(
project_collaborator::Column::ProjectId
.eq(project_id)
.and(project_collaborator::Column::IsHost.eq(false)),
)
- .into_values::<i32, QueryAs>()
.stream(tx)
.await?;
let mut guest_connection_ids = Vec::new();
- while let Some(connection_id) = db_guest_connection_ids.next().await {
- guest_connection_ids.push(ConnectionId(connection_id? as u32));
+ while let Some(participant) = participants.next().await {
+ let participant = participant?;
+ guest_connection_ids.push(ConnectionId {
+ owner_id: participant.connection_server_id.0 as u32,
+ id: participant.connection_id as u32,
+ });
}
Ok(guest_connection_ids)
}
@@ -2327,25 +2536,25 @@ impl Database {
self.run(body).await
}
- async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
+ async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
where
F: Send + Fn(TransactionHandle) -> Fut,
- Fut: Send + Future<Output = Result<(RoomId, T)>>,
+ Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
{
let body = async {
loop {
let (tx, result) = self.with_transaction(&f).await?;
match result {
- Ok((room_id, data)) => {
+ Ok(Some((room_id, data))) => {
let lock = self.rooms.entry(room_id).or_default().clone();
let _guard = lock.lock_owned().await;
match tx.commit().await.map_err(Into::into) {
Ok(()) => {
- return Ok(RoomGuard {
+ return Ok(Some(RoomGuard {
data,
_guard,
_not_send: PhantomData,
- });
+ }));
}
Err(error) => {
if is_serialization_error(&error) {
@@ -2356,6 +2565,18 @@ impl Database {
}
}
}
+ Ok(None) => {
+ match tx.commit().await.map_err(Into::into) {
+ Ok(()) => return Ok(None),
+ Err(error) => {
+ if is_serialization_error(&error) {
+ // Retry (don't break the loop)
+ } else {
+ return Err(error);
+ }
+ }
+ }
+ }
Err(error) => {
tx.rollback().await?;
if is_serialization_error(&error) {
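
`optional_room_transaction` keeps the existing commit-and-retry discipline: serialization conflicts re-run the closure, while the new `Ok(None)` arm commits before returning so "nothing to do" is an ordinary outcome rather than an error. A distilled sketch of that control flow (transaction plumbing and error classification stubbed):

```rust
// Distilled sketch of the commit-and-retry loop; the sea-orm transaction
// plumbing and `is_serialization_error` are stubbed out.
#[derive(Debug)]
enum DbError {
    Serialization,
    Other(String),
}

fn run_transaction(attempt: u32) -> Result<Option<u64>, DbError> {
    match attempt {
        0 => Err(DbError::Serialization), // first try hits a conflict
        1 => Ok(Some(42)),                // retry succeeds
        _ => Err(DbError::Other("unreachable in this demo".into())),
    }
}

fn with_retries() -> Result<Option<u64>, DbError> {
    let mut attempt = 0;
    loop {
        match run_transaction(attempt) {
            // `None` is a valid committed outcome, not an error: this is the
            // new `Ok(None)` arm in the diff above.
            Ok(row) => return Ok(row),
            Err(DbError::Serialization) => attempt += 1, // go around again
            Err(other) => return Err(other),
        }
    }
}

fn main() {
    assert_eq!(with_retries().unwrap(), Some(42));
}
```
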
@@ -1,4 +1,6 @@
-use super::{ProjectId, RoomId, UserId};
+use super::{ProjectId, Result, RoomId, ServerId, UserId};
+use anyhow::anyhow;
+use rpc::ConnectionId;
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
@@ -8,8 +10,23 @@ pub struct Model {
pub id: ProjectId,
pub room_id: RoomId,
pub host_user_id: UserId,
- pub host_connection_id: i32,
- pub host_connection_epoch: Uuid,
+ pub host_connection_id: Option<i32>,
+ pub host_connection_server_id: Option<ServerId>,
+}
+
+impl Model {
+ pub fn host_connection(&self) -> Result<ConnectionId> {
+ let host_connection_server_id = self
+ .host_connection_server_id
+ .ok_or_else(|| anyhow!("empty host_connection_server_id"))?;
+ let host_connection_id = self
+ .host_connection_id
+ .ok_or_else(|| anyhow!("empty host_connection_id"))?;
+ Ok(ConnectionId {
+ owner_id: host_connection_server_id.0 as u32,
+ id: host_connection_id as u32,
+ })
+ }
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -1,4 +1,4 @@
-use super::{ProjectCollaboratorId, ProjectId, ReplicaId, UserId};
+use super::{ProjectCollaboratorId, ProjectId, ReplicaId, ServerId, UserId};
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
@@ -8,7 +8,7 @@ pub struct Model {
pub id: ProjectCollaboratorId,
pub project_id: ProjectId,
pub connection_id: i32,
- pub connection_epoch: Uuid,
+ pub connection_server_id: ServerId,
pub user_id: UserId,
pub replica_id: ReplicaId,
pub is_host: bool,
@@ -1,4 +1,4 @@
-use super::{ProjectId, RoomId, RoomParticipantId, UserId};
+use super::{ProjectId, RoomId, RoomParticipantId, ServerId, UserId};
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
@@ -9,14 +9,14 @@ pub struct Model {
pub room_id: RoomId,
pub user_id: UserId,
pub answering_connection_id: Option<i32>,
- pub answering_connection_epoch: Option<Uuid>,
+ pub answering_connection_server_id: Option<ServerId>,
pub answering_connection_lost: bool,
pub location_kind: Option<i32>,
pub location_project_id: Option<ProjectId>,
pub initial_project_id: Option<ProjectId>,
pub calling_user_id: UserId,
pub calling_connection_id: i32,
- pub calling_connection_epoch: Uuid,
+ pub calling_connection_server_id: Option<ServerId>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -0,0 +1,15 @@
+use super::ServerId;
+use sea_orm::entity::prelude::*;
+
+#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
+#[sea_orm(table_name = "servers")]
+pub struct Model {
+ #[sea_orm(primary_key)]
+ pub id: ServerId,
+ pub environment: String,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {}
+
+impl ActiveModelBehavior for ActiveModel {}
@@ -410,6 +410,8 @@ test_both_dbs!(
test_project_count_sqlite,
db,
{
+ let owner_id = db.create_server("test").await.unwrap().0 as u32;
+
let user1 = db
.create_user(
&format!("admin@example.com"),
@@ -436,36 +438,44 @@ test_both_dbs!(
.unwrap();
let room_id = RoomId::from_proto(
- db.create_room(user1.user_id, ConnectionId(0), "")
+ db.create_room(user1.user_id, ConnectionId { owner_id, id: 0 }, "")
.await
.unwrap()
.id,
);
- db.call(room_id, user1.user_id, ConnectionId(0), user2.user_id, None)
- .await
- .unwrap();
- db.join_room(room_id, user2.user_id, ConnectionId(1))
+ db.call(
+ room_id,
+ user1.user_id,
+ ConnectionId { owner_id, id: 0 },
+ user2.user_id,
+ None,
+ )
+ .await
+ .unwrap();
+ db.join_room(room_id, user2.user_id, ConnectionId { owner_id, id: 1 })
.await
.unwrap();
assert_eq!(db.project_count_excluding_admins().await.unwrap(), 0);
- db.share_project(room_id, ConnectionId(1), &[])
+ db.share_project(room_id, ConnectionId { owner_id, id: 1 }, &[])
.await
.unwrap();
assert_eq!(db.project_count_excluding_admins().await.unwrap(), 1);
- db.share_project(room_id, ConnectionId(1), &[])
+ db.share_project(room_id, ConnectionId { owner_id, id: 1 }, &[])
.await
.unwrap();
assert_eq!(db.project_count_excluding_admins().await.unwrap(), 2);
// Projects shared by admins aren't counted.
- db.share_project(room_id, ConnectionId(0), &[])
+ db.share_project(room_id, ConnectionId { owner_id, id: 0 }, &[])
.await
.unwrap();
assert_eq!(db.project_count_excluding_admins().await.unwrap(), 2);
- db.leave_room(ConnectionId(1)).await.unwrap();
+ db.leave_room(ConnectionId { owner_id, id: 1 })
+ .await
+ .unwrap();
assert_eq!(db.project_count_excluding_admins().await.unwrap(), 0);
}
);
@@ -7,8 +7,8 @@ use crate::{
use anyhow::anyhow;
use call::{room, ActiveCall, ParticipantLocation, Room};
use client::{
- self, test::FakeHttpClient, Client, Connection, Credentials, EstablishConnectionError, PeerId,
- User, UserStore, RECEIVE_TIMEOUT,
+ self, proto::PeerId, test::FakeHttpClient, Client, Connection, Credentials,
+ EstablishConnectionError, User, UserStore, RECEIVE_TIMEOUT,
};
use collections::{BTreeMap, HashMap, HashSet};
use editor::{
@@ -608,7 +608,7 @@ async fn test_server_restarts(
);
// The server is torn down.
- server.teardown();
+ server.reset().await;
    // Users A and B reconnect to the call. User C has trouble reconnecting, so it leaves the room.
client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending()));
@@ -778,7 +778,7 @@ async fn test_server_restarts(
);
// The server is torn down.
- server.teardown();
+ server.reset().await;
    // Users A and B have trouble reconnecting, so they leave the room.
client_a.override_establish_connection(|_, cx| cx.spawn(|_| future::pending()));
@@ -6125,7 +6125,7 @@ async fn test_random_collaboration(
.user_connection_ids(removed_guest_id)
.collect::<Vec<_>>();
assert_eq!(user_connection_ids.len(), 1);
- let removed_peer_id = PeerId(user_connection_ids[0].0);
+ let removed_peer_id = user_connection_ids[0].into();
let guest = clients.remove(guest_ix);
op_start_signals.remove(guest_ix);
server.forbid_connections();
@@ -6174,17 +6174,25 @@ async fn test_random_collaboration(
.user_connection_ids(user_id)
.collect::<Vec<_>>();
assert_eq!(user_connection_ids.len(), 1);
- let peer_id = PeerId(user_connection_ids[0].0);
+ let peer_id = user_connection_ids[0].into();
server.disconnect_client(peer_id);
deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
operations += 1;
}
30..=34 => {
log::info!("Simulating server restart");
- server.teardown();
- deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+ server.reset().await;
+ deterministic.advance_clock(RECEIVE_TIMEOUT);
server.start().await.unwrap();
deterministic.advance_clock(CLEANUP_TIMEOUT);
+ let environment = &server.app_state.config.zed_environment;
+ let stale_room_ids = server
+ .app_state
+ .db
+ .stale_room_ids(environment, server.id())
+ .await
+ .unwrap();
+ assert_eq!(stale_room_ids, vec![]);
}
_ if !op_start_signals.is_empty() => {
while operations < max_operations && rng.lock().gen_bool(0.7) {
@@ -6379,7 +6387,13 @@ impl TestServer {
)
.unwrap();
let app_state = Self::build_app_state(&test_db, &live_kit_server).await;
+ let epoch = app_state
+ .db
+ .create_server(&app_state.config.zed_environment)
+ .await
+ .unwrap();
let server = Server::new(
+ epoch,
app_state.clone(),
Executor::Deterministic(deterministic.build_background()),
);
@@ -6396,9 +6410,15 @@ impl TestServer {
}
}
- fn teardown(&self) {
- self.server.teardown();
+ async fn reset(&self) {
self.app_state.db.reset();
+ let epoch = self
+ .app_state
+ .db
+ .create_server(&self.app_state.config.zed_environment)
+ .await
+ .unwrap();
+ self.server.reset(epoch);
}
async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
@@ -6488,7 +6508,7 @@ impl TestServer {
let connection_id = connection_id_rx.await.unwrap();
connection_killers
.lock()
- .insert(PeerId(connection_id.0), killed);
+ .insert(connection_id.into(), killed);
Ok(client_conn)
}
})
@@ -7310,7 +7330,7 @@ impl TestClient {
impl Drop for TestClient {
fn drop(&mut self) {
- self.client.tear_down();
+ self.client.teardown();
}
}
@@ -97,6 +97,7 @@ pub struct Config {
pub live_kit_secret: Option<String>,
pub rust_log: Option<String>,
pub log_json: Option<bool>,
+ pub zed_environment: String,
}
#[derive(Default, Deserialize)]
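
`zed_environment` becomes a required field of collab's environment-driven `Config`. A sketch of how the new field surfaces at boot, assuming an `envy`-style loader (the loader choice is inferred from the serde derive, not shown in this diff):

```rust
use serde::Deserialize;

// Sketch of how the new required field surfaces at boot; `envy` as the
// loader is an assumption based on the serde derive, not shown in this diff.
#[derive(Default, Deserialize)]
struct Config {
    http_port: u16,
    zed_environment: String, // "development" locally, per the .env change
}

fn main() {
    // Fails fast when ZED_ENVIRONMENT is unset, mirroring the new .env entry.
    match envy::from_env::<Config>() {
        Ok(config) => println!(
            "listening on {} in {}",
            config.http_port, config.zed_environment
        ),
        Err(error) => eprintln!("config error: {error}"),
    }
}
```
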
@@ -7,6 +7,7 @@ use std::{
net::{SocketAddr, TcpListener},
path::Path,
};
+use tokio::signal::unix::SignalKind;
use tracing_log::LogTracer;
use tracing_subscriber::{filter::EnvFilter, fmt::format::JsonFields, Layer};
use util::ResultExt;
@@ -56,7 +57,11 @@ async fn main() -> Result<()> {
let listener = TcpListener::bind(&format!("0.0.0.0:{}", state.config.http_port))
.expect("failed to bind TCP listener");
- let rpc_server = collab::rpc::Server::new(state.clone(), Executor::Production);
+ let epoch = state
+ .db
+ .create_server(&state.config.zed_environment)
+ .await?;
+ let rpc_server = collab::rpc::Server::new(epoch, state.clone(), Executor::Production);
rpc_server.start().await?;
let app = collab::api::routes(rpc_server.clone(), state.clone())
@@ -66,9 +71,15 @@ async fn main() -> Result<()> {
axum::Server::from_tcp(listener)?
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.with_graceful_shutdown(async move {
- tokio::signal::ctrl_c()
- .await
+ let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
+ .expect("failed to listen for interrupt signal");
+ let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
.expect("failed to listen for interrupt signal");
+ let sigterm = sigterm.recv();
+ let sigint = sigint.recv();
+ futures::pin_mut!(sigterm, sigint);
+ futures::future::select(sigterm, sigint).await;
+ tracing::info!("Received interrupt signal");
rpc_server.teardown();
})
.await?;
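
Shutdown now starts on either SIGTERM (what Kubernetes sends, matching the readiness probe added above) or SIGINT, and tears down the RPC server so the replacement process's cleanup pass finds these rooms stale. The same shape written with `tokio::select!` (a sketch; the diff uses `futures::pin_mut!` plus `futures::future::select` instead):

```rust
// Equivalent shape using `tokio::select!` (requires tokio's "signal",
// "macros", and "rt" features; Unix-only). The diff reaches the same effect
// with `futures::pin_mut!` and `futures::future::select`.
use tokio::signal::unix::{signal, SignalKind};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut sigterm = signal(SignalKind::terminate())?;
    let mut sigint = signal(SignalKind::interrupt())?;
    tokio::select! {
        _ = sigterm.recv() => println!("received SIGTERM"),
        _ = sigint.recv() => println!("received SIGINT"),
    }
    // Tear down the RPC server here, then let axum finish in-flight requests.
    Ok(())
}
```
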
@@ -2,7 +2,7 @@ mod connection_pool;
use crate::{
auth,
- db::{self, Database, ProjectId, RoomId, User, UserId},
+ db::{self, Database, ProjectId, RoomId, ServerId, User, UserId},
executor::Executor,
AppState, Result,
};
@@ -138,6 +138,7 @@ impl Deref for DbHandle {
}
pub struct Server {
+ id: parking_lot::Mutex<ServerId>,
peer: Arc<Peer>,
pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
app_state: Arc<AppState>,
@@ -168,9 +169,10 @@ where
}
impl Server {
- pub fn new(app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
+ pub fn new(id: ServerId, app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
let mut server = Self {
- peer: Peer::new(),
+ id: parking_lot::Mutex::new(id),
+ peer: Peer::new(id.0 as u32),
app_state,
executor,
connection_pool: Default::default(),
@@ -239,97 +241,146 @@ impl Server {
}
pub async fn start(&self) -> Result<()> {
- self.app_state.db.delete_stale_projects().await?;
- let db = self.app_state.db.clone();
+ let server_id = *self.id.lock();
+ let app_state = self.app_state.clone();
let peer = self.peer.clone();
let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
let pool = self.connection_pool.clone();
let live_kit_client = self.app_state.live_kit_client.clone();
- self.executor.spawn_detached(async move {
- timeout.await;
- if let Some(room_ids) = db.stale_room_ids().await.trace_err() {
- for room_id in room_ids {
- let mut contacts_to_update = HashSet::default();
- let mut canceled_calls_to_user_ids = Vec::new();
- let mut live_kit_room = String::new();
- let mut delete_live_kit_room = false;
-
- if let Ok(mut refreshed_room) = db.refresh_room(room_id).await {
- room_updated(&refreshed_room.room, &peer);
- contacts_to_update
- .extend(refreshed_room.stale_participant_user_ids.iter().copied());
- contacts_to_update
- .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
- canceled_calls_to_user_ids =
- mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
- live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
- delete_live_kit_room = refreshed_room.room.participants.is_empty();
- }
- {
- let pool = pool.lock();
- for canceled_user_id in canceled_calls_to_user_ids {
- for connection_id in pool.user_connection_ids(canceled_user_id) {
- peer.send(
- connection_id,
- proto::CallCanceled {
- room_id: room_id.to_proto(),
- },
- )
- .trace_err();
- }
+ let span = info_span!("start server");
+ let span_enter = span.enter();
+
+ tracing::info!("begin deleting stale projects");
+ app_state
+ .db
+ .delete_stale_projects(&app_state.config.zed_environment, server_id)
+ .await?;
+ tracing::info!("finish deleting stale projects");
+
+ drop(span_enter);
+ self.executor.spawn_detached(
+ async move {
+ tracing::info!("waiting for cleanup timeout");
+ timeout.await;
+ tracing::info!("cleanup timeout expired, retrieving stale rooms");
+ if let Some(room_ids) = app_state
+ .db
+ .stale_room_ids(&app_state.config.zed_environment, server_id)
+ .await
+ .trace_err()
+ {
+ tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
+ for room_id in room_ids {
+ let mut contacts_to_update = HashSet::default();
+ let mut canceled_calls_to_user_ids = Vec::new();
+ let mut live_kit_room = String::new();
+ let mut delete_live_kit_room = false;
+
+ if let Ok(mut refreshed_room) =
+ app_state.db.refresh_room(room_id, server_id).await
+ {
+ tracing::info!(
+ room_id = room_id.0,
+ new_participant_count = refreshed_room.room.participants.len(),
+ "refreshed room"
+ );
+ room_updated(&refreshed_room.room, &peer);
+ contacts_to_update
+ .extend(refreshed_room.stale_participant_user_ids.iter().copied());
+ contacts_to_update
+ .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
+ canceled_calls_to_user_ids =
+ mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
+ live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
+ delete_live_kit_room = refreshed_room.room.participants.is_empty();
}
- }
- for user_id in contacts_to_update {
- let busy = db.is_user_busy(user_id).await.trace_err();
- let contacts = db.get_contacts(user_id).await.trace_err();
- if let Some((busy, contacts)) = busy.zip(contacts) {
+ {
let pool = pool.lock();
- let updated_contact = contact_for_user(user_id, false, busy, &pool);
- for contact in contacts {
- if let db::Contact::Accepted {
- user_id: contact_user_id,
- ..
- } = contact
- {
- for contact_conn_id in pool.user_connection_ids(contact_user_id)
+ for canceled_user_id in canceled_calls_to_user_ids {
+ for connection_id in pool.user_connection_ids(canceled_user_id) {
+ peer.send(
+ connection_id,
+ proto::CallCanceled {
+ room_id: room_id.to_proto(),
+ },
+ )
+ .trace_err();
+ }
+ }
+ }
+
+ for user_id in contacts_to_update {
+ let busy = app_state.db.is_user_busy(user_id).await.trace_err();
+ let contacts = app_state.db.get_contacts(user_id).await.trace_err();
+ if let Some((busy, contacts)) = busy.zip(contacts) {
+ let pool = pool.lock();
+ let updated_contact = contact_for_user(user_id, false, busy, &pool);
+ for contact in contacts {
+ if let db::Contact::Accepted {
+ user_id: contact_user_id,
+ ..
+ } = contact
{
- peer.send(
- contact_conn_id,
- proto::UpdateContacts {
- contacts: vec![updated_contact.clone()],
- remove_contacts: Default::default(),
- incoming_requests: Default::default(),
- remove_incoming_requests: Default::default(),
- outgoing_requests: Default::default(),
- remove_outgoing_requests: Default::default(),
- },
- )
- .trace_err();
+ for contact_conn_id in
+ pool.user_connection_ids(contact_user_id)
+ {
+ peer.send(
+ contact_conn_id,
+ proto::UpdateContacts {
+ contacts: vec![updated_contact.clone()],
+ remove_contacts: Default::default(),
+ incoming_requests: Default::default(),
+ remove_incoming_requests: Default::default(),
+ outgoing_requests: Default::default(),
+ remove_outgoing_requests: Default::default(),
+ },
+ )
+ .trace_err();
+ }
}
}
}
}
- }
- if let Some(live_kit) = live_kit_client.as_ref() {
- if delete_live_kit_room {
- live_kit.delete_room(live_kit_room).await.trace_err();
+ if let Some(live_kit) = live_kit_client.as_ref() {
+ if delete_live_kit_room {
+ live_kit.delete_room(live_kit_room).await.trace_err();
+ }
}
}
}
+
+ app_state
+ .db
+ .delete_stale_servers(server_id, &app_state.config.zed_environment)
+ .await
+ .trace_err();
}
- });
+ .instrument(span),
+ );
Ok(())
}
pub fn teardown(&self) {
- self.peer.reset();
+ self.peer.teardown();
self.connection_pool.lock().reset();
let _ = self.teardown.send(());
}
+ #[cfg(test)]
+ pub fn reset(&self, id: ServerId) {
+ self.teardown();
+ *self.id.lock() = id;
+ self.peer.reset(id.0 as u32);
+ }
+
+ #[cfg(test)]
+ pub fn id(&self) -> ServerId {
+ *self.id.lock()
+ }
+
fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
where
F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
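
The rewritten cleanup task is wrapped in a tracing span via .instrument(span), so the "waiting for cleanup timeout" and per-room log lines are attributed to "start server" even though the work runs detached. A minimal sketch of that pattern (tokio::spawn stands in for executor.spawn_detached):

    use tracing::{info_span, Instrument};

    async fn cleanup_stale_rooms() {
        tracing::info!("waiting for cleanup timeout"); // inherits the span below
    }

    fn start() {
        let span = info_span!("start server");
        tokio::spawn(cleanup_stale_rooms().instrument(span));
    }
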
@@ -438,7 +489,7 @@ impl Server {
});
tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
- this.peer.send(connection_id, proto::Hello { peer_id: connection_id.0 })?;
+ this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
if let Some(send_connection_id) = send_connection_id.take() {
@@ -769,7 +820,7 @@ async fn sign_out(
.is_user_online(session.user_id)
{
let db = session.db().await;
- if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() {
+ if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() {
room_updated(&room, &session.peer);
}
}
@@ -973,7 +1024,7 @@ async fn cancel_call(
let room = session
.db()
.await
- .cancel_call(Some(room_id), session.connection_id, called_user_id)
+ .cancel_call(room_id, session.connection_id, called_user_id)
.await?;
room_updated(&room, &session.peer);
}
@@ -1006,7 +1057,8 @@ async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<(
.db()
.await
.decline_call(Some(room_id), session.user_id)
- .await?;
+ .await?
+ .ok_or_else(|| anyhow!("failed to decline call"))?;
room_updated(&room, &session.peer);
}
@@ -1108,12 +1160,18 @@ async fn join_project(
let collaborators = project
.collaborators
.iter()
- .filter(|collaborator| collaborator.connection_id != session.connection_id.0 as i32)
- .map(|collaborator| proto::Collaborator {
- peer_id: collaborator.connection_id as u32,
- replica_id: collaborator.replica_id.0 as u32,
- user_id: collaborator.user_id.to_proto(),
+ .map(|collaborator| {
+ let peer_id = proto::PeerId {
+ owner_id: collaborator.connection_server_id.0 as u32,
+ id: collaborator.connection_id as u32,
+ };
+ proto::Collaborator {
+ peer_id: Some(peer_id),
+ replica_id: collaborator.replica_id.0 as u32,
+ user_id: collaborator.user_id.to_proto(),
+ }
})
+ .filter(|collaborator| collaborator.peer_id != Some(session.connection_id.into()))
.collect::<Vec<_>>();
let worktrees = project
.worktrees
@@ -1130,11 +1188,11 @@ async fn join_project(
session
.peer
.send(
- ConnectionId(collaborator.peer_id),
+ collaborator.peer_id.unwrap().into(),
proto::AddProjectCollaborator {
project_id: project_id.to_proto(),
collaborator: Some(proto::Collaborator {
- peer_id: session.connection_id.0,
+ peer_id: Some(session.connection_id.into()),
replica_id: replica_id.0 as u32,
user_id: guest_user_id.to_proto(),
}),
@@ -1355,13 +1413,14 @@ where
.await
.project_collaborators(project_id, session.connection_id)
.await?;
- ConnectionId(
- collaborators
- .iter()
- .find(|collaborator| collaborator.is_host)
- .ok_or_else(|| anyhow!("host not found"))?
- .connection_id as u32,
- )
+ let host = collaborators
+ .iter()
+ .find(|collaborator| collaborator.is_host)
+ .ok_or_else(|| anyhow!("host not found"))?;
+ ConnectionId {
+ owner_id: host.connection_server_id.0 as u32,
+ id: host.connection_id as u32,
+ }
};
let payload = session
@@ -1389,7 +1448,10 @@ async fn save_buffer(
.iter()
.find(|collaborator| collaborator.is_host)
.ok_or_else(|| anyhow!("host not found"))?;
- ConnectionId(host.connection_id as u32)
+ ConnectionId {
+ owner_id: host.connection_server_id.0 as u32,
+ id: host.connection_id as u32,
+ }
};
let response_payload = session
.peer
@@ -1401,11 +1463,17 @@ async fn save_buffer(
.await
.project_collaborators(project_id, session.connection_id)
.await?;
- collaborators
- .retain(|collaborator| collaborator.connection_id != session.connection_id.0 as i32);
- let project_connection_ids = collaborators
- .iter()
- .map(|collaborator| ConnectionId(collaborator.connection_id as u32));
+ collaborators.retain(|collaborator| {
+ let collaborator_connection = ConnectionId {
+ owner_id: collaborator.connection_server_id.0 as u32,
+ id: collaborator.connection_id as u32,
+ };
+ collaborator_connection != session.connection_id
+ });
+ let project_connection_ids = collaborators.iter().map(|collaborator| ConnectionId {
+ owner_id: collaborator.connection_server_id.0 as u32,
+ id: collaborator.connection_id as u32,
+ });
broadcast(host_connection_id, project_connection_ids, |conn_id| {
session
.peer
@@ -1419,11 +1487,10 @@ async fn create_buffer_for_peer(
request: proto::CreateBufferForPeer,
session: Session,
) -> Result<()> {
- session.peer.forward_send(
- session.connection_id,
- ConnectionId(request.peer_id),
- request,
- )?;
+ let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
+ session
+ .peer
+ .forward_send(session.connection_id, peer_id.into(), request)?;
Ok(())
}
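
Because PeerId is now a message type rather than a uint32, proto3 gives every peer-id field explicit presence and prost surfaces it as Option<PeerId>; handlers like the one above therefore reject a missing value at the protocol boundary. The recurring idiom, as a self-contained sketch (the local PeerId stands in for proto::PeerId):

    use anyhow::{anyhow, Result};

    #[derive(Clone, Copy, Debug)]
    struct PeerId { owner_id: u32, id: u32 } // stand-in for proto::PeerId

    fn require_peer_id(peer_id: Option<PeerId>) -> Result<PeerId> {
        // Convert a missing field into a hard error once, up front.
        peer_id.ok_or_else(|| anyhow!("invalid peer id"))
    }
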
@@ -1516,7 +1583,10 @@ async fn follow(
session: Session,
) -> Result<()> {
let project_id = ProjectId::from_proto(request.project_id);
- let leader_id = ConnectionId(request.leader_id);
+ let leader_id = request
+ .leader_id
+ .ok_or_else(|| anyhow!("invalid leader id"))?
+ .into();
let follower_id = session.connection_id;
{
let project_connection_ids = session
@@ -1536,14 +1606,17 @@ async fn follow(
.await?;
response_payload
.views
- .retain(|view| view.leader_id != Some(follower_id.0));
+ .retain(|view| view.leader_id != Some(follower_id.into()));
response.send(response_payload)?;
Ok(())
}
async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> {
let project_id = ProjectId::from_proto(request.project_id);
- let leader_id = ConnectionId(request.leader_id);
+ let leader_id = request
+ .leader_id
+ .ok_or_else(|| anyhow!("invalid leader id"))?
+ .into();
let project_connection_ids = session
.db()
.await
@@ -1572,12 +1645,16 @@ async fn update_followers(request: proto::UpdateFollowers, session: Session) ->
proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
});
- for follower_id in &request.follower_ids {
- let follower_id = ConnectionId(*follower_id);
- if project_connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
- session
- .peer
- .forward_send(session.connection_id, follower_id, request.clone())?;
+ for follower_peer_id in request.follower_ids.iter().copied() {
+ let follower_connection_id = follower_peer_id.into();
+ if project_connection_ids.contains(&follower_connection_id)
+ && Some(follower_peer_id) != leader_id
+ {
+ session.peer.forward_send(
+ session.connection_id,
+ follower_connection_id,
+ request.clone(),
+ )?;
}
}
Ok(())
@@ -1892,13 +1969,19 @@ fn contact_for_user(
fn room_updated(room: &proto::Room, peer: &Peer) {
for participant in &room.participants {
- peer.send(
- ConnectionId(participant.peer_id),
- proto::RoomUpdated {
- room: Some(room.clone()),
- },
- )
- .trace_err();
+ if let Some(peer_id) = participant
+ .peer_id
+ .ok_or_else(|| anyhow!("invalid participant peer id"))
+ .trace_err()
+ {
+ peer.send(
+ peer_id.into(),
+ proto::RoomUpdated {
+ room: Some(room.clone()),
+ },
+ )
+ .trace_err();
+ }
}
}
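
room_updated now tolerates a participant record without a peer id: ok_or_else builds the error and trace_err (from the util crate) logs it and yields an Option, so one malformed participant is skipped rather than aborting the whole broadcast. A stand-in showing the shape of that helper (the real one logs through tracing; eprintln! is a placeholder):

    fn trace_err<T, E: std::fmt::Debug>(result: Result<T, E>) -> Option<T> {
        result.map_err(|err| eprintln!("error: {err:?}")).ok()
    }
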
@@ -1943,8 +2026,7 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
let canceled_calls_to_user_ids;
let live_kit_room;
let delete_live_kit_room;
- {
- let mut left_room = session.db().await.leave_room(session.connection_id).await?;
+ if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
contacts_to_update.insert(session.user_id);
for project in left_room.left_projects.values() {
@@ -1956,6 +2038,8 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
live_kit_room = mem::take(&mut left_room.room.live_kit_room);
delete_live_kit_room = left_room.room.participants.is_empty();
+ } else {
+ return Ok(());
}
{
@@ -2013,7 +2097,7 @@ fn project_left(project: &db::LeftProject, session: &Session) {
*connection_id,
proto::RemoveProjectCollaborator {
project_id: project.id.to_proto(),
- peer_id: session.connection_id.0,
+ peer_id: Some(session.connection_id.into()),
},
)
.trace_err();
@@ -1,6 +1,6 @@
use crate::{contact_notification::ContactNotification, contacts_popover};
use call::{ActiveCall, ParticipantLocation};
-use client::{Authenticate, ContactEventKind, PeerId, User, UserStore};
+use client::{proto::PeerId, Authenticate, ContactEventKind, User, UserStore};
use clock::ReplicaId;
use contacts_popover::ContactsPopover;
use gpui::{
@@ -474,7 +474,7 @@ impl CollabTitlebarItem {
cx.dispatch_action(ToggleFollow(peer_id))
})
.with_tooltip::<ToggleFollow, _>(
- peer_id.0 as usize,
+ peer_id.as_u64() as usize,
if is_followed {
format!("Unfollow {}", peer_github_login)
} else {
@@ -487,22 +487,24 @@ impl CollabTitlebarItem {
.boxed()
} else if let ParticipantLocation::SharedProject { project_id } = location {
let user_id = user.id;
- MouseEventHandler::<JoinProject>::new(peer_id.0 as usize, cx, move |_, _| content)
- .with_cursor_style(CursorStyle::PointingHand)
- .on_click(MouseButton::Left, move |_, cx| {
- cx.dispatch_action(JoinProject {
- project_id,
- follow_user_id: user_id,
- })
+ MouseEventHandler::<JoinProject>::new(peer_id.as_u64() as usize, cx, move |_, _| {
+ content
+ })
+ .with_cursor_style(CursorStyle::PointingHand)
+ .on_click(MouseButton::Left, move |_, cx| {
+ cx.dispatch_action(JoinProject {
+ project_id,
+ follow_user_id: user_id,
})
- .with_tooltip::<JoinProject, _>(
- peer_id.0 as usize,
- format!("Follow {} into external project", peer_github_login),
- Some(Box::new(FollowNextCollaborator)),
- theme.tooltip.clone(),
- cx,
- )
- .boxed()
+ })
+ .with_tooltip::<JoinProject, _>(
+ peer_id.as_u64() as usize,
+ format!("Follow {} into external project", peer_github_login),
+ Some(Box::new(FollowNextCollaborator)),
+ theme.tooltip.clone(),
+ cx,
+ )
+ .boxed()
} else {
content
}
@@ -2,7 +2,7 @@ use std::{mem, sync::Arc};
use crate::contacts_popover;
use call::ActiveCall;
-use client::{Contact, PeerId, User, UserStore};
+use client::{proto::PeerId, Contact, User, UserStore};
use editor::{Cancel, Editor};
use fuzzy::{match_strings, StringMatchCandidate};
use gpui::{
@@ -465,7 +465,7 @@ impl ContactList {
room.remote_participants()
.iter()
.map(|(peer_id, participant)| StringMatchCandidate {
- id: peer_id.0 as usize,
+ id: peer_id.as_u64() as usize,
string: participant.user.github_login.clone(),
char_bag: participant.user.github_login.chars().collect(),
}),
@@ -479,7 +479,7 @@ impl ContactList {
executor.clone(),
));
for mat in matches {
- let peer_id = PeerId(mat.candidate_id as u32);
+ let peer_id = PeerId::from_u64(mat.candidate_id as u64);
let participant = &room.remote_participants()[&peer_id];
participant_entries.push(ContactEntry::CallParticipant {
user: participant.user.clone(),
@@ -881,75 +881,80 @@ impl ContactList {
let baseline_offset =
row.name.text.baseline_offset(font_cache) + (theme.row_height - line_height) / 2.;
- MouseEventHandler::<OpenSharedScreen>::new(peer_id.0 as usize, cx, |mouse_state, _| {
- let tree_branch = *tree_branch.style_for(mouse_state, is_selected);
- let row = theme.project_row.style_for(mouse_state, is_selected);
-
- Flex::row()
- .with_child(
- Stack::new()
- .with_child(
- Canvas::new(move |bounds, _, cx| {
- let start_x = bounds.min_x() + (bounds.width() / 2.)
- - (tree_branch.width / 2.);
- let end_x = bounds.max_x();
- let start_y = bounds.min_y();
- let end_y = bounds.min_y() + baseline_offset - (cap_height / 2.);
+ MouseEventHandler::<OpenSharedScreen>::new(
+ peer_id.as_u64() as usize,
+ cx,
+ |mouse_state, _| {
+ let tree_branch = *tree_branch.style_for(mouse_state, is_selected);
+ let row = theme.project_row.style_for(mouse_state, is_selected);
- cx.scene.push_quad(gpui::Quad {
- bounds: RectF::from_points(
- vec2f(start_x, start_y),
- vec2f(
- start_x + tree_branch.width,
- if is_last { end_y } else { bounds.max_y() },
+ Flex::row()
+ .with_child(
+ Stack::new()
+ .with_child(
+ Canvas::new(move |bounds, _, cx| {
+ let start_x = bounds.min_x() + (bounds.width() / 2.)
+ - (tree_branch.width / 2.);
+ let end_x = bounds.max_x();
+ let start_y = bounds.min_y();
+ let end_y =
+ bounds.min_y() + baseline_offset - (cap_height / 2.);
+
+ cx.scene.push_quad(gpui::Quad {
+ bounds: RectF::from_points(
+ vec2f(start_x, start_y),
+ vec2f(
+ start_x + tree_branch.width,
+ if is_last { end_y } else { bounds.max_y() },
+ ),
),
- ),
- background: Some(tree_branch.color),
- border: gpui::Border::default(),
- corner_radius: 0.,
- });
- cx.scene.push_quad(gpui::Quad {
- bounds: RectF::from_points(
- vec2f(start_x, end_y),
- vec2f(end_x, end_y + tree_branch.width),
- ),
- background: Some(tree_branch.color),
- border: gpui::Border::default(),
- corner_radius: 0.,
- });
- })
+ background: Some(tree_branch.color),
+ border: gpui::Border::default(),
+ corner_radius: 0.,
+ });
+ cx.scene.push_quad(gpui::Quad {
+ bounds: RectF::from_points(
+ vec2f(start_x, end_y),
+ vec2f(end_x, end_y + tree_branch.width),
+ ),
+ background: Some(tree_branch.color),
+ border: gpui::Border::default(),
+ corner_radius: 0.,
+ });
+ })
+ .boxed(),
+ )
+ .constrained()
+ .with_width(host_avatar_height)
.boxed(),
- )
- .constrained()
- .with_width(host_avatar_height)
- .boxed(),
- )
- .with_child(
- Svg::new("icons/disable_screen_sharing_12.svg")
- .with_color(row.icon.color)
- .constrained()
- .with_width(row.icon.width)
- .aligned()
- .left()
- .contained()
- .with_style(row.icon.container)
- .boxed(),
- )
- .with_child(
- Label::new("Screen".into(), row.name.text.clone())
- .aligned()
- .left()
- .contained()
- .with_style(row.name.container)
- .flex(1., false)
- .boxed(),
- )
- .constrained()
- .with_height(theme.row_height)
- .contained()
- .with_style(row.container)
- .boxed()
- })
+ )
+ .with_child(
+ Svg::new("icons/disable_screen_sharing_12.svg")
+ .with_color(row.icon.color)
+ .constrained()
+ .with_width(row.icon.width)
+ .aligned()
+ .left()
+ .contained()
+ .with_style(row.icon.container)
+ .boxed(),
+ )
+ .with_child(
+ Label::new("Screen".into(), row.name.text.clone())
+ .aligned()
+ .left()
+ .contained()
+ .with_style(row.name.container)
+ .flex(1., false)
+ .boxed(),
+ )
+ .constrained()
+ .with_height(theme.row_height)
+ .contained()
+ .with_style(row.container)
+ .boxed()
+ },
+ )
.with_cursor_style(CursorStyle::PointingHand)
.on_click(MouseButton::Left, move |_, cx| {
cx.dispatch_action(OpenSharedScreen { peer_id });
@@ -1,9 +1,7 @@
-use std::{cell::RefCell, rc::Rc, time::Instant};
-
use drag_and_drop::DragAndDrop;
use futures::StreamExt;
use indoc::indoc;
-use rpc::PeerId;
+use std::{cell::RefCell, rc::Rc, time::Instant};
use unindent::Unindent;
use super::*;
@@ -5128,7 +5126,7 @@ async fn test_following_with_multiple_excerpts(cx: &mut gpui::TestAppContext) {
pane.clone(),
project.clone(),
ViewId {
- creator: PeerId(0),
+ creator: Default::default(),
id: 0,
},
&mut state_message,
@@ -5223,7 +5221,7 @@ async fn test_following_with_multiple_excerpts(cx: &mut gpui::TestAppContext) {
pane.clone(),
project.clone(),
ViewId {
- creator: PeerId(0),
+ creator: Default::default(),
id: 0,
},
&mut state_message,
@@ -3,7 +3,7 @@ use crate::{
};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
-use client::{proto, PeerId};
+use client::proto::{self, PeerId};
use gpui::{AppContext, AsyncAppContext, ModelHandle};
use language::{
point_from_lsp, point_to_lsp,
@@ -7,7 +7,7 @@ pub mod worktree;
mod project_tests;
use anyhow::{anyhow, Context, Result};
-use client::{proto, Client, PeerId, TypedEnvelope, UserStore};
+use client::{proto, Client, TypedEnvelope, UserStore};
use clock::ReplicaId;
use collections::{hash_map, BTreeMap, HashMap, HashSet};
use futures::{
@@ -15,7 +15,6 @@ use futures::{
future::Shared,
AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
};
-
use gpui::{
AnyModelHandle, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle,
MutableAppContext, Task, UpgradeModelHandle, WeakModelHandle,
@@ -103,11 +102,11 @@ pub struct Project {
user_store: ModelHandle<UserStore>,
fs: Arc<dyn Fs>,
client_state: Option<ProjectClientState>,
- collaborators: HashMap<PeerId, Collaborator>,
+ collaborators: HashMap<proto::PeerId, Collaborator>,
client_subscriptions: Vec<client::Subscription>,
_subscriptions: Vec<gpui::Subscription>,
opened_buffer: (watch::Sender<()>, watch::Receiver<()>),
- shared_buffers: HashMap<PeerId, HashSet<u64>>,
+ shared_buffers: HashMap<proto::PeerId, HashSet<u64>>,
#[allow(clippy::type_complexity)]
loading_buffers: HashMap<
ProjectPath,
@@ -164,7 +163,7 @@ enum ProjectClientState {
#[derive(Clone, Debug)]
pub struct Collaborator {
- pub peer_id: PeerId,
+ pub peer_id: proto::PeerId,
pub replica_id: ReplicaId,
}
@@ -185,7 +184,7 @@ pub enum Event {
},
RemoteIdChanged(Option<u64>),
DisconnectedFromHost,
- CollaboratorLeft(PeerId),
+ CollaboratorLeft(proto::PeerId),
}
pub enum LanguageServerState {
@@ -555,7 +554,7 @@ impl Project {
.await?;
let mut collaborators = HashMap::default();
for message in response.collaborators {
- let collaborator = Collaborator::from_proto(message);
+ let collaborator = Collaborator::from_proto(message)?;
collaborators.insert(collaborator.peer_id, collaborator);
}
@@ -754,7 +753,7 @@ impl Project {
}
}
- pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
+ pub fn collaborators(&self) -> &HashMap<proto::PeerId, Collaborator> {
&self.collaborators
}
@@ -4605,7 +4604,7 @@ impl Project {
.take()
.ok_or_else(|| anyhow!("empty collaborator"))?;
- let collaborator = Collaborator::from_proto(collaborator);
+ let collaborator = Collaborator::from_proto(collaborator)?;
this.update(&mut cx, |this, cx| {
this.collaborators
.insert(collaborator.peer_id, collaborator);
@@ -4622,7 +4621,10 @@ impl Project {
mut cx: AsyncAppContext,
) -> Result<()> {
this.update(&mut cx, |this, cx| {
- let peer_id = PeerId(envelope.payload.peer_id);
+ let peer_id = envelope
+ .payload
+ .peer_id
+ .ok_or_else(|| anyhow!("invalid peer id"))?;
let replica_id = this
.collaborators
.remove(&peer_id)
@@ -5489,7 +5491,7 @@ impl Project {
fn serialize_project_transaction_for_peer(
&mut self,
project_transaction: ProjectTransaction,
- peer_id: PeerId,
+ peer_id: proto::PeerId,
cx: &AppContext,
) -> proto::ProjectTransaction {
let mut serialized_transaction = proto::ProjectTransaction {
@@ -5545,7 +5547,7 @@ impl Project {
fn create_buffer_for_peer(
&mut self,
buffer: &ModelHandle<Buffer>,
- peer_id: PeerId,
+ peer_id: proto::PeerId,
cx: &AppContext,
) -> u64 {
let buffer_id = buffer.read(cx).remote_id();
@@ -5563,7 +5565,7 @@ impl Project {
client.send(proto::CreateBufferForPeer {
project_id,
- peer_id: peer_id.0,
+ peer_id: Some(peer_id),
variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
})?;
@@ -5580,7 +5582,7 @@ impl Project {
let is_last = operations.is_empty();
client.send(proto::CreateBufferForPeer {
project_id,
- peer_id: peer_id.0,
+ peer_id: Some(peer_id),
variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
proto::BufferChunk {
buffer_id,
@@ -6036,11 +6038,11 @@ impl Entity for Project {
}
impl Collaborator {
- fn from_proto(message: proto::Collaborator) -> Self {
- Self {
- peer_id: PeerId(message.peer_id),
+ fn from_proto(message: proto::Collaborator) -> Result<Self> {
+ Ok(Self {
+ peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
replica_id: message.replica_id as ReplicaId,
- }
+ })
}
}
@@ -1,10 +1,15 @@
syntax = "proto3";
package zed.messages;
+message PeerId {
+ uint32 owner_id = 1;
+ uint32 id = 2;
+}
+
message Envelope {
uint32 id = 1;
optional uint32 responding_to = 2;
- optional uint32 original_sender_id = 3;
+ optional PeerId original_sender_id = 3;
oneof payload {
Hello hello = 4;
Ack ack = 5;
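
With PeerId promoted from uint32 to a message, prost generates a struct for it, and every field typed PeerId becomes Option<PeerId> on the Rust side; that is what drives the Some(...)/ok_or_else changes throughout this diff. A hedged sketch of the generated type, assuming stock prost codegen (the limited derive set is also why the diff later adds manual Copy, Eq, Ord, and Hash impls):

    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct PeerId {
        #[prost(uint32, tag = "1")]
        pub owner_id: u32,
        #[prost(uint32, tag = "2")]
        pub id: u32,
    }
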
@@ -125,7 +130,7 @@ message Envelope {
// Messages
message Hello {
- uint32 peer_id = 1;
+ PeerId peer_id = 1;
}
message Ping {}
@@ -167,7 +172,7 @@ message Room {
message Participant {
uint64 user_id = 1;
- uint32 peer_id = 2;
+ PeerId peer_id = 2;
repeated ParticipantProject projects = 3;
ParticipantLocation location = 4;
}
@@ -319,7 +324,7 @@ message AddProjectCollaborator {
message RemoveProjectCollaborator {
uint64 project_id = 1;
- uint32 peer_id = 2;
+ PeerId peer_id = 2;
}
message GetDefinition {
@@ -438,7 +443,7 @@ message OpenBufferResponse {
message CreateBufferForPeer {
uint64 project_id = 1;
- uint32 peer_id = 2;
+ PeerId peer_id = 2;
oneof variant {
BufferState state = 3;
BufferChunk chunk = 4;
@@ -794,7 +799,7 @@ message UpdateDiagnostics {
message Follow {
uint64 project_id = 1;
- uint32 leader_id = 2;
+ PeerId leader_id = 2;
}
message FollowResponse {
@@ -804,7 +809,7 @@ message FollowResponse {
message UpdateFollowers {
uint64 project_id = 1;
- repeated uint32 follower_ids = 2;
+ repeated PeerId follower_ids = 2;
oneof variant {
UpdateActiveView update_active_view = 3;
View create_view = 4;
@@ -814,7 +819,7 @@ message UpdateFollowers {
message Unfollow {
uint64 project_id = 1;
- uint32 leader_id = 2;
+ PeerId leader_id = 2;
}
message GetPrivateUserInfo {}
@@ -827,18 +832,18 @@ message GetPrivateUserInfoResponse {
// Entities
message ViewId {
- uint32 creator = 1;
+ PeerId creator = 1;
uint64 id = 2;
}
message UpdateActiveView {
optional ViewId id = 1;
- optional uint32 leader_id = 2;
+ optional PeerId leader_id = 2;
}
message UpdateView {
ViewId id = 1;
- optional uint32 leader_id = 2;
+ optional PeerId leader_id = 2;
oneof variant {
Editor editor = 3;
@@ -856,7 +861,7 @@ message UpdateView {
message View {
ViewId id = 1;
- optional uint32 leader_id = 2;
+ optional PeerId leader_id = 2;
oneof variant {
Editor editor = 3;
@@ -874,7 +879,7 @@ message View {
}
message Collaborator {
- uint32 peer_id = 1;
+ PeerId peer_id = 1;
uint32 replica_id = 2;
uint64 user_id = 3;
}
@@ -6,7 +6,10 @@ macro_rules! messages {
$(Some(envelope::Payload::$name(payload)) => {
Some(Box::new(TypedEnvelope {
sender_id,
- original_sender_id: envelope.original_sender_id.map(PeerId),
+ original_sender_id: envelope.original_sender_id.map(|original_sender| PeerId {
+ owner_id: original_sender.owner_id,
+ id: original_sender.id
+ }),
message_id: envelope.id,
payload,
}))
@@ -24,7 +27,7 @@ macro_rules! messages {
self,
id: u32,
responding_to: Option<u32>,
- original_sender_id: Option<u32>,
+ original_sender_id: Option<PeerId>,
) -> Envelope {
Envelope {
id,
@@ -1,5 +1,5 @@
use super::{
- proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, RequestMessage},
+ proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, PeerId, RequestMessage},
Connection,
};
use anyhow::{anyhow, Context, Result};
@@ -11,9 +11,8 @@ use futures::{
};
use parking_lot::{Mutex, RwLock};
use serde::{ser::SerializeStruct, Serialize};
-use std::sync::atomic::Ordering::SeqCst;
+use std::{fmt, sync::atomic::Ordering::SeqCst};
use std::{
- fmt,
future::Future,
marker::PhantomData,
sync::{
@@ -25,20 +24,32 @@ use std::{
use tracing::instrument;
#[derive(Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Serialize)]
-pub struct ConnectionId(pub u32);
+pub struct ConnectionId {
+ pub owner_id: u32,
+ pub id: u32,
+}
-impl fmt::Display for ConnectionId {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.0.fmt(f)
+impl Into<PeerId> for ConnectionId {
+ fn into(self) -> PeerId {
+ PeerId {
+ owner_id: self.owner_id,
+ id: self.id,
+ }
}
}
-#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
-pub struct PeerId(pub u32);
+impl From<PeerId> for ConnectionId {
+ fn from(peer_id: PeerId) -> Self {
+ Self {
+ owner_id: peer_id.owner_id,
+ id: peer_id.id,
+ }
+ }
+}
-impl fmt::Display for PeerId {
+impl fmt::Display for ConnectionId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.0.fmt(f)
+ write!(f, "{}/{}", self.owner_id, self.id)
}
}
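
ConnectionId and PeerId now carry the same (owner_id, id) pair, so the conversions above are lossless in both directions. A usage sketch, assuming the definitions from this hunk are in scope:

    fn round_trip_demo() {
        let conn = ConnectionId { owner_id: 1, id: 42 };
        let peer: PeerId = conn.into();
        let back: ConnectionId = peer.into();
        assert_eq!(conn, back); // nothing is lost either way
        assert_eq!(conn.to_string(), "1/42"); // the new Display format
    }
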
@@ -85,6 +96,7 @@ impl<T: RequestMessage> TypedEnvelope<T> {
}
pub struct Peer {
+ epoch: AtomicU32,
pub connections: RwLock<HashMap<ConnectionId, ConnectionState>>,
next_connection_id: AtomicU32,
}
@@ -105,13 +117,18 @@ const WRITE_TIMEOUT: Duration = Duration::from_secs(2);
pub const RECEIVE_TIMEOUT: Duration = Duration::from_secs(5);
impl Peer {
- pub fn new() -> Arc<Self> {
+ pub fn new(epoch: u32) -> Arc<Self> {
Arc::new(Self {
+ epoch: AtomicU32::new(epoch),
connections: Default::default(),
next_connection_id: Default::default(),
})
}
+ pub fn epoch(&self) -> u32 {
+ self.epoch.load(SeqCst)
+ }
+
#[instrument(skip_all)]
pub fn add_connection<F, Fut, Out>(
self: &Arc<Self>,
@@ -138,7 +155,10 @@ impl Peer {
let (mut incoming_tx, incoming_rx) = mpsc::channel(INCOMING_BUFFER_SIZE);
let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded();
- let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst));
+ let connection_id = ConnectionId {
+ owner_id: self.epoch.load(SeqCst),
+ id: self.next_connection_id.fetch_add(1, SeqCst),
+ };
let connection_state = ConnectionState {
outgoing_tx,
next_message_id: Default::default(),
@@ -255,11 +275,7 @@ impl Peer {
let message_id = incoming.id;
tracing::debug!(?incoming, "incoming message future: start");
let _end = util::defer(move || {
- tracing::debug!(
- %connection_id,
- message_id,
- "incoming message future: end"
- );
+ tracing::debug!(%connection_id, message_id, "incoming message future: end");
});
if let Some(responding_to) = incoming.responding_to {
@@ -306,11 +322,7 @@ impl Peer {
None
} else {
- tracing::debug!(
- %connection_id,
- message_id,
- "incoming message: received"
- );
+ tracing::debug!(%connection_id, message_id, "incoming message: received");
proto::build_typed_envelope(connection_id, incoming).or_else(|| {
tracing::error!(
%connection_id,
@@ -343,7 +355,13 @@ impl Peer {
self.connections.write().remove(&connection_id);
}
- pub fn reset(&self) {
+ pub fn reset(&self, epoch: u32) {
+ self.teardown();
+ self.next_connection_id.store(0, SeqCst);
+ self.epoch.store(epoch, SeqCst);
+ }
+
+ pub fn teardown(&self) {
self.connections.write().clear();
}
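
reset (now test-only at the Server level) does more than teardown: it restarts the connection counter and adopts a fresh epoch, so ids minted after a reset cannot collide with ids minted before it even though the counter returns to zero. A small illustration (the local struct mirrors the ConnectionId defined above in this diff):

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct ConnectionId { owner_id: u32, id: u32 }

    fn main() {
        let before_reset = ConnectionId { owner_id: 1, id: 0 };
        let after_reset = ConnectionId { owner_id: 2, id: 0 }; // counter reused, epoch bumped
        assert_ne!(before_reset, after_reset);
    }
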
@@ -384,7 +402,7 @@ impl Peer {
.unbounded_send(proto::Message::Envelope(request.into_envelope(
message_id,
None,
- original_sender_id.map(|id| id.0),
+ original_sender_id.map(Into::into),
)))
.map_err(|_| anyhow!("connection was closed"))?;
Ok(())
@@ -433,7 +451,7 @@ impl Peer {
.unbounded_send(proto::Message::Envelope(message.into_envelope(
message_id,
None,
- Some(sender_id.0),
+ Some(sender_id.into()),
)))?;
Ok(())
}
@@ -515,9 +533,9 @@ mod tests {
let executor = cx.foreground();
// create 2 clients connected to 1 server
- let server = Peer::new();
- let client1 = Peer::new();
- let client2 = Peer::new();
+ let server = Peer::new(0);
+ let client1 = Peer::new(0);
+ let client2 = Peer::new(0);
let (client1_to_server_conn, server_to_client_1_conn, _kill) =
Connection::in_memory(cx.background());
@@ -609,8 +627,8 @@ mod tests {
#[gpui::test(iterations = 50)]
async fn test_order_of_response_and_incoming(cx: &mut TestAppContext) {
let executor = cx.foreground();
- let server = Peer::new();
- let client = Peer::new();
+ let server = Peer::new(0);
+ let client = Peer::new(0);
let (client_to_server_conn, server_to_client_conn, _kill) =
Connection::in_memory(cx.background());
@@ -707,8 +725,8 @@ mod tests {
#[gpui::test(iterations = 50)]
async fn test_dropping_request_before_completion(cx: &mut TestAppContext) {
let executor = cx.foreground();
- let server = Peer::new();
- let client = Peer::new();
+ let server = Peer::new(0);
+ let client = Peer::new(0);
let (client_to_server_conn, server_to_client_conn, _kill) =
Connection::in_memory(cx.background());
@@ -822,7 +840,7 @@ mod tests {
let (client_conn, mut server_conn, _kill) = Connection::in_memory(cx.background());
- let client = Peer::new();
+ let client = Peer::new(0);
let (connection_id, io_handler, mut incoming) =
client.add_test_connection(client_conn, cx.background());
@@ -857,7 +875,7 @@ mod tests {
let executor = cx.foreground();
let (client_conn, mut server_conn, _kill) = Connection::in_memory(cx.background());
- let client = Peer::new();
+ let client = Peer::new(0);
let (connection_id, io_handler, mut incoming) =
client.add_test_connection(client_conn, cx.background());
executor.spawn(io_handler).detach();
@@ -1,14 +1,16 @@
-use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
+use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
use anyhow::{anyhow, Result};
use async_tungstenite::tungstenite::Message as WebSocketMessage;
use futures::{SinkExt as _, StreamExt as _};
use prost::Message as _;
use serde::Serialize;
use std::any::{Any, TypeId};
-use std::{cmp, iter, mem};
+use std::fmt;
+use std::str::FromStr;
use std::{
+ cmp,
fmt::Debug,
- io,
+ io, iter, mem,
time::{Duration, SystemTime, UNIX_EPOCH},
};
@@ -21,7 +23,7 @@ pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 's
self,
id: u32,
responding_to: Option<u32>,
- original_sender_id: Option<u32>,
+ original_sender_id: Option<PeerId>,
) -> Envelope;
fn from_envelope(envelope: Envelope) -> Option<Self>;
}
@@ -74,6 +76,66 @@ impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
}
}
+impl PeerId {
+ pub fn from_u64(peer_id: u64) -> Self {
+ let owner_id = (peer_id >> 32) as u32;
+ let id = peer_id as u32;
+ Self { owner_id, id }
+ }
+
+ pub fn as_u64(self) -> u64 {
+ ((self.owner_id as u64) << 32) | (self.id as u64)
+ }
+}
+
+impl Copy for PeerId {}
+
+impl Eq for PeerId {}
+
+impl Ord for PeerId {
+ fn cmp(&self, other: &Self) -> cmp::Ordering {
+ self.owner_id
+ .cmp(&other.owner_id)
+ .then_with(|| self.id.cmp(&other.id))
+ }
+}
+
+impl PartialOrd for PeerId {
+ fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl std::hash::Hash for PeerId {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.owner_id.hash(state);
+ self.id.hash(state);
+ }
+}
+
+impl fmt::Display for PeerId {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "{}/{}", self.owner_id, self.id)
+ }
+}
+
+impl FromStr for PeerId {
+ type Err = anyhow::Error;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let mut components = s.split('/');
+ let owner_id = components
+ .next()
+ .ok_or_else(|| anyhow!("invalid peer id {:?}", s))?
+ .parse()?;
+ let id = components
+ .next()
+ .ok_or_else(|| anyhow!("invalid peer id {:?}", s))?
+ .parse()?;
+ Ok(PeerId { owner_id, id })
+ }
+}
+
messages!(
(Ack, Foreground),
(AddProjectCollaborator, Foreground),
@@ -477,4 +539,28 @@ mod tests {
stream.read().await.unwrap();
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
}
+
+ #[gpui::test]
+ fn test_converting_peer_id_from_and_to_u64() {
+ let peer_id = PeerId {
+ owner_id: 10,
+ id: 3,
+ };
+ assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
+ let peer_id = PeerId {
+ owner_id: u32::MAX,
+ id: 3,
+ };
+ assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
+ let peer_id = PeerId {
+ owner_id: 10,
+ id: u32::MAX,
+ };
+ assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
+ let peer_id = PeerId {
+ owner_id: u32::MAX,
+ id: u32::MAX,
+ };
+ assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
+ }
}
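
The round-trip tests above exercise the 32/32 split; the packing arithmetic, worked out once for concreteness:

    fn main() {
        let (owner_id, id) = (10u32, 3u32);
        let packed = ((owner_id as u64) << 32) | (id as u64);
        assert_eq!(packed, 0x0000_000A_0000_0003); // owner_id high half, id low half
        assert_eq!((packed >> 32) as u32, owner_id);
        assert_eq!(packed as u32, id);
    }
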
@@ -286,7 +286,7 @@ impl<T: Item> ItemHandle for ViewHandle<T> {
.remote_id(&workspace.client, cx)
.map(|id| id.to_proto()),
variant: Some(message),
- leader_id: workspace.leader_for_pane(&pane).map(|id| id.0),
+ leader_id: workspace.leader_for_pane(&pane),
}),
cx,
);
@@ -342,7 +342,7 @@ impl<T: Item> ItemHandle for ViewHandle<T> {
.remote_id(&this.client, cx)
.map(|id| id.to_proto()),
variant: pending_update.borrow_mut().take(),
- leader_id: leader_id.map(|id| id.0),
+ leader_id,
},
),
cx,
@@ -3,7 +3,7 @@ use crate::{
};
use anyhow::{anyhow, Result};
use call::participant::{Frame, RemoteVideoTrack};
-use client::{PeerId, User};
+use client::{proto::PeerId, User};
use futures::StreamExt;
use gpui::{
elements::*,
@@ -16,7 +16,10 @@ mod toolbar;
use anyhow::{anyhow, Result};
use call::ActiveCall;
-use client::{proto, Client, PeerId, TypedEnvelope, UserStore};
+use client::{
+ proto::{self, PeerId},
+ Client, TypedEnvelope, UserStore,
+};
use collections::{hash_map, HashMap, HashSet};
use dock::{Dock, DockDefaultItemFactory, ToggleDockButton};
use drag_and_drop::DragAndDrop;
@@ -1466,7 +1469,7 @@ impl Workspace {
.remote_id(&self.client, cx)
.map(|id| id.to_proto())
}),
- leader_id: self.leader_for_pane(&pane).map(|id| id.0),
+ leader_id: self.leader_for_pane(&pane),
}),
cx,
);
@@ -1643,7 +1646,7 @@ impl Workspace {
let project_id = self.project.read(cx).remote_id()?;
let request = self.client.request(proto::Follow {
project_id,
- leader_id: leader_id.0,
+ leader_id: Some(leader_id),
});
Some(cx.spawn_weak(|this, mut cx| async move {
let response = request.await?;
@@ -1654,7 +1657,11 @@ impl Workspace {
.get_mut(&leader_id)
.and_then(|states_by_pane| states_by_pane.get_mut(&pane))
.ok_or_else(|| anyhow!("following interrupted"))?;
- state.active_view_id = response.active_view_id.map(ViewId::from_proto);
+ state.active_view_id = if let Some(active_view_id) = response.active_view_id {
+ Some(ViewId::from_proto(active_view_id)?)
+ } else {
+ None
+ };
Ok::<_, anyhow::Error>(())
})?;
Self::add_views_from_leader(
@@ -1720,7 +1727,7 @@ impl Workspace {
self.client
.send(proto::Unfollow {
project_id,
- leader_id: leader_id.0,
+ leader_id: Some(leader_id),
})
.log_err();
}
@@ -1920,7 +1927,7 @@ impl Workspace {
.panes()
.iter()
.flat_map(|pane| {
- let leader_id = this.leader_for_pane(pane).map(|id| id.0);
+ let leader_id = this.leader_for_pane(pane);
pane.read(cx).items().filter_map({
let cx = &cx;
move |item| {
@@ -1980,10 +1987,15 @@ impl Workspace {
if let Some(state) = this.follower_states_by_leader.get_mut(&leader_id) {
for state in state.values_mut() {
state.active_view_id =
- update_active_view.id.clone().map(ViewId::from_proto);
+ if let Some(active_view_id) = update_active_view.id.clone() {
+ Some(ViewId::from_proto(active_view_id)?)
+ } else {
+ None
+ };
}
}
- });
+ anyhow::Ok(())
+ })?;
}
proto::update_followers::Variant::UpdateView(update_view) => {
let variant = update_view
@@ -1997,15 +2009,14 @@ impl Workspace {
let project = this.project.clone();
if let Some(state) = this.follower_states_by_leader.get_mut(&leader_id) {
for state in state.values_mut() {
- if let Some(item) = state
- .items_by_leader_view_id
- .get(&ViewId::from_proto(id.clone()))
- {
+ let view_id = ViewId::from_proto(id.clone())?;
+ if let Some(item) = state.items_by_leader_view_id.get(&view_id) {
tasks.push(item.apply_update_proto(&project, variant.clone(), cx));
}
}
}
- });
+ anyhow::Ok(())
+ })?;
try_join_all(tasks).await.log_err();
}
proto::update_followers::Variant::CreateView(view) => {
@@ -2054,7 +2065,7 @@ impl Workspace {
let mut leader_view_ids = Vec::new();
for view in &views {
let Some(id) = &view.id else { continue };
- let id = ViewId::from_proto(id.clone());
+ let id = ViewId::from_proto(id.clone())?;
let mut variant = view.variant.clone();
if variant.is_none() {
Err(anyhow!("missing variant"))?;
@@ -2105,7 +2116,7 @@ impl Workspace {
self.client
.send(proto::UpdateFollowers {
project_id,
- follower_ids: self.leader_state.followers.iter().map(|f| f.0).collect(),
+ follower_ids: self.leader_state.followers.iter().copied().collect(),
variant: Some(update),
})
.log_err();
@@ -2587,16 +2598,18 @@ impl View for Workspace {
}
impl ViewId {
- pub(crate) fn from_proto(message: proto::ViewId) -> Self {
- Self {
- creator: PeerId(message.creator),
+ pub(crate) fn from_proto(message: proto::ViewId) -> Result<Self> {
+ Ok(Self {
+ creator: message
+ .creator
+ .ok_or_else(|| anyhow!("creator is missing"))?,
id: message.id,
- }
+ })
}
pub(crate) fn to_proto(&self) -> proto::ViewId {
proto::ViewId {
- creator: self.creator.0,
+ creator: Some(self.creator),
id: self.id,
}
}
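
ViewId::from_proto is now fallible because proto::ViewId.creator is optional on the wire, which forces the if-let conversions seen earlier in this diff. A hedged note: the same Option-of-fallible conversion can also be written with transpose(), which flips Option<Result<T>> into Result<Option<T>> (types as in the hunks above):

    fn parse_active_view_id(id: Option<proto::ViewId>) -> anyhow::Result<Option<ViewId>> {
        // Equivalent to: if let Some(id) = id { Some(ViewId::from_proto(id)?) } else { None }
        id.map(ViewId::from_proto).transpose()
    }
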