WIP: Start integrating with LiveKit when creating/joining rooms

Antonio Scandurra created

Change summary

Cargo.lock                             |  1 
crates/call/src/room.rs                |  3 
crates/collab/Cargo.toml               |  2 
crates/collab/src/integration_tests.rs |  1 
crates/collab/src/main.rs              | 20 +++++++++
crates/collab/src/rpc.rs               | 59 +++++++++++++++++++++++++--
crates/collab/src/rpc/store.rs         | 33 +++++++++------
crates/live_kit_server/src/api.rs      | 15 +++++++
crates/rpc/proto/zed.proto             | 10 +++-
9 files changed, 120 insertions(+), 24 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1100,6 +1100,7 @@ dependencies = [
  "language",
  "lazy_static",
  "lipsum",
+ "live_kit_server",
  "log",
  "lsp",
  "nanoid",

crates/call/src/room.rs 🔗

@@ -94,7 +94,8 @@ impl Room {
     ) -> Task<Result<ModelHandle<Self>>> {
         cx.spawn(|mut cx| async move {
             let response = client.request(proto::CreateRoom {}).await?;
-            let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx));
+            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
+            let room = cx.add_model(|cx| Self::new(room_proto.id, client, user_store, cx));
 
             let initial_project_id = if let Some(initial_project) = initial_project {
                 let initial_project_id = room

crates/collab/Cargo.toml 🔗

@@ -14,8 +14,10 @@ required-features = ["seed-support"]
 
 [dependencies]
 collections = { path = "../collections" }
+live_kit_server = { path = "../live_kit_server" }
 rpc = { path = "../rpc" }
 util = { path = "../util" }
+
 anyhow = "1.0.40"
 async-trait = "0.1.50"
 async-tungstenite = "0.16"

crates/collab/src/integration_tests.rs 🔗

@@ -6321,6 +6321,7 @@ impl TestServer {
     async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
         Arc::new(AppState {
             db: test_db.db().clone(),
+            live_kit_client: None,
             api_token: Default::default(),
             invite_link_prefix: Default::default(),
         })

crates/collab/src/main.rs 🔗

@@ -30,6 +30,9 @@ pub struct Config {
     pub invite_link_prefix: String,
     pub honeycomb_api_key: Option<String>,
     pub honeycomb_dataset: Option<String>,
+    pub live_kit_server: Option<String>,
+    pub live_kit_key: Option<String>,
+    pub live_kit_secret: Option<String>,
     pub rust_log: Option<String>,
     pub log_json: Option<bool>,
 }
@@ -38,13 +41,30 @@ pub struct AppState {
     db: Arc<dyn Db>,
     api_token: String,
     invite_link_prefix: String,
+    live_kit_client: Option<live_kit_server::api::Client>,
 }
 
 impl AppState {
     async fn new(config: &Config) -> Result<Arc<Self>> {
         let db = PostgresDb::new(&config.database_url, 5).await?;
+        let live_kit_client = if let Some(((server, key), secret)) = config
+            .live_kit_server
+            .as_ref()
+            .zip(config.live_kit_key.as_ref())
+            .zip(config.live_kit_secret.as_ref())
+        {
+            Some(live_kit_server::api::Client::new(
+                server.clone(),
+                key.clone(),
+                secret.clone(),
+            ))
+        } else {
+            None
+        };
+
         let this = Self {
             db: Arc::new(db),
+            live_kit_client,
             api_token: config.api_token.clone(),
             invite_link_prefix: config.invite_link_prefix.clone(),
         };

crates/collab/src/rpc.rs 🔗

@@ -5,7 +5,7 @@ use crate::{
     db::{self, ChannelId, MessageId, ProjectId, User, UserId},
     AppState, Result,
 };
-use anyhow::anyhow;
+use anyhow::{anyhow, Context};
 use async_tungstenite::tungstenite::{
     protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
 };
@@ -49,6 +49,7 @@ use std::{
     },
     time::Duration,
 };
+pub use store::{Store, Worktree};
 use time::OffsetDateTime;
 use tokio::{
     sync::{Mutex, MutexGuard},
@@ -57,8 +58,6 @@ use tokio::{
 use tower::ServiceBuilder;
 use tracing::{info_span, instrument, Instrument};
 
-pub use store::{Store, Worktree};
-
 lazy_static! {
     static ref METRIC_CONNECTIONS: IntGauge =
         register_int_gauge!("connections", "number of connections").unwrap();
@@ -597,13 +596,45 @@ impl Server {
         response: Response<proto::CreateRoom>,
     ) -> Result<()> {
         let user_id;
-        let room_id;
+        let room;
         {
             let mut store = self.store().await;
             user_id = store.user_id_for_connection(request.sender_id)?;
-            room_id = store.create_room(request.sender_id)?;
+            room = store.create_room(request.sender_id)?.clone();
         }
-        response.send(proto::CreateRoomResponse { id: room_id })?;
+
+        let live_kit_token = if let Some(live_kit) = self.app_state.live_kit_client.as_ref() {
+            if let Some(_) = live_kit
+                .create_room(room.live_kit_room.clone())
+                .await
+                .with_context(|| {
+                    format!(
+                        "error creating LiveKit room (LiveKit room: {}, Zed room: {})",
+                        room.live_kit_room, room.id
+                    )
+                })
+                .trace_err()
+            {
+                live_kit
+                    .room_token_for_user(&room.live_kit_room, &user_id.to_string())
+                    .with_context(|| {
+                        format!(
+                            "error creating LiveKit access token (LiveKit room: {}, Zed room: {})",
+                            room.live_kit_room, room.id
+                        )
+                    })
+                    .trace_err()
+            } else {
+                None
+            }
+        } else {
+            None
+        };
+
+        response.send(proto::CreateRoomResponse {
+            room: Some(room),
+            live_kit_token,
+        })?;
         self.update_user_contacts(user_id).await?;
         Ok(())
     }
@@ -624,8 +655,24 @@ impl Server {
                     .send(recipient_id, proto::CallCanceled {})
                     .trace_err();
             }
+
+            let live_kit_token = if let Some(live_kit) = self.app_state.live_kit_client.as_ref() {
+                live_kit
+                    .room_token_for_user(&room.live_kit_room, &user_id.to_string())
+                    .with_context(|| {
+                        format!(
+                            "error creating LiveKit access token (LiveKit room: {}, Zed room: {})",
+                            room.live_kit_room, room.id
+                        )
+                    })
+                    .trace_err()
+            } else {
+                None
+            };
+
             response.send(proto::JoinRoomResponse {
                 room: Some(room.clone()),
+                live_kit_token,
             })?;
             self.room_updated(room);
         }

crates/collab/src/rpc/store.rs 🔗

@@ -1,6 +1,7 @@
 use crate::db::{self, ChannelId, ProjectId, UserId};
 use anyhow::{anyhow, Result};
 use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
+use nanoid::nanoid;
 use rpc::{proto, ConnectionId};
 use serde::Serialize;
 use std::{mem, path::PathBuf, str, time::Duration};
@@ -339,7 +340,7 @@ impl Store {
         }
     }
 
-    pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
+    pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<&proto::Room> {
         let connection = self
             .connections
             .get_mut(&creator_connection_id)
@@ -353,19 +354,23 @@ impl Store {
             "can't create a room with an active call"
         );
 
-        let mut room = proto::Room::default();
-        room.participants.push(proto::Participant {
-            user_id: connection.user_id.to_proto(),
-            peer_id: creator_connection_id.0,
-            projects: Default::default(),
-            location: Some(proto::ParticipantLocation {
-                variant: Some(proto::participant_location::Variant::External(
-                    proto::participant_location::External {},
-                )),
-            }),
-        });
-
         let room_id = post_inc(&mut self.next_room_id);
+        let room = proto::Room {
+            id: room_id,
+            participants: vec![proto::Participant {
+                user_id: connection.user_id.to_proto(),
+                peer_id: creator_connection_id.0,
+                projects: Default::default(),
+                location: Some(proto::ParticipantLocation {
+                    variant: Some(proto::participant_location::Variant::External(
+                        proto::participant_location::External {},
+                    )),
+                }),
+            }],
+            pending_participant_user_ids: Default::default(),
+            live_kit_room: nanoid!(30),
+        };
+
         self.rooms.insert(room_id, room);
         connected_user.active_call = Some(Call {
             caller_user_id: connection.user_id,
@@ -373,7 +378,7 @@ impl Store {
             connection_id: Some(creator_connection_id),
             initial_project_id: None,
         });
-        Ok(room_id)
+        Ok(self.rooms.get(&room_id).unwrap())
     }
 
     pub fn join_room(

crates/live_kit_server/src/api.rs 🔗

@@ -54,6 +54,21 @@ impl Client {
         }
     }
 
+    pub fn room_token_for_user(&self, room: &str, identity: &str) -> Result<String> {
+        token::create(
+            &self.key,
+            &self.secret,
+            Some(identity),
+            token::VideoGrant {
+                room: Some(room),
+                room_join: Some(true),
+                can_publish: Some(true),
+                can_subscribe: Some(true),
+                ..Default::default()
+            },
+        )
+    }
+
     fn request<Req, Res>(
         &self,
         path: &str,

crates/rpc/proto/zed.proto 🔗

@@ -140,7 +140,8 @@ message Test {
 message CreateRoom {}
 
 message CreateRoomResponse {
-    uint64 id = 1;
+    Room room = 1;
+    optional string live_kit_token = 2;
 }
 
 message JoinRoom {
@@ -149,6 +150,7 @@ message JoinRoom {
 
 message JoinRoomResponse {
     Room room = 1;
+    optional string live_kit_token = 2;
 }
 
 message LeaveRoom {
@@ -156,8 +158,10 @@ message LeaveRoom {
 }
 
 message Room {
-    repeated Participant participants = 1;
-    repeated uint64 pending_participant_user_ids = 2;
+    uint64 id = 1;
+    repeated Participant participants = 2;
+    repeated uint64 pending_participant_user_ids = 3;
+    string live_kit_room = 4;
 }
 
 message Participant {