diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index baa5da9bf719965de8e93a48534cfcc31f283f45..a32f25fbe9bf0b96585b5e4242daff92216a34a2 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -19,7 +19,9 @@ env:
jobs:
rustfmt:
name: Check formatting
- runs-on: self-hosted
+ runs-on:
+ - self-hosted
+ - test
steps:
- name: Install Rust
run: |
diff --git a/Cargo.lock b/Cargo.lock
index e8410b25f045c09b6226df2bd4be4a22f8559d73..615825bd757f5897d6c11970307ffe77799c39eb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -794,7 +794,7 @@ dependencies = [
[[package]]
name = "bromberg_sl2"
version = "0.6.0"
-source = "git+https://github.com/zed-industries/bromberg_sl2?rev=dac565a90e8f9245f48ff46225c915dc50f76920#dac565a90e8f9245f48ff46225c915dc50f76920"
+source = "git+https://github.com/zed-industries/bromberg_sl2?rev=950bc5482c216c395049ae33ae4501e08975f17f#950bc5482c216c395049ae33ae4501e08975f17f"
dependencies = [
"digest 0.9.0",
"lazy_static",
@@ -1188,7 +1188,7 @@ dependencies = [
[[package]]
name = "collab"
-version = "0.5.4"
+version = "0.6.1"
dependencies = [
"anyhow",
"async-tungstenite",
@@ -1257,6 +1257,7 @@ dependencies = [
"client",
"clock",
"collections",
+ "context_menu",
"editor",
"futures 0.3.25",
"fuzzy",
@@ -8356,7 +8357,7 @@ checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
[[package]]
name = "zed"
-version = "0.75.0"
+version = "0.76.0"
dependencies = [
"activity_indicator",
"anyhow",
diff --git a/assets/icons/ellipsis_14.svg b/assets/icons/ellipsis_14.svg
new file mode 100644
index 0000000000000000000000000000000000000000..5d45af2b6f249f103ae2f1f3e8df48905f2fd832
--- /dev/null
+++ b/assets/icons/ellipsis_14.svg
@@ -0,0 +1,3 @@
+
diff --git a/assets/icons/leave_12.svg b/assets/icons/leave_12.svg
new file mode 100644
index 0000000000000000000000000000000000000000..84491384b8cc7f80d4a727e75c142ee509b451ac
--- /dev/null
+++ b/assets/icons/leave_12.svg
@@ -0,0 +1,3 @@
+
diff --git a/assets/keymaps/default.json b/assets/keymaps/default.json
index e8f055cb7d740fc1ce15e88d496dce60a4b0ea6b..cce65eda8abcadcb632d1b496c10b2dc283dc548 100644
--- a/assets/keymaps/default.json
+++ b/assets/keymaps/default.json
@@ -228,6 +228,7 @@
"replace_newest": true
}
],
+ "cmd-k cmd-i": "editor::Hover",
"cmd-/": [
"editor::ToggleComments",
{
@@ -418,7 +419,7 @@
{
"bindings": {
"ctrl-alt-cmd-f": "workspace::FollowNextCollaborator",
- "cmd-shift-c": "collab::ToggleCollaborationMenu",
+ "cmd-shift-c": "collab::ToggleContactsMenu",
"cmd-alt-i": "zed::DebugElements"
}
},
@@ -456,7 +457,7 @@
}
},
{
- "context": "Dock",
+ "context": "Pane && docked",
"bindings": {
"shift-escape": "dock::HideDock",
"cmd-escape": "dock::RemoveTabFromDock"
diff --git a/assets/keymaps/vim.json b/assets/keymaps/vim.json
index 824fb63c0f35969efeda1bb9cb2ded01e386d539..a24c4aff6900b14b939767b4b5caca6abb9c4486 100644
--- a/assets/keymaps/vim.json
+++ b/assets/keymaps/vim.json
@@ -27,6 +27,7 @@
"h": "vim::Left",
"backspace": "vim::Backspace",
"j": "vim::Down",
+ "enter": "vim::NextLineStart",
"k": "vim::Up",
"l": "vim::Right",
"$": "vim::EndOfLine",
diff --git a/assets/settings/default.json b/assets/settings/default.json
index f6fb61d65c83e0352dc43455d10627708b2e35b9..00866af2caf86817e686d69677ab3225aceb09ff 100644
--- a/assets/settings/default.json
+++ b/assets/settings/default.json
@@ -83,7 +83,7 @@
"hard_tabs": false,
// How many columns a tab should occupy.
"tab_size": 4,
- // Control what info Zed sends to our servers
+ // Control what info is collected by Zed.
"telemetry": {
// Send debug info like crash reports.
"diagnostics": true,
diff --git a/crates/activity_indicator/src/activity_indicator.rs b/crates/activity_indicator/src/activity_indicator.rs
index f3a6f7328ad34ea0e6cffc30fed3742937b2275a..2041bbc793f75f634582fa5bdbb8c03044d95b68 100644
--- a/crates/activity_indicator/src/activity_indicator.rs
+++ b/crates/activity_indicator/src/activity_indicator.rs
@@ -33,6 +33,19 @@ struct LspStatus {
status: LanguageServerBinaryStatus,
}
+struct PendingWork<'a> {
+ language_server_name: &'a str,
+ progress_token: &'a str,
+ progress: &'a LanguageServerProgress,
+}
+
+#[derive(Default)]
+struct Content {
+ icon: Option<&'static str>,
+ message: String,
+ action: Option>,
+}
+
pub fn init(cx: &mut MutableAppContext) {
cx.add_action(ActivityIndicator::show_error_message);
cx.add_action(ActivityIndicator::dismiss_error_message);
@@ -69,6 +82,8 @@ impl ActivityIndicator {
if let Some(auto_updater) = auto_updater.as_ref() {
cx.observe(auto_updater, |_, _, cx| cx.notify()).detach();
}
+ cx.observe_active_labeled_tasks(|_, cx| cx.notify())
+ .detach();
Self {
statuses: Default::default(),
@@ -130,7 +145,7 @@ impl ActivityIndicator {
fn pending_language_server_work<'a>(
&self,
cx: &'a AppContext,
- ) -> impl Iterator- {
+ ) -> impl Iterator
- > {
self.project
.read(cx)
.language_server_statuses()
@@ -142,23 +157,29 @@ impl ActivityIndicator {
let mut pending_work = status
.pending_work
.iter()
- .map(|(token, progress)| (status.name.as_str(), token.as_str(), progress))
+ .map(|(token, progress)| PendingWork {
+ language_server_name: status.name.as_str(),
+ progress_token: token.as_str(),
+ progress,
+ })
.collect::>();
- pending_work.sort_by_key(|(_, _, progress)| Reverse(progress.last_update_at));
+ pending_work.sort_by_key(|work| Reverse(work.progress.last_update_at));
Some(pending_work)
}
})
.flatten()
}
- fn content_to_render(
- &mut self,
- cx: &mut RenderContext,
- ) -> (Option<&'static str>, String, Option>) {
+ fn content_to_render(&mut self, cx: &mut RenderContext) -> Content {
// Show any language server has pending activity.
let mut pending_work = self.pending_language_server_work(cx);
- if let Some((lang_server_name, progress_token, progress)) = pending_work.next() {
- let mut message = lang_server_name.to_string();
+ if let Some(PendingWork {
+ language_server_name,
+ progress_token,
+ progress,
+ }) = pending_work.next()
+ {
+ let mut message = language_server_name.to_string();
message.push_str(": ");
if let Some(progress_message) = progress.message.as_ref() {
@@ -176,7 +197,11 @@ impl ActivityIndicator {
write!(&mut message, " + {} more", additional_work_count).unwrap();
}
- return (None, message, None);
+ return Content {
+ icon: None,
+ message,
+ action: None,
+ };
}
// Show any language server installation info.
@@ -199,19 +224,19 @@ impl ActivityIndicator {
}
if !downloading.is_empty() {
- return (
- Some(DOWNLOAD_ICON),
- format!(
+ return Content {
+ icon: Some(DOWNLOAD_ICON),
+ message: format!(
"Downloading {} language server{}...",
downloading.join(", "),
if downloading.len() > 1 { "s" } else { "" }
),
- None,
- );
+ action: None,
+ };
} else if !checking_for_update.is_empty() {
- return (
- Some(DOWNLOAD_ICON),
- format!(
+ return Content {
+ icon: Some(DOWNLOAD_ICON),
+ message: format!(
"Checking for updates to {} language server{}...",
checking_for_update.join(", "),
if checking_for_update.len() > 1 {
@@ -220,53 +245,61 @@ impl ActivityIndicator {
""
}
),
- None,
- );
+ action: None,
+ };
} else if !failed.is_empty() {
- return (
- Some(WARNING_ICON),
- format!(
+ return Content {
+ icon: Some(WARNING_ICON),
+ message: format!(
"Failed to download {} language server{}. Click to show error.",
failed.join(", "),
if failed.len() > 1 { "s" } else { "" }
),
- Some(Box::new(ShowErrorMessage)),
- );
+ action: Some(Box::new(ShowErrorMessage)),
+ };
}
// Show any application auto-update info.
if let Some(updater) = &self.auto_updater {
- match &updater.read(cx).status() {
- AutoUpdateStatus::Checking => (
- Some(DOWNLOAD_ICON),
- "Checking for Zed updates…".to_string(),
- None,
- ),
- AutoUpdateStatus::Downloading => (
- Some(DOWNLOAD_ICON),
- "Downloading Zed update…".to_string(),
- None,
- ),
- AutoUpdateStatus::Installing => (
- Some(DOWNLOAD_ICON),
- "Installing Zed update…".to_string(),
- None,
- ),
- AutoUpdateStatus::Updated => (
- None,
- "Click to restart and update Zed".to_string(),
- Some(Box::new(workspace::Restart)),
- ),
- AutoUpdateStatus::Errored => (
- Some(WARNING_ICON),
- "Auto update failed".to_string(),
- Some(Box::new(DismissErrorMessage)),
- ),
+ return match &updater.read(cx).status() {
+ AutoUpdateStatus::Checking => Content {
+ icon: Some(DOWNLOAD_ICON),
+ message: "Checking for Zed updates…".to_string(),
+ action: None,
+ },
+ AutoUpdateStatus::Downloading => Content {
+ icon: Some(DOWNLOAD_ICON),
+ message: "Downloading Zed update…".to_string(),
+ action: None,
+ },
+ AutoUpdateStatus::Installing => Content {
+ icon: Some(DOWNLOAD_ICON),
+ message: "Installing Zed update…".to_string(),
+ action: None,
+ },
+ AutoUpdateStatus::Updated => Content {
+ icon: None,
+ message: "Click to restart and update Zed".to_string(),
+ action: Some(Box::new(workspace::Restart)),
+ },
+ AutoUpdateStatus::Errored => Content {
+ icon: Some(WARNING_ICON),
+ message: "Auto update failed".to_string(),
+ action: Some(Box::new(DismissErrorMessage)),
+ },
AutoUpdateStatus::Idle => Default::default(),
- }
- } else {
- Default::default()
+ };
}
+
+ if let Some(most_recent_active_task) = cx.active_labeled_tasks().last() {
+ return Content {
+ icon: None,
+ message: most_recent_active_task.to_string(),
+ action: None,
+ };
+ }
+
+ Default::default()
}
}
@@ -280,7 +313,11 @@ impl View for ActivityIndicator {
}
fn render(&mut self, cx: &mut RenderContext) -> ElementBox {
- let (icon, message, action) = self.content_to_render(cx);
+ let Content {
+ icon,
+ message,
+ action,
+ } = self.content_to_render(cx);
let mut element = MouseEventHandler::::new(0, cx, |state, cx| {
let theme = &cx
diff --git a/crates/call/src/call.rs b/crates/call/src/call.rs
index 64584e61400b414620ba640f2fbc0b79825c535e..dfe4f39e0e85bef14a52a22a3c8c0f1b9bfa84b6 100644
--- a/crates/call/src/call.rs
+++ b/crates/call/src/call.rs
@@ -284,6 +284,18 @@ impl ActiveCall {
}
}
+ pub fn unshare_project(
+ &mut self,
+ project: ModelHandle,
+ cx: &mut ModelContext,
+ ) -> Result<()> {
+ if let Some((room, _)) = self.room.as_ref() {
+ room.update(cx, |room, cx| room.unshare_project(project, cx))
+ } else {
+ Err(anyhow!("no active call"))
+ }
+ }
+
pub fn set_location(
&mut self,
project: Option<&ModelHandle>,
diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs
index 7527a693269725f0a6eae3bea48790e7c79dfb00..06680c1dcb86335c2e120df3043fd413d3f26af1 100644
--- a/crates/call/src/room.rs
+++ b/crates/call/src/room.rs
@@ -55,6 +55,7 @@ pub struct Room {
leave_when_empty: bool,
client: Arc,
user_store: ModelHandle,
+ follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec>,
subscriptions: Vec,
pending_room_update: Option>,
maintain_connection: Option>>,
@@ -148,6 +149,7 @@ impl Room {
pending_room_update: None,
client,
user_store,
+ follows_by_leader_id_project_id: Default::default(),
maintain_connection: Some(maintain_connection),
}
}
@@ -275,14 +277,12 @@ impl Room {
) -> Result<()> {
let mut client_status = client.status();
loop {
- let is_connected = client_status
- .next()
- .await
- .map_or(false, |s| s.is_connected());
-
+ let _ = client_status.try_recv();
+ let is_connected = client_status.borrow().is_connected();
// Even if we're initially connected, any future change of the status means we momentarily disconnected.
if !is_connected || client_status.next().await.is_some() {
log::info!("detected client disconnection");
+
this.upgrade(&cx)
.ok_or_else(|| anyhow!("room was dropped"))?
.update(&mut cx, |this, cx| {
@@ -296,12 +296,7 @@ impl Room {
let client_reconnection = async {
let mut remaining_attempts = 3;
while remaining_attempts > 0 {
- log::info!(
- "waiting for client status change, remaining attempts {}",
- remaining_attempts
- );
- let Some(status) = client_status.next().await else { break };
- if status.is_connected() {
+ if client_status.borrow().is_connected() {
log::info!("client reconnected, attempting to rejoin room");
let Some(this) = this.upgrade(&cx) else { break };
@@ -315,7 +310,15 @@ impl Room {
} else {
remaining_attempts -= 1;
}
+ } else if client_status.borrow().is_signed_out() {
+ return false;
}
+
+ log::info!(
+ "waiting for client status change, remaining attempts {}",
+ remaining_attempts
+ );
+ client_status.next().await;
}
false
}
@@ -337,18 +340,20 @@ impl Room {
}
}
- // The client failed to re-establish a connection to the server
- // or an error occurred while trying to re-join the room. Either way
- // we leave the room and return an error.
- if let Some(this) = this.upgrade(&cx) {
- log::info!("reconnection failed, leaving room");
- let _ = this.update(&mut cx, |this, cx| this.leave(cx));
- }
- return Err(anyhow!(
- "can't reconnect to room: client failed to re-establish connection"
- ));
+ break;
}
}
+
+ // The client failed to re-establish a connection to the server
+ // or an error occurred while trying to re-join the room. Either way
+ // we leave the room and return an error.
+ if let Some(this) = this.upgrade(&cx) {
+ log::info!("reconnection failed, leaving room");
+ let _ = this.update(&mut cx, |this, cx| this.leave(cx));
+ }
+ Err(anyhow!(
+ "can't reconnect to room: client failed to re-establish connection"
+ ))
}
fn rejoin(&mut self, cx: &mut ModelContext) -> Task> {
@@ -457,6 +462,12 @@ impl Room {
self.participant_user_ids.contains(&user_id)
}
+ pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
+ self.follows_by_leader_id_project_id
+ .get(&(leader_id, project_id))
+ .map_or(&[], |v| v.as_slice())
+ }
+
async fn handle_room_updated(
this: ModelHandle,
envelope: TypedEnvelope,
@@ -487,11 +498,13 @@ impl Room {
.iter()
.map(|p| p.user_id)
.collect::>();
+
let remote_participant_user_ids = room
.participants
.iter()
.map(|p| p.user_id)
.collect::>();
+
let (remote_participants, pending_participants) =
self.user_store.update(cx, move |user_store, cx| {
(
@@ -499,6 +512,7 @@ impl Room {
user_store.get_users(pending_participant_user_ids, cx),
)
});
+
self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
let (remote_participants, pending_participants) =
futures::join!(remote_participants, pending_participants);
@@ -620,6 +634,27 @@ impl Room {
}
}
+ this.follows_by_leader_id_project_id.clear();
+ for follower in room.followers {
+ let project_id = follower.project_id;
+ let (leader, follower) = match (follower.leader_id, follower.follower_id) {
+ (Some(leader), Some(follower)) => (leader, follower),
+
+ _ => {
+ log::error!("Follower message {follower:?} missing some state");
+ continue;
+ }
+ };
+
+ let list = this
+ .follows_by_leader_id_project_id
+ .entry((leader, project_id))
+ .or_insert(Vec::new());
+ if !list.contains(&follower) {
+ list.push(follower);
+ }
+ }
+
this.pending_room_update.take();
if this.should_leave() {
log::info!("room is empty, leaving");
@@ -793,6 +828,20 @@ impl Room {
})
}
+ pub(crate) fn unshare_project(
+ &mut self,
+ project: ModelHandle,
+ cx: &mut ModelContext,
+ ) -> Result<()> {
+ let project_id = match project.read(cx).remote_id() {
+ Some(project_id) => project_id,
+ None => return Ok(()),
+ };
+
+ self.client.send(proto::UnshareProject { project_id })?;
+ project.update(cx, |this, cx| this.unshare(cx))
+ }
+
pub(crate) fn set_location(
&mut self,
project: Option<&ModelHandle>,
diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs
index eba58304d7c89c8de749aee932034cb8079928eb..f36fa67d9d6790be5b124200ebd46f0ebc35d0cf 100644
--- a/crates/client/src/client.rs
+++ b/crates/client/src/client.rs
@@ -66,7 +66,7 @@ pub const ZED_SECRET_CLIENT_TOKEN: &str = "618033988749894";
pub const INITIAL_RECONNECTION_DELAY: Duration = Duration::from_millis(100);
pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
-actions!(client, [Authenticate]);
+actions!(client, [Authenticate, SignOut]);
pub fn init(client: Arc, cx: &mut MutableAppContext) {
cx.add_global_action({
@@ -79,6 +79,16 @@ pub fn init(client: Arc, cx: &mut MutableAppContext) {
.detach();
}
});
+ cx.add_global_action({
+ let client = client.clone();
+ move |_: &SignOut, cx| {
+ let client = client.clone();
+ cx.spawn(|cx| async move {
+ client.disconnect(&cx);
+ })
+ .detach();
+ }
+ });
}
pub struct Client {
@@ -169,6 +179,10 @@ impl Status {
pub fn is_connected(&self) -> bool {
matches!(self, Self::Connected { .. })
}
+
+ pub fn is_signed_out(&self) -> bool {
+ matches!(self, Self::SignedOut | Self::UpgradeRequired)
+ }
}
struct ClientState {
@@ -1152,11 +1166,9 @@ impl Client {
})
}
- pub fn disconnect(self: &Arc, cx: &AsyncAppContext) -> Result<()> {
- let conn_id = self.connection_id()?;
- self.peer.disconnect(conn_id);
+ pub fn disconnect(self: &Arc, cx: &AsyncAppContext) {
+ self.peer.teardown();
self.set_status(Status::SignedOut, cx);
- Ok(())
}
fn connection_id(&self) -> Result {
diff --git a/crates/collab/Cargo.toml b/crates/collab/Cargo.toml
index 9301a1974aad839a222c60b5be290d058bdfa378..86fe9174bf0bfec0aeb5b90daab8d7e09c898bd7 100644
--- a/crates/collab/Cargo.toml
+++ b/crates/collab/Cargo.toml
@@ -3,7 +3,7 @@ authors = ["Nathan Sobo "]
default-run = "collab"
edition = "2021"
name = "collab"
-version = "0.5.4"
+version = "0.6.1"
publish = false
[[bin]]
diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql
index 32254d5757da77f7b90f6c675b0a432418d32624..89b924087ef987c89ec58e65f2b165a7d11b4afa 100644
--- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql
+++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql
@@ -143,3 +143,17 @@ CREATE TABLE "servers" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"environment" VARCHAR NOT NULL
);
+
+CREATE TABLE "followers" (
+ "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+ "room_id" INTEGER NOT NULL REFERENCES rooms (id) ON DELETE CASCADE,
+ "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
+ "leader_connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE,
+ "leader_connection_id" INTEGER NOT NULL,
+ "follower_connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE,
+ "follower_connection_id" INTEGER NOT NULL
+);
+CREATE UNIQUE INDEX
+ "index_followers_on_project_id_and_leader_connection_server_id_and_leader_connection_id_and_follower_connection_server_id_and_follower_connection_id"
+ON "followers" ("project_id", "leader_connection_server_id", "leader_connection_id", "follower_connection_server_id", "follower_connection_id");
+CREATE INDEX "index_followers_on_room_id" ON "followers" ("room_id");
diff --git a/crates/collab/migrations/20230202155735_followers.sql b/crates/collab/migrations/20230202155735_followers.sql
new file mode 100644
index 0000000000000000000000000000000000000000..c82d6ba3bdaa4f2b2a60771bca7401c47678f247
--- /dev/null
+++ b/crates/collab/migrations/20230202155735_followers.sql
@@ -0,0 +1,15 @@
+CREATE TABLE IF NOT EXISTS "followers" (
+ "id" SERIAL PRIMARY KEY,
+ "room_id" INTEGER NOT NULL REFERENCES rooms (id) ON DELETE CASCADE,
+ "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
+ "leader_connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE,
+ "leader_connection_id" INTEGER NOT NULL,
+ "follower_connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE,
+ "follower_connection_id" INTEGER NOT NULL
+);
+
+CREATE UNIQUE INDEX
+ "index_followers_on_project_id_and_leader_connection_server_id_and_leader_connection_id_and_follower_connection_server_id_and_follower_connection_id"
+ON "followers" ("project_id", "leader_connection_server_id", "leader_connection_id", "follower_connection_server_id", "follower_connection_id");
+
+CREATE INDEX "index_followers_on_room_id" ON "followers" ("room_id");
diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs
index af30073ab4ef424b8c9f84557cdd692cdfbfcf46..c4ff2e39188a1133cc086b00ab5194d242ef0cc8 100644
--- a/crates/collab/src/db.rs
+++ b/crates/collab/src/db.rs
@@ -1,5 +1,6 @@
mod access_token;
mod contact;
+mod follower;
mod language_server;
mod project;
mod project_collaborator;
@@ -157,7 +158,7 @@ impl Database {
room_id: RoomId,
new_server_id: ServerId,
) -> Result> {
- self.room_transaction(|tx| async move {
+ self.room_transaction(room_id, |tx| async move {
let stale_participant_filter = Condition::all()
.add(room_participant::Column::RoomId.eq(room_id))
.add(room_participant::Column::AnsweringConnectionId.is_not_null())
@@ -190,17 +191,18 @@ impl Database {
.filter(room_participant::Column::RoomId.eq(room_id))
.exec(&*tx)
.await?;
+ project::Entity::delete_many()
+ .filter(project::Column::RoomId.eq(room_id))
+ .exec(&*tx)
+ .await?;
room::Entity::delete_by_id(room_id).exec(&*tx).await?;
}
- Ok((
- room_id,
- RefreshedRoom {
- room,
- stale_participant_user_ids,
- canceled_calls_to_user_ids,
- },
- ))
+ Ok(RefreshedRoom {
+ room,
+ stale_participant_user_ids,
+ canceled_calls_to_user_ids,
+ })
})
.await
}
@@ -1129,18 +1131,16 @@ impl Database {
user_id: UserId,
connection: ConnectionId,
live_kit_room: &str,
- ) -> Result> {
- self.room_transaction(|tx| async move {
+ ) -> Result {
+ self.transaction(|tx| async move {
let room = room::ActiveModel {
live_kit_room: ActiveValue::set(live_kit_room.into()),
..Default::default()
}
.insert(&*tx)
.await?;
- let room_id = room.id;
-
room_participant::ActiveModel {
- room_id: ActiveValue::set(room_id),
+ room_id: ActiveValue::set(room.id),
user_id: ActiveValue::set(user_id),
answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
answering_connection_server_id: ActiveValue::set(Some(ServerId(
@@ -1157,8 +1157,8 @@ impl Database {
.insert(&*tx)
.await?;
- let room = self.get_room(room_id, &tx).await?;
- Ok((room_id, room))
+ let room = self.get_room(room.id, &tx).await?;
+ Ok(room)
})
.await
}
@@ -1171,7 +1171,7 @@ impl Database {
called_user_id: UserId,
initial_project_id: Option,
) -> Result> {
- self.room_transaction(|tx| async move {
+ self.room_transaction(room_id, |tx| async move {
room_participant::ActiveModel {
room_id: ActiveValue::set(room_id),
user_id: ActiveValue::set(called_user_id),
@@ -1190,7 +1190,7 @@ impl Database {
let room = self.get_room(room_id, &tx).await?;
let incoming_call = Self::build_incoming_call(&room, called_user_id)
.ok_or_else(|| anyhow!("failed to build incoming call"))?;
- Ok((room_id, (room, incoming_call)))
+ Ok((room, incoming_call))
})
.await
}
@@ -1200,7 +1200,7 @@ impl Database {
room_id: RoomId,
called_user_id: UserId,
) -> Result> {
- self.room_transaction(|tx| async move {
+ self.room_transaction(room_id, |tx| async move {
room_participant::Entity::delete_many()
.filter(
room_participant::Column::RoomId
@@ -1210,7 +1210,7 @@ impl Database {
.exec(&*tx)
.await?;
let room = self.get_room(room_id, &tx).await?;
- Ok((room_id, room))
+ Ok(room)
})
.await
}
@@ -1257,7 +1257,7 @@ impl Database {
calling_connection: ConnectionId,
called_user_id: UserId,
) -> Result> {
- self.room_transaction(|tx| async move {
+ self.room_transaction(room_id, |tx| async move {
let participant = room_participant::Entity::find()
.filter(
Condition::all()
@@ -1276,14 +1276,13 @@ impl Database {
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no call to cancel"))?;
- let room_id = participant.room_id;
room_participant::Entity::delete(participant.into_active_model())
.exec(&*tx)
.await?;
let room = self.get_room(room_id, &tx).await?;
- Ok((room_id, room))
+ Ok(room)
})
.await
}
@@ -1294,7 +1293,7 @@ impl Database {
user_id: UserId,
connection: ConnectionId,
) -> Result> {
- self.room_transaction(|tx| async move {
+ self.room_transaction(room_id, |tx| async move {
let result = room_participant::Entity::update_many()
.filter(
Condition::all()
@@ -1316,7 +1315,7 @@ impl Database {
Err(anyhow!("room does not exist or was already joined"))?
} else {
let room = self.get_room(room_id, &tx).await?;
- Ok((room_id, room))
+ Ok(room)
}
})
.await
@@ -1328,9 +1327,9 @@ impl Database {
user_id: UserId,
connection: ConnectionId,
) -> Result> {
- self.room_transaction(|tx| async {
+ let room_id = RoomId::from_proto(rejoin_room.id);
+ self.room_transaction(room_id, |tx| async {
let tx = tx;
- let room_id = RoomId::from_proto(rejoin_room.id);
let participant_update = room_participant::Entity::update_many()
.filter(
Condition::all()
@@ -1549,14 +1548,11 @@ impl Database {
}
let room = self.get_room(room_id, &tx).await?;
- Ok((
- room_id,
- RejoinedRoom {
- room,
- rejoined_projects,
- reshared_projects,
- },
- ))
+ Ok(RejoinedRoom {
+ room,
+ rejoined_projects,
+ reshared_projects,
+ })
})
.await
}
@@ -1717,13 +1713,78 @@ impl Database {
.await
}
+ pub async fn follow(
+ &self,
+ project_id: ProjectId,
+ leader_connection: ConnectionId,
+ follower_connection: ConnectionId,
+ ) -> Result> {
+ let room_id = self.room_id_for_project(project_id).await?;
+ self.room_transaction(room_id, |tx| async move {
+ follower::ActiveModel {
+ room_id: ActiveValue::set(room_id),
+ project_id: ActiveValue::set(project_id),
+ leader_connection_server_id: ActiveValue::set(ServerId(
+ leader_connection.owner_id as i32,
+ )),
+ leader_connection_id: ActiveValue::set(leader_connection.id as i32),
+ follower_connection_server_id: ActiveValue::set(ServerId(
+ follower_connection.owner_id as i32,
+ )),
+ follower_connection_id: ActiveValue::set(follower_connection.id as i32),
+ ..Default::default()
+ }
+ .insert(&*tx)
+ .await?;
+
+ let room = self.get_room(room_id, &*tx).await?;
+ Ok(room)
+ })
+ .await
+ }
+
+ pub async fn unfollow(
+ &self,
+ project_id: ProjectId,
+ leader_connection: ConnectionId,
+ follower_connection: ConnectionId,
+ ) -> Result> {
+ let room_id = self.room_id_for_project(project_id).await?;
+ self.room_transaction(room_id, |tx| async move {
+ follower::Entity::delete_many()
+ .filter(
+ Condition::all()
+ .add(follower::Column::ProjectId.eq(project_id))
+ .add(
+ follower::Column::LeaderConnectionServerId
+ .eq(leader_connection.owner_id)
+ .and(follower::Column::LeaderConnectionId.eq(leader_connection.id)),
+ )
+ .add(
+ follower::Column::FollowerConnectionServerId
+ .eq(follower_connection.owner_id)
+ .and(
+ follower::Column::FollowerConnectionId
+ .eq(follower_connection.id),
+ ),
+ ),
+ )
+ .exec(&*tx)
+ .await?;
+
+ let room = self.get_room(room_id, &*tx).await?;
+ Ok(room)
+ })
+ .await
+ }
+
pub async fn update_room_participant_location(
&self,
room_id: RoomId,
connection: ConnectionId,
location: proto::ParticipantLocation,
) -> Result> {
- self.room_transaction(|tx| async {
+ self.room_transaction(room_id, |tx| async {
let tx = tx;
let location_kind;
let location_project_id;
@@ -1769,7 +1830,7 @@ impl Database {
if result.rows_affected == 1 {
let room = self.get_room(room_id, &tx).await?;
- Ok((room_id, room))
+ Ok(room)
} else {
Err(anyhow!("could not update room participant location"))?
}
@@ -1926,12 +1987,25 @@ impl Database {
}
}
}
+ drop(db_projects);
+
+ let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
+ let mut followers = Vec::new();
+ while let Some(db_follower) = db_followers.next().await {
+ let db_follower = db_follower?;
+ followers.push(proto::Follower {
+ leader_id: Some(db_follower.leader_connection().into()),
+ follower_id: Some(db_follower.follower_connection().into()),
+ project_id: db_follower.project_id.to_proto(),
+ });
+ }
Ok(proto::Room {
id: db_room.id.to_proto(),
live_kit_room: db_room.live_kit_room,
participants: participants.into_values().collect(),
pending_participants,
+ followers,
})
}
@@ -1963,7 +2037,7 @@ impl Database {
connection: ConnectionId,
worktrees: &[proto::WorktreeMetadata],
) -> Result> {
- self.room_transaction(|tx| async move {
+ self.room_transaction(room_id, |tx| async move {
let participant = room_participant::Entity::find()
.filter(
Condition::all()
@@ -2024,7 +2098,7 @@ impl Database {
.await?;
let room = self.get_room(room_id, &tx).await?;
- Ok((room_id, (project.id, room)))
+ Ok((project.id, room))
})
.await
}
@@ -2034,7 +2108,8 @@ impl Database {
project_id: ProjectId,
connection: ConnectionId,
) -> Result)>> {
- self.room_transaction(|tx| async move {
+ let room_id = self.room_id_for_project(project_id).await?;
+ self.room_transaction(room_id, |tx| async move {
let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
let project = project::Entity::find_by_id(project_id)
@@ -2042,12 +2117,11 @@ impl Database {
.await?
.ok_or_else(|| anyhow!("project not found"))?;
if project.host_connection()? == connection {
- let room_id = project.room_id;
project::Entity::delete(project.into_active_model())
.exec(&*tx)
.await?;
let room = self.get_room(room_id, &tx).await?;
- Ok((room_id, (room, guest_connection_ids)))
+ Ok((room, guest_connection_ids))
} else {
Err(anyhow!("cannot unshare a project hosted by another user"))?
}
@@ -2061,7 +2135,8 @@ impl Database {
connection: ConnectionId,
worktrees: &[proto::WorktreeMetadata],
) -> Result)>> {
- self.room_transaction(|tx| async move {
+ let room_id = self.room_id_for_project(project_id).await?;
+ self.room_transaction(room_id, |tx| async move {
let project = project::Entity::find_by_id(project_id)
.filter(
Condition::all()
@@ -2079,7 +2154,7 @@ impl Database {
let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
let room = self.get_room(project.room_id, &tx).await?;
- Ok((project.room_id, (room, guest_connection_ids)))
+ Ok((room, guest_connection_ids))
})
.await
}
@@ -2124,12 +2199,12 @@ impl Database {
update: &proto::UpdateWorktree,
connection: ConnectionId,
) -> Result>> {
- self.room_transaction(|tx| async move {
- let project_id = ProjectId::from_proto(update.project_id);
- let worktree_id = update.worktree_id as i64;
-
+ let project_id = ProjectId::from_proto(update.project_id);
+ let worktree_id = update.worktree_id as i64;
+ let room_id = self.room_id_for_project(project_id).await?;
+ self.room_transaction(room_id, |tx| async move {
// Ensure the update comes from the host.
- let project = project::Entity::find_by_id(project_id)
+ let _project = project::Entity::find_by_id(project_id)
.filter(
Condition::all()
.add(project::Column::HostConnectionId.eq(connection.id as i32))
@@ -2140,7 +2215,6 @@ impl Database {
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such project"))?;
- let room_id = project.room_id;
// Update metadata.
worktree::Entity::update(worktree::ActiveModel {
@@ -2220,7 +2294,7 @@ impl Database {
}
let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
- Ok((room_id, connection_ids))
+ Ok(connection_ids)
})
.await
}
@@ -2230,9 +2304,10 @@ impl Database {
update: &proto::UpdateDiagnosticSummary,
connection: ConnectionId,
) -> Result>> {
- self.room_transaction(|tx| async move {
- let project_id = ProjectId::from_proto(update.project_id);
- let worktree_id = update.worktree_id as i64;
+ let project_id = ProjectId::from_proto(update.project_id);
+ let worktree_id = update.worktree_id as i64;
+ let room_id = self.room_id_for_project(project_id).await?;
+ self.room_transaction(room_id, |tx| async move {
let summary = update
.summary
.as_ref()
@@ -2274,7 +2349,7 @@ impl Database {
.await?;
let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
- Ok((project.room_id, connection_ids))
+ Ok(connection_ids)
})
.await
}
@@ -2284,8 +2359,9 @@ impl Database {
update: &proto::StartLanguageServer,
connection: ConnectionId,
) -> Result>> {
- self.room_transaction(|tx| async move {
- let project_id = ProjectId::from_proto(update.project_id);
+ let project_id = ProjectId::from_proto(update.project_id);
+ let room_id = self.room_id_for_project(project_id).await?;
+ self.room_transaction(room_id, |tx| async move {
let server = update
.server
.as_ref()
@@ -2319,7 +2395,7 @@ impl Database {
.await?;
let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
- Ok((project.room_id, connection_ids))
+ Ok(connection_ids)
})
.await
}
@@ -2329,7 +2405,8 @@ impl Database {
project_id: ProjectId,
connection: ConnectionId,
) -> Result> {
- self.room_transaction(|tx| async move {
+ let room_id = self.room_id_for_project(project_id).await?;
+ self.room_transaction(room_id, |tx| async move {
let participant = room_participant::Entity::find()
.filter(
Condition::all()
@@ -2455,7 +2532,6 @@ impl Database {
.all(&*tx)
.await?;
- let room_id = project.room_id;
let project = Project {
collaborators: collaborators
.into_iter()
@@ -2475,7 +2551,7 @@ impl Database {
})
.collect(),
};
- Ok((room_id, (project, replica_id as ReplicaId)))
+ Ok((project, replica_id as ReplicaId))
})
.await
}
@@ -2485,7 +2561,8 @@ impl Database {
project_id: ProjectId,
connection: ConnectionId,
) -> Result> {
- self.room_transaction(|tx| async move {
+ let room_id = self.room_id_for_project(project_id).await?;
+ self.room_transaction(room_id, |tx| async move {
let result = project_collaborator::Entity::delete_many()
.filter(
Condition::all()
@@ -2521,7 +2598,7 @@ impl Database {
host_connection_id: project.host_connection()?,
connection_ids,
};
- Ok((project.room_id, left_project))
+ Ok(left_project)
})
.await
}
@@ -2531,11 +2608,8 @@ impl Database {
project_id: ProjectId,
connection_id: ConnectionId,
) -> Result>> {
- self.room_transaction(|tx| async move {
- let project = project::Entity::find_by_id(project_id)
- .one(&*tx)
- .await?
- .ok_or_else(|| anyhow!("no such project"))?;
+ let room_id = self.room_id_for_project(project_id).await?;
+ self.room_transaction(room_id, |tx| async move {
let collaborators = project_collaborator::Entity::find()
.filter(project_collaborator::Column::ProjectId.eq(project_id))
.all(&*tx)
@@ -2553,7 +2627,7 @@ impl Database {
.iter()
.any(|collaborator| collaborator.connection_id == connection_id)
{
- Ok((project.room_id, collaborators))
+ Ok(collaborators)
} else {
Err(anyhow!("no such project"))?
}
@@ -2566,11 +2640,8 @@ impl Database {
project_id: ProjectId,
connection_id: ConnectionId,
) -> Result>> {
- self.room_transaction(|tx| async move {
- let project = project::Entity::find_by_id(project_id)
- .one(&*tx)
- .await?
- .ok_or_else(|| anyhow!("no such project"))?;
+ let room_id = self.room_id_for_project(project_id).await?;
+ self.room_transaction(room_id, |tx| async move {
let mut collaborators = project_collaborator::Entity::find()
.filter(project_collaborator::Column::ProjectId.eq(project_id))
.stream(&*tx)
@@ -2583,7 +2654,7 @@ impl Database {
}
if connection_ids.contains(&connection_id) {
- Ok((project.room_id, connection_ids))
+ Ok(connection_ids)
} else {
Err(anyhow!("no such project"))?
}
@@ -2613,6 +2684,17 @@ impl Database {
Ok(guest_connection_ids)
}
+ async fn room_id_for_project(&self, project_id: ProjectId) -> Result {
+ self.transaction(|tx| async move {
+ let project = project::Entity::find_by_id(project_id)
+ .one(&*tx)
+ .await?
+ .ok_or_else(|| anyhow!("project {} not found", project_id))?;
+ Ok(project.room_id)
+ })
+ .await
+ }
+
// access tokens
pub async fn create_access_token_hash(
@@ -2763,21 +2845,48 @@ impl Database {
self.run(body).await
}
- async fn room_transaction(&self, f: F) -> Result>
+ async fn room_transaction(&self, room_id: RoomId, f: F) -> Result>
where
F: Send + Fn(TransactionHandle) -> Fut,
- Fut: Send + Future