mod.rs

  1pub mod participant;
  2pub mod room;
  3
  4use anyhow::{Context as _, Result, anyhow};
  5use audio::Audio;
  6use client::{ChannelId, Client, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE, proto};
  7use collections::HashSet;
  8use futures::{Future, FutureExt, channel::oneshot, future::Shared};
  9use gpui::{
 10    AnyView, App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task,
 11    WeakEntity, Window,
 12};
 13use postage::watch;
 14use project::Project;
 15use room::Event;
 16use settings::Settings;
 17use std::sync::Arc;
 18use workspace::{
 19    ActiveCallEvent, AnyActiveCall, GlobalAnyActiveCall, Pane, RemoteCollaborator, SharedScreen,
 20    Workspace,
 21};
 22
 23pub use livekit_client::{RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent};
 24pub use room::Room;
 25
 26use crate::call_settings::CallSettings;
 27
 28pub fn init(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
 29    let active_call = cx.new(|cx| ActiveCall::new(client, user_store, cx));
 30    cx.set_global(GlobalAnyActiveCall(Arc::new(ActiveCallEntity(active_call))))
 31}
 32
 33#[derive(Clone)]
 34struct ActiveCallEntity(Entity<ActiveCall>);
 35
 36impl AnyActiveCall for ActiveCallEntity {
 37    fn entity(&self) -> gpui::AnyEntity {
 38        self.0.clone().into_any()
 39    }
 40
 41    fn is_in_room(&self, cx: &App) -> bool {
 42        self.0.read(cx).room().is_some()
 43    }
 44
 45    fn room_id(&self, cx: &App) -> Option<u64> {
 46        Some(self.0.read(cx).room()?.read(cx).id())
 47    }
 48
 49    fn channel_id(&self, cx: &App) -> Option<ChannelId> {
 50        self.0.read(cx).room()?.read(cx).channel_id()
 51    }
 52
 53    fn hang_up(&self, cx: &mut App) -> Task<Result<()>> {
 54        self.0.update(cx, |this, cx| this.hang_up(cx))
 55    }
 56
 57    fn unshare_project(&self, project: Entity<Project>, cx: &mut App) -> Result<()> {
 58        self.0
 59            .update(cx, |this, cx| this.unshare_project(project, cx))
 60    }
 61
 62    fn remote_participant_for_peer_id(
 63        &self,
 64        peer_id: proto::PeerId,
 65        cx: &App,
 66    ) -> Option<workspace::RemoteCollaborator> {
 67        let room = self.0.read(cx).room()?.read(cx);
 68        let participant = room.remote_participant_for_peer_id(peer_id)?;
 69        Some(RemoteCollaborator {
 70            user: participant.user.clone(),
 71            peer_id: participant.peer_id,
 72            location: participant.location,
 73            participant_index: participant.participant_index,
 74        })
 75    }
 76
 77    fn is_sharing_project(&self, cx: &App) -> bool {
 78        self.0
 79            .read(cx)
 80            .room()
 81            .map_or(false, |room| room.read(cx).is_sharing_project())
 82    }
 83
 84    fn has_remote_participants(&self, cx: &App) -> bool {
 85        self.0.read(cx).room().map_or(false, |room| {
 86            !room.read(cx).remote_participants().is_empty()
 87        })
 88    }
 89
 90    fn local_participant_is_guest(&self, cx: &App) -> bool {
 91        self.0
 92            .read(cx)
 93            .room()
 94            .map_or(false, |room| room.read(cx).local_participant_is_guest())
 95    }
 96
 97    fn client(&self, cx: &App) -> Arc<Client> {
 98        self.0.read(cx).client()
 99    }
100
101    fn share_on_join(&self, cx: &App) -> bool {
102        CallSettings::get_global(cx).share_on_join
103    }
104
105    fn join_channel(&self, channel_id: ChannelId, cx: &mut App) -> Task<Result<bool>> {
106        let task = self
107            .0
108            .update(cx, |this, cx| this.join_channel(channel_id, cx));
109        cx.spawn(async move |_cx| {
110            let result = task.await?;
111            Ok(result.is_some())
112        })
113    }
114
115    fn room_update_completed(&self, cx: &mut App) -> Task<()> {
116        let Some(room) = self.0.read(cx).room().cloned() else {
117            return Task::ready(());
118        };
119        let future = room.update(cx, |room, _cx| room.room_update_completed());
120        cx.spawn(async move |_cx| {
121            future.await;
122        })
123    }
124
125    fn most_active_project(&self, cx: &App) -> Option<(u64, u64)> {
126        let room = self.0.read(cx).room()?;
127        room.read(cx).most_active_project(cx)
128    }
129
130    fn share_project(&self, project: Entity<Project>, cx: &mut App) -> Task<Result<u64>> {
131        self.0
132            .update(cx, |this, cx| this.share_project(project, cx))
133    }
134
135    fn join_project(
136        &self,
137        project_id: u64,
138        language_registry: Arc<language::LanguageRegistry>,
139        fs: Arc<dyn fs::Fs>,
140        cx: &mut App,
141    ) -> Task<Result<Entity<Project>>> {
142        let Some(room) = self.0.read(cx).room().cloned() else {
143            return Task::ready(Err(anyhow::anyhow!("not in a call")));
144        };
145        room.update(cx, |room, cx| {
146            room.join_project(project_id, language_registry, fs, cx)
147        })
148    }
149
150    fn peer_id_for_user_in_room(&self, user_id: u64, cx: &App) -> Option<proto::PeerId> {
151        let room = self.0.read(cx).room()?.read(cx);
152        room.remote_participants()
153            .values()
154            .find(|p| p.user.id == user_id)
155            .map(|p| p.peer_id)
156    }
157
158    fn subscribe(
159        &self,
160        window: &mut Window,
161        cx: &mut Context<Workspace>,
162        handler: Box<
163            dyn Fn(&mut Workspace, &ActiveCallEvent, &mut Window, &mut Context<Workspace>),
164        >,
165    ) -> Subscription {
166        cx.subscribe_in(
167            &self.0,
168            window,
169            move |workspace, _, event: &room::Event, window, cx| {
170                let mapped = match event {
171                    room::Event::ParticipantLocationChanged { participant_id } => {
172                        Some(ActiveCallEvent::ParticipantLocationChanged {
173                            participant_id: *participant_id,
174                        })
175                    }
176                    room::Event::RemoteVideoTracksChanged { participant_id } => {
177                        Some(ActiveCallEvent::RemoteVideoTracksChanged {
178                            participant_id: *participant_id,
179                        })
180                    }
181                    _ => None,
182                };
183                if let Some(event) = mapped {
184                    handler(workspace, &event, window, cx);
185                }
186            },
187        )
188    }
189
190    fn create_shared_screen(
191        &self,
192        peer_id: client::proto::PeerId,
193        pane: &Entity<Pane>,
194        window: &mut Window,
195        cx: &mut App,
196    ) -> Option<Entity<workspace::SharedScreen>> {
197        let room = self.0.read(cx).room()?.clone();
198        let participant = room.read(cx).remote_participant_for_peer_id(peer_id)?;
199        let track = participant.video_tracks.values().next()?.clone();
200        let user = participant.user.clone();
201
202        for item in pane.read(cx).items_of_type::<SharedScreen>() {
203            if item.read(cx).peer_id == peer_id {
204                return Some(item);
205            }
206        }
207
208        Some(cx.new(|cx: &mut Context<SharedScreen>| {
209            let my_sid = track.sid();
210            cx.subscribe(
211                &room,
212                move |_: &mut SharedScreen,
213                      _: Entity<Room>,
214                      ev: &room::Event,
215                      cx: &mut Context<SharedScreen>| {
216                    if let room::Event::RemoteVideoTrackUnsubscribed { sid } = ev
217                        && *sid == my_sid
218                    {
219                        cx.emit(workspace::shared_screen::Event::Close);
220                    }
221                },
222            )
223            .detach();
224
225            cx.observe_release(
226                &room,
227                |_: &mut SharedScreen, _: &mut Room, cx: &mut Context<SharedScreen>| {
228                    cx.emit(workspace::shared_screen::Event::Close);
229                },
230            )
231            .detach();
232
233            let view = cx.new(|cx| RemoteVideoTrackView::new(track.clone(), window, cx));
234            cx.subscribe(
235                &view,
236                |_: &mut SharedScreen,
237                 _: Entity<RemoteVideoTrackView>,
238                 ev: &RemoteVideoTrackViewEvent,
239                 cx: &mut Context<SharedScreen>| match ev {
240                    RemoteVideoTrackViewEvent::Close => {
241                        cx.emit(workspace::shared_screen::Event::Close);
242                    }
243                },
244            )
245            .detach();
246
247            pub(super) fn clone_remote_video_track_view(
248                view: &AnyView,
249                window: &mut Window,
250                cx: &mut App,
251            ) -> AnyView {
252                let view = view
253                    .clone()
254                    .downcast::<RemoteVideoTrackView>()
255                    .expect("SharedScreen view must be a RemoteVideoTrackView");
256                let cloned = view.update(cx, |view, cx| view.clone(window, cx));
257                AnyView::from(cloned)
258            }
259
260            SharedScreen::new(
261                peer_id,
262                user,
263                AnyView::from(view),
264                clone_remote_video_track_view,
265                cx,
266            )
267        }))
268    }
269}
270
271pub struct OneAtATime {
272    cancel: Option<oneshot::Sender<()>>,
273}
274
275impl OneAtATime {
276    /// spawn a task in the given context.
277    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
278    /// otherwise you'll see the result of the task.
279    fn spawn<F, Fut, R>(&mut self, cx: &mut App, f: F) -> Task<Result<Option<R>>>
280    where
281        F: 'static + FnOnce(AsyncApp) -> Fut,
282        Fut: Future<Output = Result<R>>,
283        R: 'static,
284    {
285        let (tx, rx) = oneshot::channel();
286        self.cancel.replace(tx);
287        cx.spawn(async move |cx| {
288            futures::select_biased! {
289                _ = rx.fuse() => Ok(None),
290                result = f(cx.clone()).fuse() => result.map(Some),
291            }
292        })
293    }
294
295    fn running(&self) -> bool {
296        self.cancel
297            .as_ref()
298            .is_some_and(|cancel| !cancel.is_canceled())
299    }
300}
301
302#[derive(Clone)]
303pub struct IncomingCall {
304    pub room_id: u64,
305    pub calling_user: Arc<User>,
306    pub participants: Vec<Arc<User>>,
307    pub initial_project: Option<proto::ParticipantProject>,
308}
309
310/// Singleton global maintaining the user's participation in a room across workspaces.
311pub struct ActiveCall {
312    room: Option<(Entity<Room>, Vec<Subscription>)>,
313    pending_room_creation: Option<Shared<Task<Result<Entity<Room>, Arc<anyhow::Error>>>>>,
314    location: Option<WeakEntity<Project>>,
315    _join_debouncer: OneAtATime,
316    pending_invites: HashSet<u64>,
317    incoming_call: (
318        watch::Sender<Option<IncomingCall>>,
319        watch::Receiver<Option<IncomingCall>>,
320    ),
321    client: Arc<Client>,
322    user_store: Entity<UserStore>,
323    _subscriptions: Vec<client::Subscription>,
324}
325
326impl EventEmitter<Event> for ActiveCall {}
327
328impl ActiveCall {
329    fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
330        Self {
331            room: None,
332            pending_room_creation: None,
333            location: None,
334            pending_invites: Default::default(),
335            incoming_call: watch::channel(),
336            _join_debouncer: OneAtATime { cancel: None },
337            _subscriptions: vec![
338                client.add_request_handler(cx.weak_entity(), Self::handle_incoming_call),
339                client.add_message_handler(cx.weak_entity(), Self::handle_call_canceled),
340            ],
341            client,
342            user_store,
343        }
344    }
345
346    pub fn channel_id(&self, cx: &App) -> Option<ChannelId> {
347        self.room()?.read(cx).channel_id()
348    }
349
350    async fn handle_incoming_call(
351        this: Entity<Self>,
352        envelope: TypedEnvelope<proto::IncomingCall>,
353        mut cx: AsyncApp,
354    ) -> Result<proto::Ack> {
355        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
356        let call = IncomingCall {
357            room_id: envelope.payload.room_id,
358            participants: user_store
359                .update(&mut cx, |user_store, cx| {
360                    user_store.get_users(envelope.payload.participant_user_ids, cx)
361                })
362                .await?,
363            calling_user: user_store
364                .update(&mut cx, |user_store, cx| {
365                    user_store.get_user(envelope.payload.calling_user_id, cx)
366                })
367                .await?,
368            initial_project: envelope.payload.initial_project,
369        };
370        this.update(&mut cx, |this, _| {
371            *this.incoming_call.0.borrow_mut() = Some(call);
372        });
373
374        Ok(proto::Ack {})
375    }
376
377    async fn handle_call_canceled(
378        this: Entity<Self>,
379        envelope: TypedEnvelope<proto::CallCanceled>,
380        mut cx: AsyncApp,
381    ) -> Result<()> {
382        this.update(&mut cx, |this, _| {
383            let mut incoming_call = this.incoming_call.0.borrow_mut();
384            if incoming_call
385                .as_ref()
386                .is_some_and(|call| call.room_id == envelope.payload.room_id)
387            {
388                incoming_call.take();
389            }
390        });
391        Ok(())
392    }
393
394    pub fn global(cx: &App) -> Entity<Self> {
395        Self::try_global(cx).unwrap()
396    }
397
398    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
399        let any = cx.try_global::<GlobalAnyActiveCall>()?;
400        any.0.entity().downcast::<Self>().ok()
401    }
402
403    pub fn invite(
404        &mut self,
405        called_user_id: u64,
406        initial_project: Option<Entity<Project>>,
407        cx: &mut Context<Self>,
408    ) -> Task<Result<()>> {
409        if !self.pending_invites.insert(called_user_id) {
410            return Task::ready(Err(anyhow!("user was already invited")));
411        }
412        cx.notify();
413
414        if self._join_debouncer.running() {
415            return Task::ready(Ok(()));
416        }
417
418        let room = if let Some(room) = self.room().cloned() {
419            Some(Task::ready(Ok(room)).shared())
420        } else {
421            self.pending_room_creation.clone()
422        };
423
424        let invite = if let Some(room) = room {
425            cx.spawn(async move |_, cx| {
426                let room = room.await.map_err(|err| anyhow!("{err:?}"))?;
427
428                let initial_project_id = if let Some(initial_project) = initial_project {
429                    Some(
430                        room.update(cx, |room, cx| room.share_project(initial_project, cx))
431                            .await?,
432                    )
433                } else {
434                    None
435                };
436
437                room.update(cx, move |room, cx| {
438                    room.call(called_user_id, initial_project_id, cx)
439                })
440                .await?;
441
442                anyhow::Ok(())
443            })
444        } else {
445            let client = self.client.clone();
446            let user_store = self.user_store.clone();
447            let room = cx
448                .spawn(async move |this, cx| {
449                    let create_room = async {
450                        let room = cx
451                            .update(|cx| {
452                                Room::create(
453                                    called_user_id,
454                                    initial_project,
455                                    client,
456                                    user_store,
457                                    cx,
458                                )
459                            })
460                            .await?;
461
462                        this.update(cx, |this, cx| this.set_room(Some(room.clone()), cx))?
463                            .await?;
464
465                        anyhow::Ok(room)
466                    };
467
468                    let room = create_room.await;
469                    this.update(cx, |this, _| this.pending_room_creation = None)?;
470                    room.map_err(Arc::new)
471                })
472                .shared();
473            self.pending_room_creation = Some(room.clone());
474            cx.background_spawn(async move {
475                room.await.map_err(|err| anyhow!("{err:?}"))?;
476                anyhow::Ok(())
477            })
478        };
479
480        cx.spawn(async move |this, cx| {
481            let result = invite.await;
482            if result.is_ok() {
483                this.update(cx, |this, cx| {
484                    this.report_call_event("Participant Invited", cx)
485                })?;
486            } else {
487                //TODO: report collaboration error
488                log::error!("invite failed: {:?}", result);
489            }
490
491            this.update(cx, |this, cx| {
492                this.pending_invites.remove(&called_user_id);
493                cx.notify();
494            })?;
495            result
496        })
497    }
498
499    pub fn cancel_invite(
500        &mut self,
501        called_user_id: u64,
502        cx: &mut Context<Self>,
503    ) -> Task<Result<()>> {
504        let room_id = if let Some(room) = self.room() {
505            room.read(cx).id()
506        } else {
507            return Task::ready(Err(anyhow!("no active call")));
508        };
509
510        let client = self.client.clone();
511        cx.background_spawn(async move {
512            client
513                .request(proto::CancelCall {
514                    room_id,
515                    called_user_id,
516                })
517                .await?;
518            anyhow::Ok(())
519        })
520    }
521
522    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
523        self.incoming_call.1.clone()
524    }
525
526    pub fn accept_incoming(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
527        if self.room.is_some() {
528            return Task::ready(Err(anyhow!("cannot join while on another call")));
529        }
530
531        let call = if let Some(call) = self.incoming_call.0.borrow_mut().take() {
532            call
533        } else {
534            return Task::ready(Err(anyhow!("no incoming call")));
535        };
536
537        if self.pending_room_creation.is_some() {
538            return Task::ready(Ok(()));
539        }
540
541        let room_id = call.room_id;
542        let client = self.client.clone();
543        let user_store = self.user_store.clone();
544        let join = self
545            ._join_debouncer
546            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
547
548        cx.spawn(async move |this, cx| {
549            let room = join.await?;
550            this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
551                .await?;
552            this.update(cx, |this, cx| {
553                this.report_call_event("Incoming Call Accepted", cx)
554            })?;
555            Ok(())
556        })
557    }
558
559    pub fn decline_incoming(&mut self, _: &mut Context<Self>) -> Result<()> {
560        let call = self
561            .incoming_call
562            .0
563            .borrow_mut()
564            .take()
565            .context("no incoming call")?;
566        telemetry::event!("Incoming Call Declined", room_id = call.room_id);
567        self.client.send(proto::DeclineCall {
568            room_id: call.room_id,
569        })?;
570        Ok(())
571    }
572
573    pub fn join_channel(
574        &mut self,
575        channel_id: ChannelId,
576        cx: &mut Context<Self>,
577    ) -> Task<Result<Option<Entity<Room>>>> {
578        if let Some(room) = self.room().cloned() {
579            if room.read(cx).channel_id() == Some(channel_id) {
580                return Task::ready(Ok(Some(room)));
581            } else {
582                room.update(cx, |room, cx| room.clear_state(cx));
583            }
584        }
585
586        if self.pending_room_creation.is_some() {
587            return Task::ready(Ok(None));
588        }
589
590        let client = self.client.clone();
591        let user_store = self.user_store.clone();
592        let join = self._join_debouncer.spawn(cx, move |cx| async move {
593            Room::join_channel(channel_id, client, user_store, cx).await
594        });
595
596        cx.spawn(async move |this, cx| {
597            let room = join.await?;
598            this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
599                .await?;
600            this.update(cx, |this, cx| this.report_call_event("Channel Joined", cx))?;
601            Ok(room)
602        })
603    }
604
605    pub fn hang_up(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
606        cx.notify();
607        self.report_call_event("Call Ended", cx);
608
609        Audio::end_call(cx);
610
611        let channel_id = self.channel_id(cx);
612        if let Some((room, _)) = self.room.take() {
613            cx.emit(Event::RoomLeft { channel_id });
614            room.update(cx, |room, cx| room.leave(cx))
615        } else {
616            Task::ready(Ok(()))
617        }
618    }
619
620    pub fn share_project(
621        &mut self,
622        project: Entity<Project>,
623        cx: &mut Context<Self>,
624    ) -> Task<Result<u64>> {
625        if let Some((room, _)) = self.room.as_ref() {
626            self.report_call_event("Project Shared", cx);
627            room.update(cx, |room, cx| room.share_project(project, cx))
628        } else {
629            Task::ready(Err(anyhow!("no active call")))
630        }
631    }
632
633    pub fn unshare_project(
634        &mut self,
635        project: Entity<Project>,
636        cx: &mut Context<Self>,
637    ) -> Result<()> {
638        let (room, _) = self.room.as_ref().context("no active call")?;
639        self.report_call_event("Project Unshared", cx);
640        room.update(cx, |room, cx| room.unshare_project(project, cx))
641    }
642
643    pub fn location(&self) -> Option<&WeakEntity<Project>> {
644        self.location.as_ref()
645    }
646
647    pub fn set_location(
648        &mut self,
649        project: Option<&Entity<Project>>,
650        cx: &mut Context<Self>,
651    ) -> Task<Result<()>> {
652        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
653            self.location = project.map(|project| project.downgrade());
654            if let Some((room, _)) = self.room.as_ref() {
655                return room.update(cx, |room, cx| room.set_location(project, cx));
656            }
657        }
658        Task::ready(Ok(()))
659    }
660
661    fn set_room(&mut self, room: Option<Entity<Room>>, cx: &mut Context<Self>) -> Task<Result<()>> {
662        if room.as_ref() == self.room.as_ref().map(|room| &room.0) {
663            Task::ready(Ok(()))
664        } else {
665            cx.notify();
666            if let Some(room) = room {
667                if room.read(cx).status().is_offline() {
668                    self.room = None;
669                    Task::ready(Ok(()))
670                } else {
671                    let subscriptions = vec![
672                        cx.observe(&room, |this, room, cx| {
673                            if room.read(cx).status().is_offline() {
674                                this.set_room(None, cx).detach_and_log_err(cx);
675                            }
676
677                            cx.notify();
678                        }),
679                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
680                    ];
681                    self.room = Some((room.clone(), subscriptions));
682                    let location = self
683                        .location
684                        .as_ref()
685                        .and_then(|location| location.upgrade());
686                    let channel_id = room.read(cx).channel_id();
687                    cx.emit(Event::RoomJoined { channel_id });
688                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
689                }
690            } else {
691                self.room = None;
692                Task::ready(Ok(()))
693            }
694        }
695    }
696
697    pub fn room(&self) -> Option<&Entity<Room>> {
698        self.room.as_ref().map(|(room, _)| room)
699    }
700
701    pub fn client(&self) -> Arc<Client> {
702        self.client.clone()
703    }
704
705    pub fn pending_invites(&self) -> &HashSet<u64> {
706        &self.pending_invites
707    }
708
709    pub fn report_call_event(&self, operation: &'static str, cx: &mut App) {
710        if let Some(room) = self.room() {
711            let room = room.read(cx);
712            telemetry::event!(
713                operation,
714                room_id = room.id(),
715                channel_id = room.channel_id()
716            )
717        }
718    }
719}
720
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Exercises the three `OneAtATime` outcomes: completion, supersession, and drop.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut debouncer = OneAtATime { cancel: None };

        // A lone task runs to completion and yields its value.
        let single = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(1) }));
        assert_eq!(single.await.unwrap(), Some(1));

        // Spawning a second task before the first resolves cancels the first, so its
        // panicking body never runs.
        let (superseded, latest) = cx.update(|cx| {
            let first = debouncer.spawn(cx, |_| async {
                panic!("");
            });
            let second = debouncer.spawn(cx, |_| async { Ok(3) });
            (first, second)
        });

        assert_eq!(superseded.await.unwrap(), None::<u32>);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the `OneAtATime` cancels the task that is still outstanding.
        let orphaned = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(4) }));
        drop(debouncer);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}