WIP - make livekit work in GPUI2

Mikayla created

Change summary

Cargo.lock                                                                      |  35 
crates/call2/Cargo.toml                                                         |   6 
crates/call2/src/participant.rs                                                 |   4 
crates/call2/src/room.rs                                                        | 233 
crates/live_kit_client/LiveKitBridge/Package.resolved                           |   4 
crates/live_kit_client2/.cargo/config.toml                                      |   2 
crates/live_kit_client2/Cargo.toml                                              |  71 
crates/live_kit_client2/LiveKitBridge/Package.resolved                          |  52 
crates/live_kit_client2/LiveKitBridge/Package.swift                             |  27 
crates/live_kit_client2/LiveKitBridge/README.md                                 |   3 
crates/live_kit_client2/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift | 327 
crates/live_kit_client2/build.rs                                                | 172 
crates/live_kit_client2/examples/test_app.rs                                    | 175 
crates/live_kit_client2/src/live_kit_client2.rs                                 |  11 
crates/live_kit_client2/src/prod.rs                                             | 943 
crates/live_kit_client2/src/test.rs                                             | 647 
16 files changed, 2,590 insertions(+), 122 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1169,7 +1169,7 @@ dependencies = [
  "futures 0.3.28",
  "gpui2",
  "language2",
- "live_kit_client",
+ "live_kit_client2",
  "log",
  "media",
  "postage",
@@ -4589,6 +4589,39 @@ dependencies = [
  "simplelog",
 ]
 
+[[package]]
+name = "live_kit_client2"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "async-broadcast",
+ "async-trait",
+ "block",
+ "byteorder",
+ "bytes 1.5.0",
+ "cocoa",
+ "collections",
+ "core-foundation",
+ "core-graphics",
+ "foreign-types",
+ "futures 0.3.28",
+ "gpui2",
+ "hmac 0.12.1",
+ "jwt",
+ "live_kit_server",
+ "log",
+ "media",
+ "nanoid",
+ "objc",
+ "parking_lot 0.11.2",
+ "postage",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "sha2 0.10.7",
+ "simplelog",
+]
+
 [[package]]
 name = "live_kit_server"
 version = "0.1.0"

crates/call2/Cargo.toml 🔗

@@ -13,7 +13,7 @@ test-support = [
     "client2/test-support",
     "collections/test-support",
     "gpui2/test-support",
-    "live_kit_client/test-support",
+    "live_kit_client2/test-support",
     "project2/test-support",
     "util/test-support"
 ]
@@ -24,7 +24,7 @@ client2 = { path = "../client2" }
 collections = { path = "../collections" }
 gpui2 = { path = "../gpui2" }
 log.workspace = true
-live_kit_client = { path = "../live_kit_client" }
+live_kit_client2 = { path = "../live_kit_client2" }
 fs2 = { path = "../fs2" }
 language2 = { path = "../language2" }
 media = { path = "../media" }
@@ -47,6 +47,6 @@ fs2 = { path = "../fs2", features = ["test-support"] }
 language2 = { path = "../language2", features = ["test-support"] }
 collections = { path = "../collections", features = ["test-support"] }
 gpui2 = { path = "../gpui2", features = ["test-support"] }
-live_kit_client = { path = "../live_kit_client", features = ["test-support"] }
+live_kit_client2 = { path = "../live_kit_client2", features = ["test-support"] }
 project2 = { path = "../project2", features = ["test-support"] }
 util = { path = "../util", features = ["test-support"] }

crates/call2/src/participant.rs 🔗

@@ -2,7 +2,7 @@ use anyhow::{anyhow, Result};
 use client2::ParticipantIndex;
 use client2::{proto, User};
 use gpui2::WeakModel;
-pub use live_kit_client::Frame;
+pub use live_kit_client2::Frame;
 use project2::Project;
 use std::{fmt, sync::Arc};
 
