From c9225bb87c269d51fa0cbc058a13d6b4a51e232b Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 17 Oct 2022 12:20:55 +0200 Subject: [PATCH] WIP: Start integrating with LiveKit when creating/joining rooms --- Cargo.lock | 1 + crates/call/src/room.rs | 3 +- crates/collab/Cargo.toml | 2 + crates/collab/src/integration_tests.rs | 1 + crates/collab/src/main.rs | 20 +++++++++ crates/collab/src/rpc.rs | 59 +++++++++++++++++++++++--- crates/collab/src/rpc/store.rs | 33 ++++++++------ crates/live_kit_server/src/api.rs | 15 +++++++ crates/rpc/proto/zed.proto | 10 +++-- 9 files changed, 120 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 51e037e748d019ab726e4f67fafd280294197ab3..4ea784749dc3b1a9538056d6e12108e7c9800cfa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1100,6 +1100,7 @@ dependencies = [ "language", "lazy_static", "lipsum", + "live_kit_server", "log", "lsp", "nanoid", diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 09b49716e03b69c178ad8b19b5eb8a8fcd72bacd..258400da9242f4d8ebb4a8986b613d578b7bbd03 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -94,7 +94,8 @@ impl Room { ) -> Task>> { cx.spawn(|mut cx| async move { let response = client.request(proto::CreateRoom {}).await?; - let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx)); + let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?; + let room = cx.add_model(|cx| Self::new(room_proto.id, client, user_store, cx)); let initial_project_id = if let Some(initial_project) = initial_project { let initial_project_id = room diff --git a/crates/collab/Cargo.toml b/crates/collab/Cargo.toml index de41e8a1f3e0827e31aa7654381b07819d9a6abd..8ce42afb64ffae142f1009ea02d97972b5404dbd 100644 --- a/crates/collab/Cargo.toml +++ b/crates/collab/Cargo.toml @@ -14,8 +14,10 @@ required-features = ["seed-support"] [dependencies] collections = { path = "../collections" } +live_kit_server = { path = "../live_kit_server" } rpc = { path = "../rpc" } util = { path = "../util" } + anyhow = "1.0.40" async-trait = "0.1.50" async-tungstenite = "0.16" diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 90d7b6d4b582f7254ba80a132c8fea530ad479dd..44a70e71e92e2ccf26699bc82e9e002a4f2f1e48 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -6321,6 +6321,7 @@ impl TestServer { async fn build_app_state(test_db: &TestDb) -> Arc { Arc::new(AppState { db: test_db.db().clone(), + live_kit_client: None, api_token: Default::default(), invite_link_prefix: Default::default(), }) diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index 272d52cc954140b71d18d2b1296e6613753be581..49a82bb9267664708070a05bfc9f8421cddcce9b 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -30,6 +30,9 @@ pub struct Config { pub invite_link_prefix: String, pub honeycomb_api_key: Option, pub honeycomb_dataset: Option, + pub live_kit_server: Option, + pub live_kit_key: Option, + pub live_kit_secret: Option, pub rust_log: Option, pub log_json: Option, } @@ -38,13 +41,30 @@ pub struct AppState { db: Arc, api_token: String, invite_link_prefix: String, + live_kit_client: Option, } impl AppState { async fn new(config: &Config) -> Result> { let db = PostgresDb::new(&config.database_url, 5).await?; + let live_kit_client = if let Some(((server, key), secret)) = config + .live_kit_server + .as_ref() + .zip(config.live_kit_key.as_ref()) + .zip(config.live_kit_secret.as_ref()) + { + Some(live_kit_server::api::Client::new( + server.clone(), + key.clone(), + secret.clone(), + )) + } else { + None + }; + let this = Self { db: Arc::new(db), + live_kit_client, api_token: config.api_token.clone(), invite_link_prefix: config.invite_link_prefix.clone(), }; diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 564e173fec3430ab27f7d52d2b9e90960cdb76d7..8983f69f9bac2a2d5457d27c7bbe880ec8da312e 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -5,7 +5,7 @@ use crate::{ db::{self, ChannelId, MessageId, ProjectId, User, UserId}, AppState, Result, }; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use async_tungstenite::tungstenite::{ protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage, }; @@ -49,6 +49,7 @@ use std::{ }, time::Duration, }; +pub use store::{Store, Worktree}; use time::OffsetDateTime; use tokio::{ sync::{Mutex, MutexGuard}, @@ -57,8 +58,6 @@ use tokio::{ use tower::ServiceBuilder; use tracing::{info_span, instrument, Instrument}; -pub use store::{Store, Worktree}; - lazy_static! { static ref METRIC_CONNECTIONS: IntGauge = register_int_gauge!("connections", "number of connections").unwrap(); @@ -597,13 +596,45 @@ impl Server { response: Response, ) -> Result<()> { let user_id; - let room_id; + let room; { let mut store = self.store().await; user_id = store.user_id_for_connection(request.sender_id)?; - room_id = store.create_room(request.sender_id)?; + room = store.create_room(request.sender_id)?.clone(); } - response.send(proto::CreateRoomResponse { id: room_id })?; + + let live_kit_token = if let Some(live_kit) = self.app_state.live_kit_client.as_ref() { + if let Some(_) = live_kit + .create_room(room.live_kit_room.clone()) + .await + .with_context(|| { + format!( + "error creating LiveKit room (LiveKit room: {}, Zed room: {})", + room.live_kit_room, room.id + ) + }) + .trace_err() + { + live_kit + .room_token_for_user(&room.live_kit_room, &user_id.to_string()) + .with_context(|| { + format!( + "error creating LiveKit access token (LiveKit room: {}, Zed room: {})", + room.live_kit_room, room.id + ) + }) + .trace_err() + } else { + None + } + } else { + None + }; + + response.send(proto::CreateRoomResponse { + room: Some(room), + live_kit_token, + })?; self.update_user_contacts(user_id).await?; Ok(()) } @@ -624,8 +655,24 @@ impl Server { .send(recipient_id, proto::CallCanceled {}) .trace_err(); } + + let live_kit_token = if let Some(live_kit) = self.app_state.live_kit_client.as_ref() { + live_kit + .room_token_for_user(&room.live_kit_room, &user_id.to_string()) + .with_context(|| { + format!( + "error creating LiveKit access token (LiveKit room: {}, Zed room: {})", + room.live_kit_room, room.id + ) + }) + .trace_err() + } else { + None + }; + response.send(proto::JoinRoomResponse { room: Some(room.clone()), + live_kit_token, })?; self.room_updated(room); } diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index b7dd39cff17528172086faa385c9972858d1f874..096ac0fa063900faa3fcc1615dc07a8158b81fa5 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -1,6 +1,7 @@ use crate::db::{self, ChannelId, ProjectId, UserId}; use anyhow::{anyhow, Result}; use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet}; +use nanoid::nanoid; use rpc::{proto, ConnectionId}; use serde::Serialize; use std::{mem, path::PathBuf, str, time::Duration}; @@ -339,7 +340,7 @@ impl Store { } } - pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result { + pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<&proto::Room> { let connection = self .connections .get_mut(&creator_connection_id) @@ -353,19 +354,23 @@ impl Store { "can't create a room with an active call" ); - let mut room = proto::Room::default(); - room.participants.push(proto::Participant { - user_id: connection.user_id.to_proto(), - peer_id: creator_connection_id.0, - projects: Default::default(), - location: Some(proto::ParticipantLocation { - variant: Some(proto::participant_location::Variant::External( - proto::participant_location::External {}, - )), - }), - }); - let room_id = post_inc(&mut self.next_room_id); + let room = proto::Room { + id: room_id, + participants: vec![proto::Participant { + user_id: connection.user_id.to_proto(), + peer_id: creator_connection_id.0, + projects: Default::default(), + location: Some(proto::ParticipantLocation { + variant: Some(proto::participant_location::Variant::External( + proto::participant_location::External {}, + )), + }), + }], + pending_participant_user_ids: Default::default(), + live_kit_room: nanoid!(30), + }; + self.rooms.insert(room_id, room); connected_user.active_call = Some(Call { caller_user_id: connection.user_id, @@ -373,7 +378,7 @@ impl Store { connection_id: Some(creator_connection_id), initial_project_id: None, }); - Ok(room_id) + Ok(self.rooms.get(&room_id).unwrap()) } pub fn join_room( diff --git a/crates/live_kit_server/src/api.rs b/crates/live_kit_server/src/api.rs index 9abf3bc7c62e4870f31ad284436c303fe8d493e5..2bbad785c3f5996a2e11e10f6beb7fb35c20a233 100644 --- a/crates/live_kit_server/src/api.rs +++ b/crates/live_kit_server/src/api.rs @@ -54,6 +54,21 @@ impl Client { } } + pub fn room_token_for_user(&self, room: &str, identity: &str) -> Result { + token::create( + &self.key, + &self.secret, + Some(identity), + token::VideoGrant { + room: Some(room), + room_join: Some(true), + can_publish: Some(true), + can_subscribe: Some(true), + ..Default::default() + }, + ) + } + fn request( &self, path: &str, diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 1248bb05519788a2d4ac1f198161afae3733a6ab..ee612e049987cd634265f9f8d8626f9558978773 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -140,7 +140,8 @@ message Test { message CreateRoom {} message CreateRoomResponse { - uint64 id = 1; + Room room = 1; + optional string live_kit_token = 2; } message JoinRoom { @@ -149,6 +150,7 @@ message JoinRoom { message JoinRoomResponse { Room room = 1; + optional string live_kit_token = 2; } message LeaveRoom { @@ -156,8 +158,10 @@ message LeaveRoom { } message Room { - repeated Participant participants = 1; - repeated uint64 pending_participant_user_ids = 2; + uint64 id = 1; + repeated Participant participants = 2; + repeated uint64 pending_participant_user_ids = 3; + string live_kit_room = 4; } message Participant {