From 75c339851fd8e8898ce02739abe7eb2061a4ea93 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 17 Oct 2022 11:24:09 +0200 Subject: [PATCH] Add `live_kit_server::api::Client::{create,delete}_room` --- Cargo.lock | 3 +- crates/live_kit_server/Cargo.toml | 3 +- crates/live_kit_server/src/api.rs | 92 ++++++++++++++++++++----------- 3 files changed, 65 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 16417f6906823408e1c866e7f60519375a99c1f2..51e037e748d019ab726e4f67fafd280294197ab3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3202,12 +3202,13 @@ name = "live_kit_server" version = "0.1.0" dependencies = [ "anyhow", + "futures 0.3.24", "hmac 0.12.1", - "hyper", "jwt", "prost 0.8.0", "prost-build", "prost-types 0.8.0", + "reqwest", "serde", "sha2 0.10.6", ] diff --git a/crates/live_kit_server/Cargo.toml b/crates/live_kit_server/Cargo.toml index 51d134ca956f889368dbf782177d99281d10abcf..7cc9d82333efe3d64000e95330df03816f27e44f 100644 --- a/crates/live_kit_server/Cargo.toml +++ b/crates/live_kit_server/Cargo.toml @@ -10,11 +10,12 @@ doctest = false [dependencies] anyhow = "1.0.38" +futures = "0.3" hmac = "0.12" jwt = "0.16" -hyper = { version = "0.14", features = ["client", "http1"] } prost = "0.8" prost-types = "0.8" +reqwest = "0.11" serde = { version = "1.0", features = ["derive", "rc"] } sha2 = "0.10" diff --git a/crates/live_kit_server/src/api.rs b/crates/live_kit_server/src/api.rs index 52a911c5935a1664761166be1112844eef312dd4..9abf3bc7c62e4870f31ad284436c303fe8d493e5 100644 --- a/crates/live_kit_server/src/api.rs +++ b/crates/live_kit_server/src/api.rs @@ -1,21 +1,24 @@ use crate::{proto, token}; use anyhow::{anyhow, Result}; -use hyper::{client::HttpConnector, header::AUTHORIZATION, Method, Request, Uri}; +use prost::Message; +use reqwest::header::CONTENT_TYPE; use std::future::Future; pub struct Client { - http: hyper::Client, - uri: Uri, + http: reqwest::Client, + uri: String, key: String, secret: String, } impl Client { - pub fn new(uri: Uri, key: String, secret: String) -> Self { - assert!(uri.scheme().is_some(), "base uri must have a scheme"); - assert!(uri.authority().is_some(), "base uri must have an authority"); + pub fn new(mut uri: String, key: String, secret: String) -> Self { + if uri.ends_with('/') { + uri.pop(); + } + Self { - http: hyper::Client::new(), + http: reqwest::Client::new(), uri, key, secret, @@ -23,39 +26,66 @@ impl Client { } pub fn create_room(&self, name: String) -> impl Future> { - let token = token::create( - &self.key, - &self.secret, - None, + self.request( + "twirp/livekit.RoomService/CreateRoom", token::VideoGrant { room_create: Some(true), ..Default::default() }, + proto::CreateRoomRequest { + name, + ..Default::default() + }, + ) + } + + pub fn delete_room(&self, name: String) -> impl Future> { + let response = self.request( + "twirp/livekit.RoomService/DeleteRoom", + token::VideoGrant { + room_create: Some(true), + ..Default::default() + }, + proto::DeleteRoomRequest { room: name }, ); + async move { + response.await?; + Ok(()) + } + } + fn request( + &self, + path: &str, + grant: token::VideoGrant, + body: Req, + ) -> impl Future> + where + Req: Message, + Res: Default + Message, + { let client = self.http.clone(); - let uri = Uri::builder() - .scheme(self.uri.scheme().unwrap().clone()) - .authority(self.uri.authority().unwrap().clone()) - .path_and_query("twirp/livekit.RoomService/CreateRoom") - .build(); + let token = token::create(&self.key, &self.secret, None, grant); + let uri = format!("{}/{}", self.uri, path); async move { let token = token?; - let uri = uri?; - let body = proto::CreateRoomRequest { - name: todo!(), - empty_timeout: todo!(), - max_participants: todo!(), - node_id: todo!(), - metadata: todo!(), - egress: todo!(), - }; - let mut request = Request::builder() - .uri(uri) - .method(Method::POST) - .header(AUTHORIZATION, format!("Bearer {}", token)) - .body(body); - Err(anyhow!("yeah")) + let response = client + .post(&uri) + .header(CONTENT_TYPE, "application/protobuf") + .bearer_auth(token) + .body(body.encode_to_vec()) + .send() + .await?; + if response.status().is_success() { + Ok(Res::decode(response.bytes().await?)?) + } else { + Err(anyhow!( + "POST {} failed with status code {:?}, {:?}", + uri, + response.status(), + response.text().await + )) + } } } }