@@ -51,7 +51,7 @@ pub struct RemoteParticipant {
 
 #[derive(Clone)]
 pub struct RemoteVideoTrack {
-    pub(crate) live_kit_track: Arc<live_kit_client::RemoteVideoTrack>,
+    pub(crate) live_kit_track: Arc<live_kit_client2::RemoteVideoTrack>,
 }
 
 unsafe impl Send for RemoteVideoTrack {}

crates/call2/src/room.rs 🔗

@@ -19,7 +19,7 @@ use gpui2::{
     AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
 };
 use language2::LanguageRegistry;
-use live_kit_client::{LocalTrackPublication, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate};
+use live_kit_client2::{LocalTrackPublication, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate};
 use postage::{sink::Sink, stream::Stream, watch};
 use project2::Project;
 use settings2::Settings;
@@ -59,7 +59,7 @@ pub enum Event {
 pub struct Room {
     id: u64,
     channel_id: Option<u64>,
-    // live_kit: Option<LiveKitRoom>,
+    live_kit: Option<LiveKitRoom>,
     status: RoomStatus,
     shared_projects: HashSet<WeakModel<Project>>,
     joined_projects: HashSet<WeakModel<Project>>,
@@ -114,125 +114,130 @@ impl Room {
         user_store: Model<UserStore>,
         cx: &mut ModelContext<Self>,
     ) -> Self {
-        todo!()
-        // let _live_kit_room = if let Some(connection_info) = live_kit_connection_info {
-        //     let room = live_kit_client::Room::new();
-        //     let mut status = room.status();
-        //     // Consume the initial status of the room.
-        //     let _ = status.try_recv();
-        //     let _maintain_room = cx.spawn(|this, mut cx| async move {
-        //         while let Some(status) = status.next().await {
-        //             let this = if let Some(this) = this.upgrade() {
-        //                 this
-        //             } else {
-        //                 break;
-        //             };
+        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
+            let room = live_kit_client2::Room::new();
+            let mut status = room.status();
+            // Consume the initial status of the room.
+            let _ = status.try_recv();
+            let _maintain_room = cx.spawn(|this, mut cx| async move {
+                while let Some(status) = status.next().await {
+                    let this = if let Some(this) = this.upgrade() {
+                        this
+                    } else {
+                        break;
+                    };
 
-        //             if status == live_kit_client::ConnectionState::Disconnected {
-        //                 this.update(&mut cx, |this, cx| this.leave(cx).log_err())
-        //                     .ok();
-        //                 break;
-        //             }
-        //         }
-        //     });
+                    if status == live_kit_client2::ConnectionState::Disconnected {
+                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
+                            .ok();
+                        break;
+                    }
+                }
+            });
 
-        //     let mut track_video_changes = room.remote_video_track_updates();
-        //     let _maintain_video_tracks = cx.spawn(|this, mut cx| async move {
-        //         while let Some(track_change) = track_video_changes.next().await {
-        //             let this = if let Some(this) = this.upgrade() {
-        //                 this
-        //             } else {
-        //                 break;
-        //             };
+            let _maintain_video_tracks = cx.spawn_on_main({
+                let room = room.clone();
+                move |this, mut cx| async move {
+                    let mut track_video_changes = room.remote_video_track_updates();
+                    while let Some(track_change) = track_video_changes.next().await {
+                        let this = if let Some(this) = this.upgrade() {
+                            this
+                        } else {
+                            break;
+                        };
 
-        //             this.update(&mut cx, |this, cx| {
-        //                 this.remote_video_track_updated(track_change, cx).log_err()
-        //             })
-        //             .ok();
-        //         }
-        //     });
+                        this.update(&mut cx, |this, cx| {
+                            this.remote_video_track_updated(track_change, cx).log_err()
+                        })
+                        .ok();
+                    }
+                }
+            });
 
-        //     let mut track_audio_changes = room.remote_audio_track_updates();
-        //     let _maintain_audio_tracks = cx.spawn(|this, mut cx| async move {
-        //         while let Some(track_change) = track_audio_changes.next().await {
-        //             let this = if let Some(this) = this.upgrade() {
-        //                 this
-        //             } else {
-        //                 break;
-        //             };
+            let _maintain_audio_tracks = cx.spawn_on_main({
+                let room = room.clone();
+                |this, mut cx| async move {
+                    let mut track_audio_changes = room.remote_audio_track_updates();
+                    while let Some(track_change) = track_audio_changes.next().await {
+                        let this = if let Some(this) = this.upgrade() {
+                            this
+                        } else {
+                            break;
+                        };
 
-        //             this.update(&mut cx, |this, cx| {
-        //                 this.remote_audio_track_updated(track_change, cx).log_err()
-        //             })
-        //             .ok();
-        //         }
-        //     });
+                        this.update(&mut cx, |this, cx| {
+                            this.remote_audio_track_updated(track_change, cx).log_err()
+                        })
+                        .ok();
+                    }
+                }
+            });
 
-        //     let connect = room.connect(&connection_info.server_url, &connection_info.token);
-        //     cx.spawn(|this, mut cx| async move {
-        //         connect.await?;
+            let connect = room.connect(&connection_info.server_url, &connection_info.token);
+            cx.spawn(|this, mut cx| async move {
+                connect.await?;
 
-        //         if !cx.update(|cx| Self::mute_on_join(cx))? {
-        //             this.update(&mut cx, |this, cx| this.share_microphone(cx))?
-        //                 .await?;
-        //         }
+                if !cx.update(|cx| Self::mute_on_join(cx))? {
+                    this.update(&mut cx, |this, cx| this.share_microphone(cx))?
+                        .await?;
+                }
 
-        //         anyhow::Ok(())
-        //     })
-        //     .detach_and_log_err(cx);
-
-        //     Some(LiveKitRoom {
-        //         room,
-        //         screen_track: LocalTrack::None,
-        //         microphone_track: LocalTrack::None,
-        //         next_publish_id: 0,
-        //         muted_by_user: false,
-        //         deafened: false,
-        //         speaking: false,
-        //         _maintain_room,
-        //         _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
-        //     })
-        // } else {
-        //     None
-        // };
+                anyhow::Ok(())
+            })
+            .detach_and_log_err(cx);
+
+            Some(LiveKitRoom {
+                room,
+                screen_track: LocalTrack::None,
+                microphone_track: LocalTrack::None,
+                next_publish_id: 0,
+                muted_by_user: false,
+                deafened: false,
+                speaking: false,
+                _maintain_room,
+                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
+            })
+        } else {
+            None
+        };
 
-        // let maintain_connection = cx.spawn({
-        //     let client = client.clone();
-        //     move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
-        // });
-
-        // Audio::play_sound(Sound::Joined, cx);
-
-        // let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
-
-        // Self {
-        //     id,
-        //     channel_id,
-        //     // live_kit: live_kit_room,
-        //     status: RoomStatus::Online,
-        //     shared_projects: Default::default(),
-        //     joined_projects: Default::default(),
-        //     participant_user_ids: Default::default(),
-        //     local_participant: Default::default(),
-        //     remote_participants: Default::default(),
-        //     pending_participants: Default::default(),
-        //     pending_call_count: 0,
-        //     client_subscriptions: vec![
-        //         client.add_message_handler(cx.weak_handle(), Self::handle_room_updated)
-        //     ],
-        //     _subscriptions: vec![
-        //         cx.on_release(Self::released),
-        //         cx.on_app_quit(Self::app_will_quit),
-        //     ],
-        //     leave_when_empty: false,
-        //     pending_room_update: None,
-        //     client,
-        //     user_store,
-        //     follows_by_leader_id_project_id: Default::default(),
-        //     maintain_connection: Some(maintain_connection),
-        //     room_update_completed_tx,
-        //     room_update_completed_rx,
-        // }
+        let maintain_connection = cx.spawn({
+            let client = client.clone();
+            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
+        });
+
+        Audio::play_sound(Sound::Joined, cx);
+
+        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
+
+        Self {
+            id,
+            channel_id,
+            live_kit: live_kit_room,
+            status: RoomStatus::Online,
+            shared_projects: Default::default(),
+            joined_projects: Default::default(),
+            participant_user_ids: Default::default(),
+            local_participant: Default::default(),
+            remote_participants: Default::default(),
+            pending_participants: Default::default(),
+            pending_call_count: 0,
+            client_subscriptions: vec![
+                client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
+            ],
+            _subscriptions: vec![
+                cx.on_release(Self::released),
+                cx.on_app_quit(Self::app_will_quit),
+            ],
+            leave_when_empty: false,
+            pending_room_update: None,
+            client,
+            user_store,
+            follows_by_leader_id_project_id: Default::default(),
+            maintain_connection: Some(maintain_connection),
+            room_update_completed_tx,
+            room_update_completed_rx,
+        }
     }
 
     pub(crate) fn create(
@@ -1518,7 +1523,7 @@ impl Room {
     }
 
     #[cfg(any(test, feature = "test-support"))]
-    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
+    pub fn set_display_sources(&self, sources: Vec<live_kit_client2::MacOSDisplay>) {
         todo!()
         // self.live_kit
         //     .as_ref()
@@ -1529,7 +1534,7 @@ impl Room {
 }
 
 struct LiveKitRoom {
-    room: Arc<live_kit_client::Room>,
+    room: Arc<live_kit_client2::Room>,
     screen_track: LocalTrack,
     microphone_track: LocalTrack,
     /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.

crates/live_kit_client/LiveKitBridge/Package.resolved 🔗

@@ -42,8 +42,8 @@
         "repositoryURL": "https://github.com/apple/swift-protobuf.git",
         "state": {
           "branch": null,
-          "revision": "ce20dc083ee485524b802669890291c0d8090170",
-          "version": "1.22.1"
+          "revision": "0af9125c4eae12a4973fb66574c53a54962a9e1e",
+          "version": "1.21.0"
         }
       }
     ]

crates/live_kit_client2/Cargo.toml 🔗

@@ -0,0 +1,71 @@
+[package]
+name = "live_kit_client2"
+version = "0.1.0"
+edition = "2021"
+description = "Bindings to LiveKit Swift client SDK"
+publish = false
+
+[lib]
+path = "src/live_kit_client2.rs"
+doctest = false
+
+[[example]]
+name = "test_app"
+
+[features]
+test-support = [
+    "async-trait",
+    "collections/test-support",
+    "gpui2/test-support",
+    "live_kit_server",
+    "nanoid",
+]
+
+[dependencies]
+collections = { path = "../collections", optional = true }
+gpui2 = { path = "../gpui2", optional = true }
+live_kit_server = { path = "../live_kit_server", optional = true }
+media = { path = "../media" }
+
+anyhow.workspace = true
+async-broadcast = "0.4"
+core-foundation = "0.9.3"
+core-graphics = "0.22.3"
+futures.workspace = true
+log.workspace = true
+parking_lot.workspace = true
+postage.workspace = true
+
+async-trait = { workspace = true, optional = true }
+nanoid = { version ="0.4", optional = true}
+
+[dev-dependencies]
+collections = { path = "../collections", features = ["test-support"] }
+gpui2 = { path = "../gpui2", features = ["test-support"] }
+live_kit_server = { path = "../live_kit_server" }
+media = { path = "../media" }
+nanoid = "0.4"
+
+anyhow.workspace = true
+async-trait.workspace = true
+block = "0.1"
+bytes = "1.2"
+byteorder = "1.4"
+cocoa = "0.24"
+core-foundation = "0.9.3"
+core-graphics = "0.22.3"
+foreign-types = "0.3"
+futures.workspace = true
+hmac = "0.12"
+jwt = "0.16"
+objc = "0.2"
+parking_lot.workspace = true
+serde.workspace = true
+serde_derive.workspace = true
+sha2 = "0.10"
+simplelog = "0.9"
+
+[build-dependencies]
+serde.workspace = true
+serde_derive.workspace = true
+serde_json.workspace = true

crates/live_kit_client2/LiveKitBridge/Package.resolved 🔗

@@ -0,0 +1,52 @@
+{
+  "object": {
+    "pins": [
+      {
+        "package": "LiveKit",
+        "repositoryURL": "https://github.com/livekit/client-sdk-swift.git",
+        "state": {
+          "branch": null,
+          "revision": "7331b813a5ab8a95cfb81fb2b4ed10519428b9ff",
+          "version": "1.0.12"
+        }
+      },
+      {
+        "package": "Promises",
+        "repositoryURL": "https://github.com/google/promises.git",
+        "state": {
+          "branch": null,
+          "revision": "ec957ccddbcc710ccc64c9dcbd4c7006fcf8b73a",
+          "version": "2.2.0"
+        }
+      },
+      {
+        "package": "WebRTC",
+        "repositoryURL": "https://github.com/webrtc-sdk/Specs.git",
+        "state": {
+          "branch": null,
+          "revision": "2f6bab30c8df0fe59ab3e58bc99097f757f85f65",
+          "version": "104.5112.17"
+        }
+      },
+      {
+        "package": "swift-log",
+        "repositoryURL": "https://github.com/apple/swift-log.git",
+        "state": {
+          "branch": null,
+          "revision": "32e8d724467f8fe623624570367e3d50c5638e46",
+          "version": "1.5.2"
+        }
+      },
+      {
+        "package": "SwiftProtobuf",
+        "repositoryURL": "https://github.com/apple/swift-protobuf.git",
+        "state": {
+          "branch": null,
+          "revision": "ce20dc083ee485524b802669890291c0d8090170",
+          "version": "1.22.1"
+        }
+      }
+    ]
+  },
+  "version": 1
+}

crates/live_kit_client2/LiveKitBridge/Package.swift 🔗

@@ -0,0 +1,27 @@
+// swift-tools-version: 5.5
+
+import PackageDescription
+
+let package = Package(
+    name: "LiveKitBridge",
+    platforms: [
+        .macOS(.v10_15)
+    ],
+    products: [
+        // Products define the executables and libraries a package produces, and make them visible to other packages.
+        .library(
+            name: "LiveKitBridge",
+            type: .static,
+            targets: ["LiveKitBridge"]),
+    ],
+    dependencies: [
+        .package(url: "https://github.com/livekit/client-sdk-swift.git", .exact("1.0.12")),
+    ],
+    targets: [
+        // Targets are the basic building blocks of a package. A target can define a module or a test suite.
+        // Targets can depend on other targets in this package, and on products in packages this package depends on.
+        .target(
+            name: "LiveKitBridge",
+            dependencies: [.product(name: "LiveKit", package: "client-sdk-swift")]),
+    ]
+)

crates/live_kit_client2/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift 🔗

@@ -0,0 +1,327 @@
+import Foundation
+import LiveKit
+import WebRTC
+import ScreenCaptureKit
+
+class LKRoomDelegate: RoomDelegate {
+    var data: UnsafeRawPointer
+    var onDidDisconnect: @convention(c) (UnsafeRawPointer) -> Void
+    var onDidSubscribeToRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void
+    var onDidUnsubscribeFromRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void
+    var onMuteChangedFromRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void
+    var onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void
+    var onDidSubscribeToRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void
+    var onDidUnsubscribeFromRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void
+
+    init(
+        data: UnsafeRawPointer,
+        onDidDisconnect: @escaping @convention(c) (UnsafeRawPointer) -> Void,
+        onDidSubscribeToRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void,
+        onDidUnsubscribeFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void,
+        onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void,
+        onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void,
+        onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void,
+        onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void)
+    {
+        self.data = data
+        self.onDidDisconnect = onDidDisconnect
+        self.onDidSubscribeToRemoteAudioTrack = onDidSubscribeToRemoteAudioTrack
+        self.onDidUnsubscribeFromRemoteAudioTrack = onDidUnsubscribeFromRemoteAudioTrack
+        self.onDidSubscribeToRemoteVideoTrack = onDidSubscribeToRemoteVideoTrack
+        self.onDidUnsubscribeFromRemoteVideoTrack = onDidUnsubscribeFromRemoteVideoTrack
+        self.onMuteChangedFromRemoteAudioTrack = onMuteChangedFromRemoteAudioTrack
+        self.onActiveSpeakersChanged = onActiveSpeakersChanged
+    }
+
+    func room(_ room: Room, didUpdate connectionState: ConnectionState, oldValue: ConnectionState) {
+        if connectionState.isDisconnected {
+            self.onDidDisconnect(self.data)
+        }
+    }
+
+    func room(_ room: Room, participant: RemoteParticipant, didSubscribe publication: RemoteTrackPublication, track: Track) {
+        if track.kind == .video {
+            self.onDidSubscribeToRemoteVideoTrack(self.data, participant.identity as CFString, track.sid! as CFString, Unmanaged.passUnretained(track).toOpaque())
+        } else if track.kind == .audio {
+            self.onDidSubscribeToRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString, Unmanaged.passUnretained(track).toOpaque(), Unmanaged.passUnretained(publication).toOpaque())
+        }
+    }
+
+    func room(_ room: Room, participant: Participant, didUpdate publication: TrackPublication, muted: Bool) {
+        if publication.kind == .audio {
+            self.onMuteChangedFromRemoteAudioTrack(self.data, publication.sid as CFString, muted)
+        }
+    }
+
+    func room(_ room: Room, didUpdate speakers: [Participant]) {
+        guard let speaker_ids = speakers.compactMap({ $0.identity as CFString }) as CFArray? else { return }
+        self.onActiveSpeakersChanged(self.data, speaker_ids)
+    }
+
+    func room(_ room: Room, participant: RemoteParticipant, didUnsubscribe publication: RemoteTrackPublication, track: Track) {
+        if track.kind == .video {
+            self.onDidUnsubscribeFromRemoteVideoTrack(self.data, participant.identity as CFString, track.sid! as CFString)
+        } else if track.kind == .audio {
+            self.onDidUnsubscribeFromRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString)
+        }
+    }
+}
+
+class LKVideoRenderer: NSObject, VideoRenderer {
+    var data: UnsafeRawPointer
+    var onFrame: @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool
+    var onDrop: @convention(c) (UnsafeRawPointer) -> Void
+    var adaptiveStreamIsEnabled: Bool = false
+    var adaptiveStreamSize: CGSize = .zero
+    weak var track: VideoTrack?
+
+    init(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) {
+        self.data = data
+        self.onFrame = onFrame
+        self.onDrop = onDrop
+    }
+
+    deinit {
+        self.onDrop(self.data)
+    }
+
+    func setSize(_ size: CGSize) {
+    }
+
+    func renderFrame(_ frame: RTCVideoFrame?) {
+        let buffer = frame?.buffer as? RTCCVPixelBuffer
+        if let pixelBuffer = buffer?.pixelBuffer {
+            if !self.onFrame(self.data, pixelBuffer) {
+                DispatchQueue.main.async {
+                    self.track?.remove(videoRenderer: self)
+                }
+            }
+        }
+    }
+}
+
+@_cdecl("LKRoomDelegateCreate")
+public func LKRoomDelegateCreate(
+    data: UnsafeRawPointer,
+    onDidDisconnect: @escaping @convention(c) (UnsafeRawPointer) -> Void,
+    onDidSubscribeToRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void,
+    onDidUnsubscribeFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void,
+    onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void,
+    onActiveSpeakerChanged: @escaping @convention(c) (UnsafeRawPointer, CFArray) -> Void,
+    onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void,
+    onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void
+) -> UnsafeMutableRawPointer {
+    let delegate = LKRoomDelegate(
+        data: data,
+        onDidDisconnect: onDidDisconnect,
+        onDidSubscribeToRemoteAudioTrack: onDidSubscribeToRemoteAudioTrack,
+        onDidUnsubscribeFromRemoteAudioTrack: onDidUnsubscribeFromRemoteAudioTrack,
+        onMuteChangedFromRemoteAudioTrack: onMuteChangedFromRemoteAudioTrack,
+        onActiveSpeakersChanged: onActiveSpeakerChanged,
+        onDidSubscribeToRemoteVideoTrack: onDidSubscribeToRemoteVideoTrack,
+        onDidUnsubscribeFromRemoteVideoTrack: onDidUnsubscribeFromRemoteVideoTrack
+    )
+    return Unmanaged.passRetained(delegate).toOpaque()
+}
+
+@_cdecl("LKRoomCreate")
+public func LKRoomCreate(delegate: UnsafeRawPointer) -> UnsafeMutableRawPointer  {
+    let delegate = Unmanaged<LKRoomDelegate>.fromOpaque(delegate).takeUnretainedValue()
+    return Unmanaged.passRetained(Room(delegate: delegate)).toOpaque()
+}
+
+@_cdecl("LKRoomConnect")
+public func LKRoomConnect(room: UnsafeRawPointer, url: CFString, token: CFString, callback: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, callback_data: UnsafeRawPointer) {
+    let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
+
+    room.connect(url as String, token as String).then { _ in
+        callback(callback_data, UnsafeRawPointer(nil) as! CFString?)
+    }.catch { error in
+        callback(callback_data, error.localizedDescription as CFString)
+    }
+}
+
+@_cdecl("LKRoomDisconnect")
+public func LKRoomDisconnect(room: UnsafeRawPointer) {
+    let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
+    room.disconnect()
+}
+
+@_cdecl("LKRoomPublishVideoTrack")
+public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeMutableRawPointer?, CFString?) -> Void, callback_data: UnsafeRawPointer) {
+    let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
+    let track = Unmanaged<LocalVideoTrack>.fromOpaque(track).takeUnretainedValue()
+    room.localParticipant?.publishVideoTrack(track: track).then { publication in
+        callback(callback_data, Unmanaged.passRetained(publication).toOpaque(), nil)
+    }.catch { error in
+        callback(callback_data, nil, error.localizedDescription as CFString)
+    }
+}
+
+@_cdecl("LKRoomPublishAudioTrack")
+public func LKRoomPublishAudioTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeMutableRawPointer?, CFString?) -> Void, callback_data: UnsafeRawPointer) {
+    let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
+    let track = Unmanaged<LocalAudioTrack>.fromOpaque(track).takeUnretainedValue()
+    room.localParticipant?.publishAudioTrack(track: track).then { publication in
+        callback(callback_data, Unmanaged.passRetained(publication).toOpaque(), nil)
+    }.catch { error in
+        callback(callback_data, nil, error.localizedDescription as CFString)
+    }
+}
+
+
+@_cdecl("LKRoomUnpublishTrack")
+public func LKRoomUnpublishTrack(room: UnsafeRawPointer, publication: UnsafeRawPointer) {
+    let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
+    let publication = Unmanaged<LocalTrackPublication>.fromOpaque(publication).takeUnretainedValue()
+    let _ = room.localParticipant?.unpublish(publication: publication)
+}
+
+@_cdecl("LKRoomAudioTracksForRemoteParticipant")
+public func LKRoomAudioTracksForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? {
+    let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
+
+    for (_, participant) in room.remoteParticipants {
+        if participant.identity == participantId as String {
+            return participant.audioTracks.compactMap { $0.track as? RemoteAudioTrack } as CFArray?
+        }
+    }
+
+    return nil;
+}
+
+@_cdecl("LKRoomAudioTrackPublicationsForRemoteParticipant")
+public func LKRoomAudioTrackPublicationsForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? {
+    let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
+
+    for (_, participant) in room.remoteParticipants {
+        if participant.identity == participantId as String {
+            return participant.audioTracks.compactMap { $0 as? RemoteTrackPublication } as CFArray?
+        }
+    }
+
+    return nil;
+}
+
+@_cdecl("LKRoomVideoTracksForRemoteParticipant")
+public func LKRoomVideoTracksForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? {
+    let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
+
+    for (_, participant) in room.remoteParticipants {
+        if participant.identity == participantId as String {
+            return participant.videoTracks.compactMap { $0.track as? RemoteVideoTrack } as CFArray?
+        }
+    }
+
+    return nil;
+}
+
+@_cdecl("LKLocalAudioTrackCreateTrack")
+public func LKLocalAudioTrackCreateTrack() -> UnsafeMutableRawPointer {
+    let track = LocalAudioTrack.createTrack(options: AudioCaptureOptions(
+      echoCancellation: true,
+      noiseSuppression: true
+    ))
+
+    return Unmanaged.passRetained(track).toOpaque()
+}
+
+
+@_cdecl("LKCreateScreenShareTrackForDisplay")
+public func LKCreateScreenShareTrackForDisplay(display: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer {
+    let display = Unmanaged<MacOSDisplay>.fromOpaque(display).takeUnretainedValue()
+    let track = LocalVideoTrack.createMacOSScreenShareTrack(source: display, preferredMethod: .legacy)
+    return Unmanaged.passRetained(track).toOpaque()
+}
+
+@_cdecl("LKVideoRendererCreate")
+public func LKVideoRendererCreate(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer {
+    Unmanaged.passRetained(LKVideoRenderer(data: data, onFrame: onFrame, onDrop: onDrop)).toOpaque()
+}
+
+@_cdecl("LKVideoTrackAddRenderer")
+public func LKVideoTrackAddRenderer(track: UnsafeRawPointer, renderer: UnsafeRawPointer) {
+    let track = Unmanaged<Track>.fromOpaque(track).takeUnretainedValue() as! VideoTrack
+    let renderer = Unmanaged<LKVideoRenderer>.fromOpaque(renderer).takeRetainedValue()
+    renderer.track = track
+    track.add(videoRenderer: renderer)
+}
+
+@_cdecl("LKRemoteVideoTrackGetSid")
+public func LKRemoteVideoTrackGetSid(track: UnsafeRawPointer) -> CFString {
+    let track = Unmanaged<RemoteVideoTrack>.fromOpaque(track).takeUnretainedValue()
+    return track.sid! as CFString
+}
+
+@_cdecl("LKRemoteAudioTrackGetSid")
+public func LKRemoteAudioTrackGetSid(track: UnsafeRawPointer) -> CFString {
+    let track = Unmanaged<RemoteAudioTrack>.fromOpaque(track).takeUnretainedValue()
+    return track.sid! as CFString
+}
+
+@_cdecl("LKDisplaySources")
+public func LKDisplaySources(data: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, CFArray?, CFString?) -> Void) {
+    MacOSScreenCapturer.sources(for: .display, includeCurrentApplication: false, preferredMethod: .legacy).then { displaySources in
+        callback(data, displaySources as CFArray, nil)
+    }.catch { error in
+        callback(data, nil, error.localizedDescription as CFString)
+    }
+}
+
+@_cdecl("LKLocalTrackPublicationSetMute")
+public func LKLocalTrackPublicationSetMute(
+    publication: UnsafeRawPointer,
+    muted: Bool,
+    on_complete: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void,
+    callback_data: UnsafeRawPointer
+) {
+    let publication = Unmanaged<LocalTrackPublication>.fromOpaque(publication).takeUnretainedValue()
+
+    if muted {
+        publication.mute().then {
+            on_complete(callback_data, nil)
+        }.catch { error in
+            on_complete(callback_data, error.localizedDescription as CFString)
+        }
+    } else {
+        publication.unmute().then {
+            on_complete(callback_data, nil)
+        }.catch { error in
+            on_complete(callback_data, error.localizedDescription as CFString)
+        }
+    }
+}
+
+@_cdecl("LKRemoteTrackPublicationSetEnabled")
+public func LKRemoteTrackPublicationSetEnabled(
+    publication: UnsafeRawPointer,
+    enabled: Bool,
+    on_complete: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void,
+    callback_data: UnsafeRawPointer
+) {
+    let publication = Unmanaged<RemoteTrackPublication>.fromOpaque(publication).takeUnretainedValue()
+
+    publication.set(enabled: enabled).then {
+        on_complete(callback_data, nil)
+    }.catch { error in
+        on_complete(callback_data, error.localizedDescription as CFString)
+    }
+}
+
+@_cdecl("LKRemoteTrackPublicationIsMuted")
+public func LKRemoteTrackPublicationIsMuted(
+    publication: UnsafeRawPointer
+) -> Bool {
+    let publication = Unmanaged<RemoteTrackPublication>.fromOpaque(publication).takeUnretainedValue()
+
+    return publication.muted
+}
+
+@_cdecl("LKRemoteTrackPublicationGetSid")
+public func LKRemoteTrackPublicationGetSid(
+    publication: UnsafeRawPointer
+) -> CFString {
+    let publication = Unmanaged<RemoteTrackPublication>.fromOpaque(publication).takeUnretainedValue()
+
+    return publication.sid as CFString
+}

crates/live_kit_client2/build.rs 🔗

@@ -0,0 +1,172 @@
+use serde::Deserialize;
+use std::{
+    env,
+    path::{Path, PathBuf},
+    process::Command,
+};
+
+const SWIFT_PACKAGE_NAME: &str = "LiveKitBridge";
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SwiftTargetInfo {
+    pub triple: String,
+    pub unversioned_triple: String,
+    pub module_triple: String,
+    pub swift_runtime_compatibility_version: String,
+    #[serde(rename = "librariesRequireRPath")]
+    pub libraries_require_rpath: bool,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SwiftPaths {
+    pub runtime_library_paths: Vec<String>,
+    pub runtime_library_import_paths: Vec<String>,
+    pub runtime_resource_path: String,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct SwiftTarget {
+    pub target: SwiftTargetInfo,
+    pub paths: SwiftPaths,
+}
+
+const MACOS_TARGET_VERSION: &str = "10.15.7";
+
+fn main() {
+    if cfg!(not(any(test, feature = "test-support"))) {
+        let swift_target = get_swift_target();
+
+        build_bridge(&swift_target);
+        link_swift_stdlib(&swift_target);
+        link_webrtc_framework(&swift_target);
+
+        // Register exported Objective-C selectors, protocols, etc when building example binaries.
+        println!("cargo:rustc-link-arg=-Wl,-ObjC");
+    }
+}
+
+fn build_bridge(swift_target: &SwiftTarget) {
+    println!("cargo:rerun-if-env-changed=MACOSX_DEPLOYMENT_TARGET");
+    println!("cargo:rerun-if-changed={}/Sources", SWIFT_PACKAGE_NAME);
+    println!(
+        "cargo:rerun-if-changed={}/Package.swift",
+        SWIFT_PACKAGE_NAME
+    );
+    println!(
+        "cargo:rerun-if-changed={}/Package.resolved",
+        SWIFT_PACKAGE_NAME
+    );
+
+    let swift_package_root = swift_package_root();
+    let swift_target_folder = swift_target_folder();
+    if !Command::new("swift")
+        .arg("build")
+        .arg("--disable-automatic-resolution")
+        .args(["--configuration", &env::var("PROFILE").unwrap()])
+        .args(["--triple", &swift_target.target.triple])
+        .args(["--build-path".into(), swift_target_folder])
+        .current_dir(&swift_package_root)
+        .status()
+        .unwrap()
+        .success()
+    {
+        panic!(
+            "Failed to compile swift package in {}",
+            swift_package_root.display()
+        );
+    }
+
+    println!(
+        "cargo:rustc-link-search=native={}",
+        swift_target.out_dir_path().display()
+    );
+    println!("cargo:rustc-link-lib=static={}", SWIFT_PACKAGE_NAME);
+}
+
+fn link_swift_stdlib(swift_target: &SwiftTarget) {
+    for path in &swift_target.paths.runtime_library_paths {
+        println!("cargo:rustc-link-search=native={}", path);
+    }
+}
+
+fn link_webrtc_framework(swift_target: &SwiftTarget) {
+    let swift_out_dir_path = swift_target.out_dir_path();
+    println!("cargo:rustc-link-lib=framework=WebRTC");
+    println!(
+        "cargo:rustc-link-search=framework={}",
+        swift_out_dir_path.display()
+    );
+    // Find WebRTC.framework as a sibling of the executable when running tests.
+    println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path");
+    // Find WebRTC.framework in parent directory of the executable when running examples.
+    println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path/..");
+
+    let source_path = swift_out_dir_path.join("WebRTC.framework");
+    let deps_dir_path =
+        PathBuf::from(env::var("OUT_DIR").unwrap()).join("../../../deps/WebRTC.framework");
+    let target_dir_path =
+        PathBuf::from(env::var("OUT_DIR").unwrap()).join("../../../WebRTC.framework");
+    copy_dir(&source_path, &deps_dir_path);
+    copy_dir(&source_path, &target_dir_path);
+}
+
+fn get_swift_target() -> SwiftTarget {
+    let mut arch = env::var("CARGO_CFG_TARGET_ARCH").unwrap();
+    if arch == "aarch64" {
+        arch = "arm64".into();
+    }
+    let target = format!("{}-apple-macosx{}", arch, MACOS_TARGET_VERSION);
+
+    let swift_target_info_str = Command::new("swift")
+        .args(["-target", &target, "-print-target-info"])
+        .output()
+        .unwrap()
+        .stdout;
+
+    serde_json::from_slice(&swift_target_info_str).unwrap()
+}
+
+fn swift_package_root() -> PathBuf {
+    env::current_dir().unwrap().join(SWIFT_PACKAGE_NAME)
+}
+
+fn swift_target_folder() -> PathBuf {
+    env::current_dir()
+        .unwrap()
+        .join(format!("../../target/{SWIFT_PACKAGE_NAME}"))
+}
+
+fn copy_dir(source: &Path, destination: &Path) {
+    assert!(
+        Command::new("rm")
+            .arg("-rf")
+            .arg(destination)
+            .status()
+            .unwrap()
+            .success(),
+        "could not remove {:?} before copying",
+        destination
+    );
+
+    assert!(
+        Command::new("cp")
+            .arg("-R")
+            .args([source, destination])
+            .status()
+            .unwrap()
+            .success(),
+        "could not copy {:?} to {:?}",
+        source,
+        destination
+    );
+}
+
+impl SwiftTarget {
+    fn out_dir_path(&self) -> PathBuf {
+        swift_target_folder()
+            .join(&self.target.unversioned_triple)
+            .join(env::var("PROFILE").unwrap())
+    }
+}

crates/live_kit_client2/examples/test_app.rs 🔗

@@ -0,0 +1,175 @@
+// use std::time::Duration;
+// todo!()
+
+// use futures::StreamExt;
+// use gpui2::{actions, keymap_matcher::Binding, Menu, MenuItem};
+// use live_kit_client2::{
+//     LocalAudioTrack, LocalVideoTrack, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate, Room,
+// };
+// use live_kit_server::token::{self, VideoGrant};
+// use log::LevelFilter;
+// use simplelog::SimpleLogger;
+
+// actions!(capture, [Quit]);
+
+fn main() {
+    //     SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger");
+
+    //     gpui2::App::new(()).unwrap().run(|cx| {
+    //         #[cfg(any(test, feature = "test-support"))]
+    //         println!("USING TEST LIVEKIT");
+
+    //         #[cfg(not(any(test, feature = "test-support")))]
+    //         println!("USING REAL LIVEKIT");
+
+    //         cx.platform().activate(true);
+    //         cx.add_global_action(quit);
+
+    //         cx.add_bindings([Binding::new("cmd-q", Quit, None)]);
+    //         cx.set_menus(vec![Menu {
+    //             name: "Zed",
+    //             items: vec![MenuItem::Action {
+    //                 name: "Quit",
+    //                 action: Box::new(Quit),
+    //                 os_action: None,
+    //             }],
+    //         }]);
+
+    //         let live_kit_url = std::env::var("LIVE_KIT_URL").unwrap_or("http://localhost:7880".into());
+    //         let live_kit_key = std::env::var("LIVE_KIT_KEY").unwrap_or("devkey".into());
+    //         let live_kit_secret = std::env::var("LIVE_KIT_SECRET").unwrap_or("secret".into());
+
+    //         cx.spawn(|cx| async move {
+    //             let user_a_token = token::create(
+    //                 &live_kit_key,
+    //                 &live_kit_secret,
+    //                 Some("test-participant-1"),
+    //                 VideoGrant::to_join("test-room"),
+    //             )
+    //             .unwrap();
+    //             let room_a = Room::new();
+    //             room_a.connect(&live_kit_url, &user_a_token).await.unwrap();
+
+    //             let user2_token = token::create(
+    //                 &live_kit_key,
+    //                 &live_kit_secret,
+    //                 Some("test-participant-2"),
+    //                 VideoGrant::to_join("test-room"),
+    //             )
+    //             .unwrap();
+    //             let room_b = Room::new();
+    //             room_b.connect(&live_kit_url, &user2_token).await.unwrap();
+
+    //             let mut audio_track_updates = room_b.remote_audio_track_updates();
+    //             let audio_track = LocalAudioTrack::create();
+    //             let audio_track_publication = room_a.publish_audio_track(audio_track).await.unwrap();
+
+    //             if let RemoteAudioTrackUpdate::Subscribed(track, _) =
+    //                 audio_track_updates.next().await.unwrap()
+    //             {
+    //                 let remote_tracks = room_b.remote_audio_tracks("test-participant-1");
+    //                 assert_eq!(remote_tracks.len(), 1);
+    //                 assert_eq!(remote_tracks[0].publisher_id(), "test-participant-1");
+    //                 assert_eq!(track.publisher_id(), "test-participant-1");
+    //             } else {
+    //                 panic!("unexpected message");
+    //             }
+
+    //             audio_track_publication.set_mute(true).await.unwrap();
+
+    //             println!("waiting for mute changed!");
+    //             if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } =
+    //                 audio_track_updates.next().await.unwrap()
+    //             {
+    //                 let remote_tracks = room_b.remote_audio_tracks("test-participant-1");
+    //                 assert_eq!(remote_tracks[0].sid(), track_id);
+    //                 assert_eq!(muted, true);
+    //             } else {
+    //                 panic!("unexpected message");
+    //             }
+
+    //             audio_track_publication.set_mute(false).await.unwrap();
+
+    //             if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } =
+    //                 audio_track_updates.next().await.unwrap()
+    //             {
+    //                 let remote_tracks = room_b.remote_audio_tracks("test-participant-1");
+    //                 assert_eq!(remote_tracks[0].sid(), track_id);
+    //                 assert_eq!(muted, false);
+    //             } else {
+    //                 panic!("unexpected message");
+    //             }
+
+    //             println!("Pausing for 5 seconds to test audio, make some noise!");
+    //             let timer = cx.background().timer(Duration::from_secs(5));
+    //             timer.await;
+    //             let remote_audio_track = room_b
+    //                 .remote_audio_tracks("test-participant-1")
+    //                 .pop()
+    //                 .unwrap();
+    //             room_a.unpublish_track(audio_track_publication);
+
+    //             // Clear out any active speakers changed messages
+    //             let mut next = audio_track_updates.next().await.unwrap();
+    //             while let RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } = next {
+    //                 println!("Speakers changed: {:?}", speakers);
+    //                 next = audio_track_updates.next().await.unwrap();
+    //             }
+
+    //             if let RemoteAudioTrackUpdate::Unsubscribed {
+    //                 publisher_id,
+    //                 track_id,
+    //             } = next
+    //             {
+    //                 assert_eq!(publisher_id, "test-participant-1");
+    //                 assert_eq!(remote_audio_track.sid(), track_id);
+    //                 assert_eq!(room_b.remote_audio_tracks("test-participant-1").len(), 0);
+    //             } else {
+    //                 panic!("unexpected message");
+    //             }
+
+    //             let mut video_track_updates = room_b.remote_video_track_updates();
+    //             let displays = room_a.display_sources().await.unwrap();
+    //             let display = displays.into_iter().next().unwrap();
+
+    //             let local_video_track = LocalVideoTrack::screen_share_for_display(&display);
+    //             let local_video_track_publication =
+    //                 room_a.publish_video_track(local_video_track).await.unwrap();
+
+    //             if let RemoteVideoTrackUpdate::Subscribed(track) =
+    //                 video_track_updates.next().await.unwrap()
+    //             {
+    //                 let remote_video_tracks = room_b.remote_video_tracks("test-participant-1");
+    //                 assert_eq!(remote_video_tracks.len(), 1);
+    //                 assert_eq!(remote_video_tracks[0].publisher_id(), "test-participant-1");
+    //                 assert_eq!(track.publisher_id(), "test-participant-1");
+    //             } else {
+    //                 panic!("unexpected message");
+    //             }
+
+    //             let remote_video_track = room_b
+    //                 .remote_video_tracks("test-participant-1")
+    //                 .pop()
+    //                 .unwrap();
+    //             room_a.unpublish_track(local_video_track_publication);
+    //             if let RemoteVideoTrackUpdate::Unsubscribed {
+    //                 publisher_id,
+    //                 track_id,
+    //             } = video_track_updates.next().await.unwrap()
+    //             {
+    //                 assert_eq!(publisher_id, "test-participant-1");
+    //                 assert_eq!(remote_video_track.sid(), track_id);
+    //                 assert_eq!(room_b.remote_video_tracks("test-participant-1").len(), 0);
+    //             } else {
+    //                 panic!("unexpected message");
+    //             }
+
+    //             cx.platform().quit();
+    //         })
+    //         .detach();
+    //     });
+}
+
+// fn quit(_: &Quit, cx: &mut gpui2::AppContext) {
+//     cx.platform().quit();
+// }

crates/live_kit_client2/src/live_kit_client2.rs 🔗

@@ -0,0 +1,11 @@
+// #[cfg(not(any(test, feature = "test-support")))]
+pub mod prod;
+
+// #[cfg(not(any(test, feature = "test-support")))]
+pub use prod::*;
+
+// #[cfg(any(test, feature = "test-support"))]
+// pub mod test;
+
+// #[cfg(any(test, feature = "test-support"))]
+// pub use test::*;

crates/live_kit_client2/src/prod.rs 🔗

@@ -0,0 +1,943 @@
+use anyhow::{anyhow, Context, Result};
+use core_foundation::{
+    array::{CFArray, CFArrayRef},
+    base::{CFRelease, CFRetain, TCFType},
+    string::{CFString, CFStringRef},
+};
+use futures::{
+    channel::{mpsc, oneshot},
+    Future,
+};
+pub use media::core_video::CVImageBuffer;
+use media::core_video::CVImageBufferRef;
+use parking_lot::Mutex;
+use postage::watch;
+use std::{
+    ffi::c_void,
+    sync::{Arc, Weak},
+};
+
+// SAFETY: Most live kit types are threadsafe:
+// https://github.com/livekit/client-sdk-swift#thread-safety
+macro_rules! pointer_type {
+    ($pointer_name:ident) => {
+        #[repr(transparent)]
+        #[derive(Copy, Clone, Debug)]
+        pub struct $pointer_name(pub *const std::ffi::c_void);
+        unsafe impl Send for $pointer_name {}
+    };
+}
+
+mod swift {
+    pointer_type!(Room);
+    pointer_type!(LocalAudioTrack);
+    pointer_type!(RemoteAudioTrack);
+    pointer_type!(LocalVideoTrack);
+    pointer_type!(RemoteVideoTrack);
+    pointer_type!(LocalTrackPublication);
+    pointer_type!(RemoteTrackPublication);
+    pointer_type!(MacOSDisplay);
+    pointer_type!(RoomDelegate);
+}
+
+extern "C" {
+    fn LKRoomDelegateCreate(
+        callback_data: *mut c_void,
+        on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
+        on_did_subscribe_to_remote_audio_track: extern "C" fn(
+            callback_data: *mut c_void,
+            publisher_id: CFStringRef,
+            track_id: CFStringRef,
+            remote_track: swift::RemoteAudioTrack,
+            remote_publication: swift::RemoteTrackPublication,
+        ),
+        on_did_unsubscribe_from_remote_audio_track: extern "C" fn(
+            callback_data: *mut c_void,
+            publisher_id: CFStringRef,
+            track_id: CFStringRef,
+        ),
+        on_mute_changed_from_remote_audio_track: extern "C" fn(
+            callback_data: *mut c_void,
+            track_id: CFStringRef,
+            muted: bool,
+        ),
+        on_active_speakers_changed: extern "C" fn(
+            callback_data: *mut c_void,
+            participants: CFArrayRef,
+        ),
+        on_did_subscribe_to_remote_video_track: extern "C" fn(
+            callback_data: *mut c_void,
+            publisher_id: CFStringRef,
+            track_id: CFStringRef,
+            remote_track: swift::RemoteVideoTrack,
+        ),
+        on_did_unsubscribe_from_remote_video_track: extern "C" fn(
+            callback_data: *mut c_void,
+            publisher_id: CFStringRef,
+            track_id: CFStringRef,
+        ),
+    ) -> swift::RoomDelegate;
+
+    fn LKRoomCreate(delegate: swift::RoomDelegate) -> swift::Room;
+    fn LKRoomConnect(
+        room: swift::Room,
+        url: CFStringRef,
+        token: CFStringRef,
+        callback: extern "C" fn(*mut c_void, CFStringRef),
+        callback_data: *mut c_void,
+    );
+    fn LKRoomDisconnect(room: swift::Room);
+    fn LKRoomPublishVideoTrack(
+        room: swift::Room,
+        track: swift::LocalVideoTrack,
+        callback: extern "C" fn(*mut c_void, swift::LocalTrackPublication, CFStringRef),
+        callback_data: *mut c_void,
+    );
+    fn LKRoomPublishAudioTrack(
+        room: swift::Room,
+        track: swift::LocalAudioTrack,
+        callback: extern "C" fn(*mut c_void, swift::LocalTrackPublication, CFStringRef),
+        callback_data: *mut c_void,
+    );
+    fn LKRoomUnpublishTrack(room: swift::Room, publication: swift::LocalTrackPublication);
+
+    fn LKRoomAudioTracksForRemoteParticipant(
+        room: swift::Room,
+        participant_id: CFStringRef,
+    ) -> CFArrayRef;
+
+    fn LKRoomAudioTrackPublicationsForRemoteParticipant(
+        room: swift::Room,
+        participant_id: CFStringRef,
+    ) -> CFArrayRef;
+
+    fn LKRoomVideoTracksForRemoteParticipant(
+        room: swift::Room,
+        participant_id: CFStringRef,
+    ) -> CFArrayRef;
+
+    fn LKVideoRendererCreate(
+        callback_data: *mut c_void,
+        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
+        on_drop: extern "C" fn(callback_data: *mut c_void),
+    ) -> *const c_void;
+
+    fn LKRemoteAudioTrackGetSid(track: swift::RemoteAudioTrack) -> CFStringRef;
+    fn LKVideoTrackAddRenderer(track: swift::RemoteVideoTrack, renderer: *const c_void);
+    fn LKRemoteVideoTrackGetSid(track: swift::RemoteVideoTrack) -> CFStringRef;
+
+    fn LKDisplaySources(
+        callback_data: *mut c_void,
+        callback: extern "C" fn(
+            callback_data: *mut c_void,
+            sources: CFArrayRef,
+            error: CFStringRef,
+        ),
+    );
+    fn LKCreateScreenShareTrackForDisplay(display: swift::MacOSDisplay) -> swift::LocalVideoTrack;
+    fn LKLocalAudioTrackCreateTrack() -> swift::LocalAudioTrack;
+
+    fn LKLocalTrackPublicationSetMute(
+        publication: swift::LocalTrackPublication,
+        muted: bool,
+        on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
+        callback_data: *mut c_void,
+    );
+
+    fn LKRemoteTrackPublicationSetEnabled(
+        publication: swift::RemoteTrackPublication,
+        enabled: bool,
+        on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef),
+        callback_data: *mut c_void,
+    );
+
+    fn LKRemoteTrackPublicationIsMuted(publication: swift::RemoteTrackPublication) -> bool;
+    fn LKRemoteTrackPublicationGetSid(publication: swift::RemoteTrackPublication) -> CFStringRef;
+}
+
+pub type Sid = String;
+
+#[derive(Clone, Eq, PartialEq)]
+pub enum ConnectionState {
+    Disconnected,
+    Connected { url: String, token: String },
+}
+
+pub struct Room {
+    native_room: Mutex<swift::Room>,
+    connection: Mutex<(
+        watch::Sender<ConnectionState>,
+        watch::Receiver<ConnectionState>,
+    )>,
+    remote_audio_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteAudioTrackUpdate>>>,
+    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
+    _delegate: Mutex<RoomDelegate>,
+}
+
+trait AssertSendSync: Send {}
+impl AssertSendSync for Room {}
+
+impl Room {
+    pub fn new() -> Arc<Self> {
+        Arc::new_cyclic(|weak_room| {
+            let delegate = RoomDelegate::new(weak_room.clone());
+            Self {
+                native_room: Mutex::new(unsafe { LKRoomCreate(delegate.native_delegate) }),
+                connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
+                remote_audio_track_subscribers: Default::default(),
+                remote_video_track_subscribers: Default::default(),
+                _delegate: Mutex::new(delegate),
+            }
+        })
+    }
+
+    pub fn status(&self) -> watch::Receiver<ConnectionState> {
+        self.connection.lock().1.clone()
+    }
+
+    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
+        let url = CFString::new(url);
+        let token = CFString::new(token);
+        let (did_connect, tx, rx) = Self::build_done_callback();
+        unsafe {
+            LKRoomConnect(
+                *self.native_room.lock(),
+                url.as_concrete_TypeRef(),
+                token.as_concrete_TypeRef(),
+                did_connect,
+                tx,
+            )
+        }
+
+        let this = self.clone();
+        let url = url.to_string();
+        let token = token.to_string();
+        async move {
+            rx.await.unwrap().context("error connecting to room")?;
+            *this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token };
+            Ok(())
+        }
+    }
+
+    fn did_disconnect(&self) {
+        *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
+    }
+
+    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
+        extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
+            unsafe {
+                let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
+
+                if sources.is_null() {
+                    let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
+                } else {
+                    let sources = CFArray::wrap_under_get_rule(sources)
+                        .into_iter()
+                        .map(|source| MacOSDisplay::new(swift::MacOSDisplay(*source)))
+                        .collect();
+
+                    let _ = tx.send(Ok(sources));
+                }
+            }
+        }
+
+        let (tx, rx) = oneshot::channel();
+
+        unsafe {
+            LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
+        }
+
+        async move { rx.await.unwrap() }
+    }
+
+    pub fn publish_video_track(
+        self: &Arc<Self>,
+        track: LocalVideoTrack,
+    ) -> impl Future<Output = Result<LocalTrackPublication>> {
+        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
+        extern "C" fn callback(
+            tx: *mut c_void,
+            publication: swift::LocalTrackPublication,
+            error: CFStringRef,
+        ) {
+            let tx =
+                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
+            if error.is_null() {
+                let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
+            } else {
+                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
+                let _ = tx.send(Err(anyhow!(error)));
+            }
+        }
+        unsafe {
+            LKRoomPublishVideoTrack(
+                *self.native_room.lock(),
+                track.0,
+                callback,
+                Box::into_raw(Box::new(tx)) as *mut c_void,
+            );
+        }
+        async { rx.await.unwrap().context("error publishing video track") }
+    }
+
+    pub fn publish_audio_track(
+        self: &Arc<Self>,
+        track: LocalAudioTrack,
+    ) -> impl Future<Output = Result<LocalTrackPublication>> {
+        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
+        extern "C" fn callback(
+            tx: *mut c_void,
+            publication: swift::LocalTrackPublication,
+            error: CFStringRef,
+        ) {
+            let tx =
+                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
+            if error.is_null() {
+                let _ = tx.send(Ok(LocalTrackPublication::new(publication)));
+            } else {
+                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
+                let _ = tx.send(Err(anyhow!(error)));
+            }
+        }
+        unsafe {
+            LKRoomPublishAudioTrack(
+                *self.native_room.lock(),
+                track.0,
+                callback,
+                Box::into_raw(Box::new(tx)) as *mut c_void,
+            );
+        }
+        async { rx.await.unwrap().context("error publishing audio track") }
+    }
+
+    pub fn unpublish_track(&self, publication: LocalTrackPublication) {
+        unsafe {
+            LKRoomUnpublishTrack(*self.native_room.lock(), publication.0);
+        }
+    }
+
+    pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
+        unsafe {
+            let tracks = LKRoomVideoTracksForRemoteParticipant(
+                *self.native_room.lock(),
+                CFString::new(participant_id).as_concrete_TypeRef(),
+            );
+
+            if tracks.is_null() {
+                Vec::new()
+            } else {
+                let tracks = CFArray::wrap_under_get_rule(tracks);
+                tracks
+                    .into_iter()
+                    .map(|native_track| {
+                        let native_track = swift::RemoteVideoTrack(*native_track);
+                        let id =
+                            CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
+                                .to_string();
+                        Arc::new(RemoteVideoTrack::new(
+                            native_track,
+                            id,
+                            participant_id.into(),
+                        ))
+                    })
+                    .collect()
+            }
+        }
+    }
+
+    pub fn remote_audio_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
+        unsafe {
+            let tracks = LKRoomAudioTracksForRemoteParticipant(
+                *self.native_room.lock(),
+                CFString::new(participant_id).as_concrete_TypeRef(),
+            );
+
+            if tracks.is_null() {
+                Vec::new()
+            } else {
+                let tracks = CFArray::wrap_under_get_rule(tracks);
+                tracks
+                    .into_iter()
+                    .map(|native_track| {
+                        let native_track = swift::RemoteAudioTrack(*native_track);
+                        let id =
+                            CFString::wrap_under_get_rule(LKRemoteAudioTrackGetSid(native_track))
+                                .to_string();
+                        Arc::new(RemoteAudioTrack::new(
+                            native_track,
+                            id,
+                            participant_id.into(),
+                        ))
+                    })
+                    .collect()
+            }
+        }
+    }
+
+    pub fn remote_audio_track_publications(
+        &self,
+        participant_id: &str,
+    ) -> Vec<Arc<RemoteTrackPublication>> {
+        unsafe {
+            let tracks = LKRoomAudioTrackPublicationsForRemoteParticipant(
+                *self.native_room.lock(),
+                CFString::new(participant_id).as_concrete_TypeRef(),
+            );
+
+            if tracks.is_null() {
+                Vec::new()
+            } else {
+                let tracks = CFArray::wrap_under_get_rule(tracks);
+                tracks
+                    .into_iter()
+                    .map(|native_track_publication| {
+                        let native_track_publication =
+                            swift::RemoteTrackPublication(*native_track_publication);
+                        Arc::new(RemoteTrackPublication::new(native_track_publication))
+                    })
+                    .collect()
+            }
+        }
+    }
+
+    pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteAudioTrackUpdate> {
+        let (tx, rx) = mpsc::unbounded();
+        self.remote_audio_track_subscribers.lock().push(tx);
+        rx
+    }
+
+    pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
+        let (tx, rx) = mpsc::unbounded();
+        self.remote_video_track_subscribers.lock().push(tx);
+        rx
+    }
+
+    fn did_subscribe_to_remote_audio_track(
+        &self,
+        track: RemoteAudioTrack,
+        publication: RemoteTrackPublication,
+    ) {
+        let track = Arc::new(track);
+        let publication = Arc::new(publication);
+        self.remote_audio_track_subscribers.lock().retain(|tx| {
+            tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed(
+                track.clone(),
+                publication.clone(),
+            ))
+            .is_ok()
+        });
+    }
+
+    fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) {
+        self.remote_audio_track_subscribers.lock().retain(|tx| {
+            tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed {
+                publisher_id: publisher_id.clone(),
+                track_id: track_id.clone(),
+            })
+            .is_ok()
+        });
+    }
+
+    fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) {
+        self.remote_audio_track_subscribers.lock().retain(|tx| {
+            tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged {
+                track_id: track_id.clone(),
+                muted,
+            })
+            .is_ok()
+        });
+    }
+
+    // A vec of publisher IDs
+    fn active_speakers_changed(&self, speakers: Vec<String>) {
+        self.remote_audio_track_subscribers
+            .lock()
+            .retain(move |tx| {
+                tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged {
+                    speakers: speakers.clone(),
+                })
+                .is_ok()
+            });
+    }
+
+    fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
+        let track = Arc::new(track);
+        self.remote_video_track_subscribers.lock().retain(|tx| {
+            tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
+                .is_ok()
+        });
+    }
+
+    fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
+        self.remote_video_track_subscribers.lock().retain(|tx| {
+            tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
+                publisher_id: publisher_id.clone(),
+                track_id: track_id.clone(),
+            })
+            .is_ok()
+        });
+    }
+
+    fn build_done_callback() -> (
+        extern "C" fn(*mut c_void, CFStringRef),
+        *mut c_void,
+        oneshot::Receiver<Result<()>>,
+    ) {
+        let (tx, rx) = oneshot::channel();
+        extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
+            let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
+            if error.is_null() {
+                let _ = tx.send(Ok(()));
+            } else {
+                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
+                let _ = tx.send(Err(anyhow!(error)));
+            }
+        }
+        (
+            done_callback,
+            Box::into_raw(Box::new(tx)) as *mut c_void,
+            rx,
+        )
+    }
+}
+
+impl Drop for Room {
+    fn drop(&mut self) {
+        unsafe {
+            let native_room = &*self.native_room.lock();
+            LKRoomDisconnect(*native_room);
+            CFRelease(native_room.0);
+        }
+    }
+}
+
+struct RoomDelegate {
+    native_delegate: swift::RoomDelegate,
+    _weak_room: Weak<Room>,
+}
+
+impl RoomDelegate {
+    fn new(weak_room: Weak<Room>) -> Self {
+        let native_delegate = unsafe {
+            LKRoomDelegateCreate(
+                weak_room.as_ptr() as *mut c_void,
+                Self::on_did_disconnect,
+                Self::on_did_subscribe_to_remote_audio_track,
+                Self::on_did_unsubscribe_from_remote_audio_track,
+                Self::on_mute_change_from_remote_audio_track,
+                Self::on_active_speakers_changed,
+                Self::on_did_subscribe_to_remote_video_track,
+                Self::on_did_unsubscribe_from_remote_video_track,
+            )
+        };
+        Self {
+            native_delegate,
+            _weak_room: weak_room,
+        }
+    }
+
+    extern "C" fn on_did_disconnect(room: *mut c_void) {
+        let room = unsafe { Weak::from_raw(room as *mut Room) };
+        if let Some(room) = room.upgrade() {
+            room.did_disconnect();
+        }
+        let _ = Weak::into_raw(room);
+    }
+
+    extern "C" fn on_did_subscribe_to_remote_audio_track(
+        room: *mut c_void,
+        publisher_id: CFStringRef,
+        track_id: CFStringRef,
+        track: swift::RemoteAudioTrack,
+        publication: swift::RemoteTrackPublication,
+    ) {
+        let room = unsafe { Weak::from_raw(room as *mut Room) };
+        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
+        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
+        let track = RemoteAudioTrack::new(track, track_id, publisher_id);
+        let publication = RemoteTrackPublication::new(publication);
+        if let Some(room) = room.upgrade() {
+            room.did_subscribe_to_remote_audio_track(track, publication);
+        }
+        let _ = Weak::into_raw(room);
+    }
+
+    extern "C" fn on_did_unsubscribe_from_remote_audio_track(
+        room: *mut c_void,
+        publisher_id: CFStringRef,
+        track_id: CFStringRef,
+    ) {
+        let room = unsafe { Weak::from_raw(room as *mut Room) };
+        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
+        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
+        if let Some(room) = room.upgrade() {
+            room.did_unsubscribe_from_remote_audio_track(publisher_id, track_id);
+        }
+        let _ = Weak::into_raw(room);
+    }
+
+    extern "C" fn on_mute_change_from_remote_audio_track(
+        room: *mut c_void,
+        track_id: CFStringRef,
+        muted: bool,
+    ) {
+        let room = unsafe { Weak::from_raw(room as *mut Room) };
+        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
+        if let Some(room) = room.upgrade() {
+            room.mute_changed_from_remote_audio_track(track_id, muted);
+        }
+        let _ = Weak::into_raw(room);
+    }
+
+    extern "C" fn on_active_speakers_changed(room: *mut c_void, participants: CFArrayRef) {
+        if participants.is_null() {
+            return;
+        }
+
+        let room = unsafe { Weak::from_raw(room as *mut Room) };
+        let speakers = unsafe {
+            CFArray::wrap_under_get_rule(participants)
+                .into_iter()
+                .map(
+                    |speaker: core_foundation::base::ItemRef<'_, *const c_void>| {
+                        CFString::wrap_under_get_rule(*speaker as CFStringRef).to_string()
+                    },
+                )
+                .collect()
+        };
+
+        if let Some(room) = room.upgrade() {
+            room.active_speakers_changed(speakers);
+        }
+        let _ = Weak::into_raw(room);
+    }
+
+    extern "C" fn on_did_subscribe_to_remote_video_track(
+        room: *mut c_void,
+        publisher_id: CFStringRef,
+        track_id: CFStringRef,
+        track: swift::RemoteVideoTrack,
+    ) {
+        let room = unsafe { Weak::from_raw(room as *mut Room) };
+        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
+        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
+        let track = RemoteVideoTrack::new(track, track_id, publisher_id);
+        if let Some(room) = room.upgrade() {
+            room.did_subscribe_to_remote_video_track(track);
+        }
+        let _ = Weak::into_raw(room);
+    }
+
+    extern "C" fn on_did_unsubscribe_from_remote_video_track(
+        room: *mut c_void,
+        publisher_id: CFStringRef,
+        track_id: CFStringRef,
+    ) {
+        let room = unsafe { Weak::from_raw(room as *mut Room) };
+        let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
+        let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
+        if let Some(room) = room.upgrade() {
+            room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
+        }
+        let _ = Weak::into_raw(room);
+    }
+}
+
+impl Drop for RoomDelegate {
+    fn drop(&mut self) {
+        unsafe {
+            CFRelease(self.native_delegate.0);
+        }
+    }
+}
+
+pub struct LocalAudioTrack(swift::LocalAudioTrack);
+
+impl LocalAudioTrack {
+    pub fn create() -> Self {
+        Self(unsafe { LKLocalAudioTrackCreateTrack() })
+    }
+}
+
+impl Drop for LocalAudioTrack {
+    fn drop(&mut self) {
+        unsafe { CFRelease(self.0 .0) }
+    }
+}
+
+pub struct LocalVideoTrack(swift::LocalVideoTrack);
+
+impl LocalVideoTrack {
+    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
+        Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
+    }
+}
+
+impl Drop for LocalVideoTrack {
+    fn drop(&mut self) {
+        unsafe { CFRelease(self.0 .0) }
+    }
+}
+
+pub struct LocalTrackPublication(swift::LocalTrackPublication);
+
+impl LocalTrackPublication {
+    pub fn new(native_track_publication: swift::LocalTrackPublication) -> Self {
+        unsafe {
+            CFRetain(native_track_publication.0);
+        }
+        Self(native_track_publication)
+    }
+
+    pub fn set_mute(&self, muted: bool) -> impl Future<Output = Result<()>> {
+        let (tx, rx) = futures::channel::oneshot::channel();
+
+        extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
+            let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
+            if error.is_null() {
+                tx.send(Ok(())).ok();
+            } else {
+                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
+                tx.send(Err(anyhow!(error))).ok();
+            }
+        }
+
+        unsafe {
+            LKLocalTrackPublicationSetMute(
+                self.0,
+                muted,
+                complete_callback,
+                Box::into_raw(Box::new(tx)) as *mut c_void,
+            )
+        }
+
+        async move { rx.await.unwrap() }
+    }
+}
+
+impl Drop for LocalTrackPublication {
+    fn drop(&mut self) {
+        unsafe { CFRelease(self.0 .0) }
+    }
+}
+
+pub struct RemoteTrackPublication {
+    native_publication: Mutex<swift::RemoteTrackPublication>,
+}
+
+impl RemoteTrackPublication {
+    pub fn new(native_track_publication: swift::RemoteTrackPublication) -> Self {
+        unsafe {
+            CFRetain(native_track_publication.0);
+        }
+        Self {
+            native_publication: Mutex::new(native_track_publication),
+        }
+    }
+
+    pub fn sid(&self) -> String {
+        unsafe {
+            CFString::wrap_under_get_rule(LKRemoteTrackPublicationGetSid(
+                *self.native_publication.lock(),
+            ))
+            .to_string()
+        }
+    }
+
+    pub fn is_muted(&self) -> bool {
+        unsafe { LKRemoteTrackPublicationIsMuted(*self.native_publication.lock()) }
+    }
+
+    pub fn set_enabled(&self, enabled: bool) -> impl Future<Output = Result<()>> {
+        let (tx, rx) = futures::channel::oneshot::channel();
+
+        extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) {
+            let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender<Result<()>>) };
+            if error.is_null() {
+                tx.send(Ok(())).ok();
+            } else {
+                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
+                tx.send(Err(anyhow!(error))).ok();
+            }
+        }
+
+        unsafe {
+            LKRemoteTrackPublicationSetEnabled(
+                *self.native_publication.lock(),
+                enabled,
+                complete_callback,
+                Box::into_raw(Box::new(tx)) as *mut c_void,
+            )
+        }
+
+        async move { rx.await.unwrap() }
+    }
+}
+
+impl Drop for RemoteTrackPublication {
+    fn drop(&mut self) {
+        unsafe { CFRelease((*self.native_publication.lock()).0) }
+    }
+}
+
+#[derive(Debug)]
+pub struct RemoteAudioTrack {
+    native_track: Mutex<swift::RemoteAudioTrack>,
+    sid: Sid,
+    publisher_id: String,
+}
+
+impl RemoteAudioTrack {
+    fn new(native_track: swift::RemoteAudioTrack, sid: Sid, publisher_id: String) -> Self {
+        unsafe {
+            CFRetain(native_track.0);
+        }
+        Self {
+            native_track: Mutex::new(native_track),
+            sid,
+            publisher_id,
+        }
+    }
+
+    pub fn sid(&self) -> &str {
+        &self.sid
+    }
+
+    pub fn publisher_id(&self) -> &str {
+        &self.publisher_id
+    }
+
+    pub fn enable(&self) -> impl Future<Output = Result<()>> {
+        async { Ok(()) }
+    }
+
+    pub fn disable(&self) -> impl Future<Output = Result<()>> {
+        async { Ok(()) }
+    }
+}
+
+impl Drop for RemoteAudioTrack {
+    fn drop(&mut self) {
+        unsafe { CFRelease(self.native_track.lock().0) }
+    }
+}
+
+#[derive(Debug)]
+pub struct RemoteVideoTrack {
+    native_track: Mutex<swift::RemoteVideoTrack>,
+    sid: Sid,
+    publisher_id: String,
+}
+
+impl RemoteVideoTrack {
+    fn new(native_track: swift::RemoteVideoTrack, sid: Sid, publisher_id: String) -> Self {
+        unsafe {
+            CFRetain(native_track.0);
+        }
+        Self {
+            native_track: Mutex::new(native_track),
+            sid,
+            publisher_id,
+        }
+    }
+
+    pub fn sid(&self) -> &str {
+        &self.sid
+    }
+
+    pub fn publisher_id(&self) -> &str {
+        &self.publisher_id
+    }
+
+    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
+        extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
+            unsafe {
+                let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
+                let buffer = CVImageBuffer::wrap_under_get_rule(frame);
+                let result = tx.try_broadcast(Frame(buffer));
+                let _ = Box::into_raw(tx);
+                match result {
+                    Ok(_) => true,
+                    Err(async_broadcast::TrySendError::Closed(_))
+                    | Err(async_broadcast::TrySendError::Inactive(_)) => {
+                        log::warn!("no active receiver for frame");
+                        false
+                    }
+                    Err(async_broadcast::TrySendError::Full(_)) => {
+                        log::warn!("skipping frame as receiver is not keeping up");
+                        true
+                    }
+                }
+            }
+        }
+
+        extern "C" fn on_drop(callback_data: *mut c_void) {
+            unsafe {
+                let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
+            }
+        }
+
+        let (tx, rx) = async_broadcast::broadcast(64);
+        unsafe {
+            let renderer = LKVideoRendererCreate(
+                Box::into_raw(Box::new(tx)) as *mut c_void,
+                on_frame,
+                on_drop,
+            );
+            LKVideoTrackAddRenderer(*self.native_track.lock(), renderer);
+            rx
+        }
+    }
+}
+
+impl Drop for RemoteVideoTrack {
+    fn drop(&mut self) {
+        unsafe { CFRelease(self.native_track.lock().0) }
+    }
+}
+
+pub enum RemoteVideoTrackUpdate {
+    Subscribed(Arc<RemoteVideoTrack>),
+    Unsubscribed { publisher_id: Sid, track_id: Sid },
+}
+
+pub enum RemoteAudioTrackUpdate {
+    ActiveSpeakersChanged { speakers: Vec<Sid> },
+    MuteChanged { track_id: Sid, muted: bool },
+    Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
+    Unsubscribed { publisher_id: Sid, track_id: Sid },
+}
+
+pub struct MacOSDisplay(swift::MacOSDisplay);
+
+impl MacOSDisplay {
+    fn new(ptr: swift::MacOSDisplay) -> Self {
+        unsafe {
+            CFRetain(ptr.0);
+        }
+        Self(ptr)
+    }
+}
+
+impl Drop for MacOSDisplay {
+    fn drop(&mut self) {
+        unsafe { CFRelease(self.0 .0) }
+    }
+}
+
+#[derive(Clone)]
+pub struct Frame(CVImageBuffer);
+
+impl Frame {
+    pub fn width(&self) -> usize {
+        self.0.width()
+    }
+
+    pub fn height(&self) -> usize {
+        self.0.height()
+    }
+
+    pub fn image(&self) -> CVImageBuffer {
+        self.0.clone()
+    }
+}

crates/live_kit_client2/src/test.rs 🔗

@@ -0,0 +1,647 @@
+use anyhow::{anyhow, Result};
+use async_trait::async_trait;
+use collections::{BTreeMap, HashMap};
+use futures::Stream;
+use gpui2::Executor;
+use live_kit_server::token;
+use media::core_video::CVImageBuffer;
+use parking_lot::Mutex;
+use postage::watch;
+use std::{future::Future, mem, sync::Arc};
+
+static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
+
+pub struct TestServer {
+    pub url: String,
+    pub api_key: String,
+    pub secret_key: String,
+    rooms: Mutex<HashMap<String, TestServerRoom>>,
+    executor: Arc<Executor>,
+}
+
+impl TestServer {
+    pub fn create(
+        url: String,
+        api_key: String,
+        secret_key: String,
+        executor: Arc<Executor>,
+    ) -> Result<Arc<TestServer>> {
+        let mut servers = SERVERS.lock();
+        if servers.contains_key(&url) {
+            Err(anyhow!("a server with url {:?} already exists", url))
+        } else {
+            let server = Arc::new(TestServer {
+                url: url.clone(),
+                api_key,
+                secret_key,
+                rooms: Default::default(),
+                executor,
+            });
+            servers.insert(url, server.clone());
+            Ok(server)
+        }
+    }
+
+    fn get(url: &str) -> Result<Arc<TestServer>> {
+        Ok(SERVERS
+            .lock()
+            .get(url)
+            .ok_or_else(|| anyhow!("no server found for url"))?
+            .clone())
+    }
+
+    pub fn teardown(&self) -> Result<()> {
+        SERVERS
+            .lock()
+            .remove(&self.url)
+            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
+        Ok(())
+    }
+
+    pub fn create_api_client(&self) -> TestApiClient {
+        TestApiClient {
+            url: self.url.clone(),
+        }
+    }
+
+    pub async fn create_room(&self, room: String) -> Result<()> {
+        self.executor.simulate_random_delay().await;
+        let mut server_rooms = self.rooms.lock();
+        if server_rooms.contains_key(&room) {
+            Err(anyhow!("room {:?} already exists", room))
+        } else {
+            server_rooms.insert(room, Default::default());
+            Ok(())
+        }
+    }
+
+    async fn delete_room(&self, room: String) -> Result<()> {
+        // TODO: clear state associated with all `Room`s.
+        self.executor.simulate_random_delay().await;
+        let mut server_rooms = self.rooms.lock();
+        server_rooms
+            .remove(&room)
+            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
+        Ok(())
+    }
+
+    async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
+        self.executor.simulate_random_delay().await;
+        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
+        let identity = claims.sub.unwrap().to_string();
+        let room_name = claims.video.room.unwrap();
+        let mut server_rooms = self.rooms.lock();
+        let room = (*server_rooms).entry(room_name.to_string()).or_default();
+
+        if room.client_rooms.contains_key(&identity) {
+            Err(anyhow!(
+                "{:?} attempted to join room {:?} twice",
+                identity,
+                room_name
+            ))
+        } else {
+            for track in &room.video_tracks {
+                client_room
+                    .0
+                    .lock()
+                    .video_track_updates
+                    .0
+                    .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
+                    .unwrap();
+            }
+            room.client_rooms.insert(identity, client_room);
+            Ok(())
+        }
+    }
+
+    async fn leave_room(&self, token: String) -> Result<()> {
+        self.executor.simulate_random_delay().await;
+        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
+        let identity = claims.sub.unwrap().to_string();
+        let room_name = claims.video.room.unwrap();
+        let mut server_rooms = self.rooms.lock();
+        let room = server_rooms
+            .get_mut(&*room_name)
+            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
+        room.client_rooms.remove(&identity).ok_or_else(|| {
+            anyhow!(
+                "{:?} attempted to leave room {:?} before joining it",
+                identity,
+                room_name
+            )
+        })?;
+        Ok(())
+    }
+
+    async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
+        // TODO: clear state associated with the `Room`.
+
+        self.executor.simulate_random_delay().await;
+        let mut server_rooms = self.rooms.lock();
+        let room = server_rooms
+            .get_mut(&room_name)
+            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
+        room.client_rooms.remove(&identity).ok_or_else(|| {
+            anyhow!(
+                "participant {:?} did not join room {:?}",
+                identity,
+                room_name
+            )
+        })?;
+        Ok(())
+    }
+
+    pub async fn disconnect_client(&self, client_identity: String) {
+        self.executor.simulate_random_delay().await;
+        let mut server_rooms = self.rooms.lock();
+        for room in server_rooms.values_mut() {
+            if let Some(room) = room.client_rooms.remove(&client_identity) {
+                *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
+            }
+        }
+    }
+
+    async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
+        self.executor.simulate_random_delay().await;
+        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
+        let identity = claims.sub.unwrap().to_string();
+        let room_name = claims.video.room.unwrap();
+
+        let mut server_rooms = self.rooms.lock();
+        let room = server_rooms
+            .get_mut(&*room_name)
+            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
+
+        let track = Arc::new(RemoteVideoTrack {
+            sid: nanoid::nanoid!(17),
+            publisher_id: identity.clone(),
+            frames_rx: local_track.frames_rx.clone(),
+        });
+
+        room.video_tracks.push(track.clone());
+
+        for (id, client_room) in &room.client_rooms {
+            if *id != identity {
+                let _ = client_room
+                    .0
+                    .lock()
+                    .video_track_updates
+                    .0
+                    .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
+                    .unwrap();
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn publish_audio_track(
+        &self,
+        token: String,
+        _local_track: &LocalAudioTrack,
+    ) -> Result<()> {
+        self.executor.simulate_random_delay().await;
+        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
+        let identity = claims.sub.unwrap().to_string();
+        let room_name = claims.video.room.unwrap();
+
+        let mut server_rooms = self.rooms.lock();
+        let room = server_rooms
+            .get_mut(&*room_name)
+            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
+
+        let track = Arc::new(RemoteAudioTrack {
+            sid: nanoid::nanoid!(17),
+            publisher_id: identity.clone(),
+        });
+
+        let publication = Arc::new(RemoteTrackPublication);
+
+        room.audio_tracks.push(track.clone());
+
+        for (id, client_room) in &room.client_rooms {
+            if *id != identity {
+                let _ = client_room
+                    .0
+                    .lock()
+                    .audio_track_updates
+                    .0
+                    .try_broadcast(RemoteAudioTrackUpdate::Subscribed(
+                        track.clone(),
+                        publication.clone(),
+                    ))
+                    .unwrap();
+            }
+        }
+
+        Ok(())
+    }
+
+    fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
+        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
+        let room_name = claims.video.room.unwrap();
+
+        let mut server_rooms = self.rooms.lock();
+        let room = server_rooms
+            .get_mut(&*room_name)
+            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
+        Ok(room.video_tracks.clone())
+    }
+
+    fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
+        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
+        let room_name = claims.video.room.unwrap();
+
+        let mut server_rooms = self.rooms.lock();
+        let room = server_rooms
+            .get_mut(&*room_name)
+            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
+        Ok(room.audio_tracks.clone())
+    }
+}
+
+#[derive(Default)]
+struct TestServerRoom {
+    client_rooms: HashMap<Sid, Arc<Room>>,
+    video_tracks: Vec<Arc<RemoteVideoTrack>>,
+    audio_tracks: Vec<Arc<RemoteAudioTrack>>,
+}
+
+impl TestServerRoom {}
+
+pub struct TestApiClient {
+    url: String,
+}
+
+#[async_trait]
+impl live_kit_server::api::Client for TestApiClient {
+    fn url(&self) -> &str {
+        &self.url
+    }
+
+    async fn create_room(&self, name: String) -> Result<()> {
+        let server = TestServer::get(&self.url)?;
+        server.create_room(name).await?;
+        Ok(())
+    }
+
+    async fn delete_room(&self, name: String) -> Result<()> {
+        let server = TestServer::get(&self.url)?;
+        server.delete_room(name).await?;
+        Ok(())
+    }
+
+    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
+        let server = TestServer::get(&self.url)?;
+        server.remove_participant(room, identity).await?;
+        Ok(())
+    }
+
+    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
+        let server = TestServer::get(&self.url)?;
+        token::create(
+            &server.api_key,
+            &server.secret_key,
+            Some(identity),
+            token::VideoGrant::to_join(room),
+        )
+    }
+
+    fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
+        let server = TestServer::get(&self.url)?;
+        token::create(
+            &server.api_key,
+            &server.secret_key,
+            Some(identity),
+            token::VideoGrant::for_guest(room),
+        )
+    }
+}
+
+pub type Sid = String;
+
+struct RoomState {
+    connection: (
+        watch::Sender<ConnectionState>,
+        watch::Receiver<ConnectionState>,
+    ),
+    display_sources: Vec<MacOSDisplay>,
+    audio_track_updates: (
+        async_broadcast::Sender<RemoteAudioTrackUpdate>,
+        async_broadcast::Receiver<RemoteAudioTrackUpdate>,
+    ),
+    video_track_updates: (
+        async_broadcast::Sender<RemoteVideoTrackUpdate>,
+        async_broadcast::Receiver<RemoteVideoTrackUpdate>,
+    ),
+}
+
+#[derive(Clone, Eq, PartialEq)]
+pub enum ConnectionState {
+    Disconnected,
+    Connected { url: String, token: String },
+}
+
+pub struct Room(Mutex<RoomState>);
+
+impl Room {
+    pub fn new() -> Arc<Self> {
+        Arc::new(Self(Mutex::new(RoomState {
+            connection: watch::channel_with(ConnectionState::Disconnected),
+            display_sources: Default::default(),
+            video_track_updates: async_broadcast::broadcast(128),
+            audio_track_updates: async_broadcast::broadcast(128),
+        })))
+    }
+
+    pub fn status(&self) -> watch::Receiver<ConnectionState> {
+        self.0.lock().connection.1.clone()
+    }
+
+    pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
+        let this = self.clone();
+        let url = url.to_string();
+        let token = token.to_string();
+        async move {
+            let server = TestServer::get(&url)?;
+            server.join_room(token.clone(), this.clone()).await?;
+            *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
+            Ok(())
+        }
+    }
+
+    pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
+        let this = self.clone();
+        async move {
+            let server = this.test_server();
+            server.executor.simulate_random_delay().await;
+            Ok(this.0.lock().display_sources.clone())
+        }
+    }
+
+    pub fn publish_video_track(
+        self: &Arc<Self>,
+        track: LocalVideoTrack,
+    ) -> impl Future<Output = Result<LocalTrackPublication>> {
+        let this = self.clone();
+        let track = track.clone();
+        async move {
+            this.test_server()
+                .publish_video_track(this.token(), track)
+                .await?;
+            Ok(LocalTrackPublication)
+        }
+    }
+    pub fn publish_audio_track(
+        self: &Arc<Self>,
+        track: LocalAudioTrack,
+    ) -> impl Future<Output = Result<LocalTrackPublication>> {
+        let this = self.clone();
+        let track = track.clone();
+        async move {
+            this.test_server()
+                .publish_audio_track(this.token(), &track)
+                .await?;
+            Ok(LocalTrackPublication)
+        }
+    }
+
+    pub fn unpublish_track(&self, _publication: LocalTrackPublication) {}
+
+    pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteAudioTrack>> {
+        if !self.is_connected() {
+            return Vec::new();
+        }
+
+        self.test_server()
+            .audio_tracks(self.token())
+            .unwrap()
+            .into_iter()
+            .filter(|track| track.publisher_id() == publisher_id)
+            .collect()
+    }
+
+    pub fn remote_audio_track_publications(
+        &self,
+        publisher_id: &str,
+    ) -> Vec<Arc<RemoteTrackPublication>> {
+        if !self.is_connected() {
+            return Vec::new();
+        }
+
+        self.test_server()
+            .audio_tracks(self.token())
+            .unwrap()
+            .into_iter()
+            .filter(|track| track.publisher_id() == publisher_id)
+            .map(|_track| Arc::new(RemoteTrackPublication {}))
+            .collect()
+    }
+
+    pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
+        if !self.is_connected() {
+            return Vec::new();
+        }
+
+        self.test_server()
+            .video_tracks(self.token())
+            .unwrap()
+            .into_iter()
+            .filter(|track| track.publisher_id() == publisher_id)
+            .collect()
+    }
+
+    pub fn remote_audio_track_updates(&self) -> impl Stream<Item = RemoteAudioTrackUpdate> {
+        self.0.lock().audio_track_updates.1.clone()
+    }
+
+    pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
+        self.0.lock().video_track_updates.1.clone()
+    }
+
+    pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
+        self.0.lock().display_sources = sources;
+    }
+
+    fn test_server(&self) -> Arc<TestServer> {
+        match self.0.lock().connection.1.borrow().clone() {
+            ConnectionState::Disconnected => panic!("must be connected to call this method"),
+            ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
+        }
+    }
+
+    fn token(&self) -> String {
+        match self.0.lock().connection.1.borrow().clone() {
+            ConnectionState::Disconnected => panic!("must be connected to call this method"),
+            ConnectionState::Connected { token, .. } => token,
+        }
+    }
+
+    fn is_connected(&self) -> bool {
+        match *self.0.lock().connection.1.borrow() {
+            ConnectionState::Disconnected => false,
+            ConnectionState::Connected { .. } => true,
+        }
+    }
+}
+
+impl Drop for Room {
+    fn drop(&mut self) {
+        if let ConnectionState::Connected { token, .. } = mem::replace(
+            &mut *self.0.lock().connection.0.borrow_mut(),
+            ConnectionState::Disconnected,
+        ) {
+            if let Ok(server) = TestServer::get(&token) {
+                let executor = server.executor.clone();
+                executor
+                    .spawn(async move { server.leave_room(token).await.unwrap() })
+                    .detach();
+            }
+        }
+    }
+}
+
+pub struct LocalTrackPublication;
+
+impl LocalTrackPublication {
+    pub fn set_mute(&self, _mute: bool) -> impl Future<Output = Result<()>> {
+        async { Ok(()) }
+    }
+}
+
+pub struct RemoteTrackPublication;
+
+impl RemoteTrackPublication {
+    pub fn set_enabled(&self, _enabled: bool) -> impl Future<Output = Result<()>> {
+        async { Ok(()) }
+    }
+
+    pub fn is_muted(&self) -> bool {
+        false
+    }
+
+    pub fn sid(&self) -> String {
+        "".to_string()
+    }
+}
+
+#[derive(Clone)]
+pub struct LocalVideoTrack {
+    frames_rx: async_broadcast::Receiver<Frame>,
+}
+
+impl LocalVideoTrack {
+    pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
+        Self {
+            frames_rx: display.frames.1.clone(),
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct LocalAudioTrack;
+
+impl LocalAudioTrack {
+    pub fn create() -> Self {
+        Self
+    }
+}
+
+pub struct RemoteVideoTrack {
+    sid: Sid,
+    publisher_id: Sid,
+    frames_rx: async_broadcast::Receiver<Frame>,
+}
+
+impl RemoteVideoTrack {
+    pub fn sid(&self) -> &str {
+        &self.sid
+    }
+
+    pub fn publisher_id(&self) -> &str {
+        &self.publisher_id
+    }
+
+    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
+        self.frames_rx.clone()
+    }
+}
+
+#[derive(Debug)]
+pub struct RemoteAudioTrack {
+    sid: Sid,
+    publisher_id: Sid,
+}
+
+impl RemoteAudioTrack {
+    pub fn sid(&self) -> &str {
+        &self.sid
+    }
+
+    pub fn publisher_id(&self) -> &str {
+        &self.publisher_id
+    }
+
+    pub fn enable(&self) -> impl Future<Output = Result<()>> {
+        async { Ok(()) }
+    }
+
+    pub fn disable(&self) -> impl Future<Output = Result<()>> {
+        async { Ok(()) }
+    }
+}
+
+#[derive(Clone)]
+pub enum RemoteVideoTrackUpdate {
+    Subscribed(Arc<RemoteVideoTrack>),
+    Unsubscribed { publisher_id: Sid, track_id: Sid },
+}
+
+#[derive(Clone)]
+pub enum RemoteAudioTrackUpdate {
+    ActiveSpeakersChanged { speakers: Vec<Sid> },
+    MuteChanged { track_id: Sid, muted: bool },
+    Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
+    Unsubscribed { publisher_id: Sid, track_id: Sid },
+}
+
+#[derive(Clone)]
+pub struct MacOSDisplay {
+    frames: (
+        async_broadcast::Sender<Frame>,
+        async_broadcast::Receiver<Frame>,
+    ),
+}
+
+impl MacOSDisplay {
+    pub fn new() -> Self {
+        Self {
+            frames: async_broadcast::broadcast(128),
+        }
+    }
+
+    pub fn send_frame(&self, frame: Frame) {
+        self.frames.0.try_broadcast(frame).unwrap();
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct Frame {
+    pub label: String,
+    pub width: usize,
+    pub height: usize,
+}
+
+impl Frame {
+    pub fn width(&self) -> usize {
+        self.width
+    }
+
+    pub fn height(&self) -> usize {
+        self.height
+    }
+
+    pub fn image(&self) -> CVImageBuffer {
+        unimplemented!("you can't call this in test mode")
+    }
+}