mod.rs

  1pub mod diagnostics;
  2pub mod participant;
  3pub mod room;
  4
  5use anyhow::{Context as _, Result, anyhow};
  6use audio::Audio;
  7use client::{ChannelId, Client, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE, proto};
  8use collections::HashSet;
  9use futures::{Future, FutureExt, channel::oneshot, future::Shared};
 10use gpui::{
 11    AnyView, App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task,
 12    WeakEntity, Window,
 13};
 14use postage::watch;
 15use project::Project;
 16use room::Event;
 17use settings::Settings;
 18use std::sync::Arc;
 19use workspace::{
 20    ActiveCallEvent, AnyActiveCall, GlobalAnyActiveCall, Pane, RemoteCollaborator, SharedScreen,
 21    Workspace,
 22};
 23
 24pub use livekit_client::{RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent};
 25pub use room::Room;
 26
 27use crate::call_settings::CallSettings;
 28
 29pub fn init(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
 30    let active_call = cx.new(|cx| ActiveCall::new(client, user_store, cx));
 31    cx.set_global(GlobalAnyActiveCall(Arc::new(ActiveCallEntity(active_call))))
 32}
 33
 34#[derive(Clone)]
 35struct ActiveCallEntity(Entity<ActiveCall>);
 36
 37impl AnyActiveCall for ActiveCallEntity {
 38    fn entity(&self) -> gpui::AnyEntity {
 39        self.0.clone().into_any()
 40    }
 41
 42    fn is_in_room(&self, cx: &App) -> bool {
 43        self.0.read(cx).room().is_some()
 44    }
 45
 46    fn room_id(&self, cx: &App) -> Option<u64> {
 47        Some(self.0.read(cx).room()?.read(cx).id())
 48    }
 49
 50    fn channel_id(&self, cx: &App) -> Option<ChannelId> {
 51        self.0.read(cx).room()?.read(cx).channel_id()
 52    }
 53
 54    fn hang_up(&self, cx: &mut App) -> Task<Result<()>> {
 55        self.0.update(cx, |this, cx| this.hang_up(cx))
 56    }
 57
 58    fn unshare_project(&self, project: Entity<Project>, cx: &mut App) -> Result<()> {
 59        self.0
 60            .update(cx, |this, cx| this.unshare_project(project, cx))
 61    }
 62
 63    fn remote_participant_for_peer_id(
 64        &self,
 65        peer_id: proto::PeerId,
 66        cx: &App,
 67    ) -> Option<workspace::RemoteCollaborator> {
 68        let room = self.0.read(cx).room()?.read(cx);
 69        let participant = room.remote_participant_for_peer_id(peer_id)?;
 70        Some(RemoteCollaborator {
 71            user: participant.user.clone(),
 72            peer_id: participant.peer_id,
 73            location: participant.location,
 74            participant_index: participant.participant_index,
 75        })
 76    }
 77
 78    fn is_sharing_project(&self, cx: &App) -> bool {
 79        self.0
 80            .read(cx)
 81            .room()
 82            .map_or(false, |room| room.read(cx).is_sharing_project())
 83    }
 84
 85    fn has_remote_participants(&self, cx: &App) -> bool {
 86        self.0.read(cx).room().map_or(false, |room| {
 87            !room.read(cx).remote_participants().is_empty()
 88        })
 89    }
 90
 91    fn local_participant_is_guest(&self, cx: &App) -> bool {
 92        self.0
 93            .read(cx)
 94            .room()
 95            .map_or(false, |room| room.read(cx).local_participant_is_guest())
 96    }
 97
 98    fn client(&self, cx: &App) -> Arc<Client> {
 99        self.0.read(cx).client()
100    }
101
102    fn share_on_join(&self, cx: &App) -> bool {
103        CallSettings::get_global(cx).share_on_join
104    }
105
106    fn join_channel(&self, channel_id: ChannelId, cx: &mut App) -> Task<Result<bool>> {
107        let task = self
108            .0
109            .update(cx, |this, cx| this.join_channel(channel_id, cx));
110        cx.spawn(async move |_cx| {
111            let result = task.await?;
112            Ok(result.is_some())
113        })
114    }
115
116    fn room_update_completed(&self, cx: &mut App) -> Task<()> {
117        let Some(room) = self.0.read(cx).room().cloned() else {
118            return Task::ready(());
119        };
120        let future = room.update(cx, |room, _cx| room.room_update_completed());
121        cx.spawn(async move |_cx| {
122            future.await;
123        })
124    }
125
126    fn most_active_project(&self, cx: &App) -> Option<(u64, u64)> {
127        let room = self.0.read(cx).room()?;
128        room.read(cx).most_active_project(cx)
129    }
130
131    fn share_project(&self, project: Entity<Project>, cx: &mut App) -> Task<Result<u64>> {
132        self.0
133            .update(cx, |this, cx| this.share_project(project, cx))
134    }
135
136    fn join_project(
137        &self,
138        project_id: u64,
139        language_registry: Arc<language::LanguageRegistry>,
140        fs: Arc<dyn fs::Fs>,
141        cx: &mut App,
142    ) -> Task<Result<Entity<Project>>> {
143        let Some(room) = self.0.read(cx).room().cloned() else {
144            return Task::ready(Err(anyhow::anyhow!("not in a call")));
145        };
146        room.update(cx, |room, cx| {
147            room.join_project(project_id, language_registry, fs, cx)
148        })
149    }
150
151    fn peer_id_for_user_in_room(&self, user_id: u64, cx: &App) -> Option<proto::PeerId> {
152        let room = self.0.read(cx).room()?.read(cx);
153        room.remote_participants()
154            .values()
155            .find(|p| p.user.id == user_id)
156            .map(|p| p.peer_id)
157    }
158
159    fn subscribe(
160        &self,
161        window: &mut Window,
162        cx: &mut Context<Workspace>,
163        handler: Box<
164            dyn Fn(&mut Workspace, &ActiveCallEvent, &mut Window, &mut Context<Workspace>),
165        >,
166    ) -> Subscription {
167        cx.subscribe_in(
168            &self.0,
169            window,
170            move |workspace, _, event: &room::Event, window, cx| {
171                let mapped = match event {
172                    room::Event::ParticipantLocationChanged { participant_id } => {
173                        Some(ActiveCallEvent::ParticipantLocationChanged {
174                            participant_id: *participant_id,
175                        })
176                    }
177                    room::Event::RemoteVideoTracksChanged { participant_id } => {
178                        Some(ActiveCallEvent::RemoteVideoTracksChanged {
179                            participant_id: *participant_id,
180                        })
181                    }
182                    _ => None,
183                };
184                if let Some(event) = mapped {
185                    handler(workspace, &event, window, cx);
186                }
187            },
188        )
189    }
190
191    fn create_shared_screen(
192        &self,
193        peer_id: client::proto::PeerId,
194        pane: &Entity<Pane>,
195        window: &mut Window,
196        cx: &mut App,
197    ) -> Option<Entity<workspace::SharedScreen>> {
198        let room = self.0.read(cx).room()?.clone();
199        let participant = room.read(cx).remote_participant_for_peer_id(peer_id)?;
200        let track = participant.video_tracks.values().next()?.clone();
201        let user = participant.user.clone();
202
203        for item in pane.read(cx).items_of_type::<SharedScreen>() {
204            if item.read(cx).peer_id == peer_id {
205                return Some(item);
206            }
207        }
208
209        Some(cx.new(|cx: &mut Context<SharedScreen>| {
210            let my_sid = track.sid();
211            cx.subscribe(
212                &room,
213                move |_: &mut SharedScreen,
214                      _: Entity<Room>,
215                      ev: &room::Event,
216                      cx: &mut Context<SharedScreen>| {
217                    if let room::Event::RemoteVideoTrackUnsubscribed { sid } = ev
218                        && *sid == my_sid
219                    {
220                        cx.emit(workspace::shared_screen::Event::Close);
221                    }
222                },
223            )
224            .detach();
225
226            cx.observe_release(
227                &room,
228                |_: &mut SharedScreen, _: &mut Room, cx: &mut Context<SharedScreen>| {
229                    cx.emit(workspace::shared_screen::Event::Close);
230                },
231            )
232            .detach();
233
234            let view = cx.new(|cx| RemoteVideoTrackView::new(track.clone(), window, cx));
235            cx.subscribe(
236                &view,
237                |_: &mut SharedScreen,
238                 _: Entity<RemoteVideoTrackView>,
239                 ev: &RemoteVideoTrackViewEvent,
240                 cx: &mut Context<SharedScreen>| match ev {
241                    RemoteVideoTrackViewEvent::Close => {
242                        cx.emit(workspace::shared_screen::Event::Close);
243                    }
244                },
245            )
246            .detach();
247
248            pub(super) fn clone_remote_video_track_view(
249                view: &AnyView,
250                window: &mut Window,
251                cx: &mut App,
252            ) -> AnyView {
253                let view = view
254                    .clone()
255                    .downcast::<RemoteVideoTrackView>()
256                    .expect("SharedScreen view must be a RemoteVideoTrackView");
257                let cloned = view.update(cx, |view, cx| view.clone(window, cx));
258                AnyView::from(cloned)
259            }
260
261            SharedScreen::new(
262                peer_id,
263                user,
264                AnyView::from(view),
265                clone_remote_video_track_view,
266                cx,
267            )
268        }))
269    }
270}
271
272pub struct OneAtATime {
273    cancel: Option<oneshot::Sender<()>>,
274}
275
276impl OneAtATime {
277    /// spawn a task in the given context.
278    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
279    /// otherwise you'll see the result of the task.
280    fn spawn<F, Fut, R>(&mut self, cx: &mut App, f: F) -> Task<Result<Option<R>>>
281    where
282        F: 'static + FnOnce(AsyncApp) -> Fut,
283        Fut: Future<Output = Result<R>>,
284        R: 'static,
285    {
286        let (tx, rx) = oneshot::channel();
287        self.cancel.replace(tx);
288        cx.spawn(async move |cx| {
289            futures::select_biased! {
290                _ = rx.fuse() => Ok(None),
291                result = f(cx.clone()).fuse() => result.map(Some),
292            }
293        })
294    }
295
296    fn running(&self) -> bool {
297        self.cancel
298            .as_ref()
299            .is_some_and(|cancel| !cancel.is_canceled())
300    }
301}
302
303#[derive(Clone)]
304pub struct IncomingCall {
305    pub room_id: u64,
306    pub calling_user: Arc<User>,
307    pub participants: Vec<Arc<User>>,
308    pub initial_project: Option<proto::ParticipantProject>,
309}
310
311/// Singleton global maintaining the user's participation in a room across workspaces.
312pub struct ActiveCall {
313    room: Option<(Entity<Room>, Vec<Subscription>)>,
314    pending_room_creation: Option<Shared<Task<Result<Entity<Room>, Arc<anyhow::Error>>>>>,
315    location: Option<WeakEntity<Project>>,
316    _join_debouncer: OneAtATime,
317    pending_invites: HashSet<u64>,
318    incoming_call: (
319        watch::Sender<Option<IncomingCall>>,
320        watch::Receiver<Option<IncomingCall>>,
321    ),
322    client: Arc<Client>,
323    user_store: Entity<UserStore>,
324    _subscriptions: Vec<client::Subscription>,
325}
326
327impl EventEmitter<Event> for ActiveCall {}
328
329impl ActiveCall {
330    fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
331        Self {
332            room: None,
333            pending_room_creation: None,
334            location: None,
335            pending_invites: Default::default(),
336            incoming_call: watch::channel(),
337            _join_debouncer: OneAtATime { cancel: None },
338            _subscriptions: vec![
339                client.add_request_handler(cx.weak_entity(), Self::handle_incoming_call),
340                client.add_message_handler(cx.weak_entity(), Self::handle_call_canceled),
341            ],
342            client,
343            user_store,
344        }
345    }
346
347    pub fn channel_id(&self, cx: &App) -> Option<ChannelId> {
348        self.room()?.read(cx).channel_id()
349    }
350
351    async fn handle_incoming_call(
352        this: Entity<Self>,
353        envelope: TypedEnvelope<proto::IncomingCall>,
354        mut cx: AsyncApp,
355    ) -> Result<proto::Ack> {
356        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
357        let call = IncomingCall {
358            room_id: envelope.payload.room_id,
359            participants: user_store
360                .update(&mut cx, |user_store, cx| {
361                    user_store.get_users(envelope.payload.participant_user_ids, cx)
362                })
363                .await?,
364            calling_user: user_store
365                .update(&mut cx, |user_store, cx| {
366                    user_store.get_user(envelope.payload.calling_user_id, cx)
367                })
368                .await?,
369            initial_project: envelope.payload.initial_project,
370        };
371        this.update(&mut cx, |this, _| {
372            *this.incoming_call.0.borrow_mut() = Some(call);
373        });
374
375        Ok(proto::Ack {})
376    }
377
378    async fn handle_call_canceled(
379        this: Entity<Self>,
380        envelope: TypedEnvelope<proto::CallCanceled>,
381        mut cx: AsyncApp,
382    ) -> Result<()> {
383        this.update(&mut cx, |this, _| {
384            let mut incoming_call = this.incoming_call.0.borrow_mut();
385            if incoming_call
386                .as_ref()
387                .is_some_and(|call| call.room_id == envelope.payload.room_id)
388            {
389                incoming_call.take();
390            }
391        });
392        Ok(())
393    }
394
395    pub fn global(cx: &App) -> Entity<Self> {
396        Self::try_global(cx).unwrap()
397    }
398
399    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
400        let any = cx.try_global::<GlobalAnyActiveCall>()?;
401        any.0.entity().downcast::<Self>().ok()
402    }
403
404    pub fn invite(
405        &mut self,
406        called_user_id: u64,
407        initial_project: Option<Entity<Project>>,
408        cx: &mut Context<Self>,
409    ) -> Task<Result<()>> {
410        if !self.pending_invites.insert(called_user_id) {
411            return Task::ready(Err(anyhow!("user was already invited")));
412        }
413        cx.notify();
414
415        if self._join_debouncer.running() {
416            return Task::ready(Ok(()));
417        }
418
419        let room = if let Some(room) = self.room().cloned() {
420            Some(Task::ready(Ok(room)).shared())
421        } else {
422            self.pending_room_creation.clone()
423        };
424
425        let invite = if let Some(room) = room {
426            cx.spawn(async move |_, cx| {
427                let room = room.await.map_err(|err| anyhow!("{err:?}"))?;
428
429                let initial_project_id = if let Some(initial_project) = initial_project {
430                    Some(
431                        room.update(cx, |room, cx| room.share_project(initial_project, cx))
432                            .await?,
433                    )
434                } else {
435                    None
436                };
437
438                room.update(cx, move |room, cx| {
439                    room.call(called_user_id, initial_project_id, cx)
440                })
441                .await?;
442
443                anyhow::Ok(())
444            })
445        } else {
446            let client = self.client.clone();
447            let user_store = self.user_store.clone();
448            let room = cx
449                .spawn(async move |this, cx| {
450                    let create_room = async {
451                        let room = cx
452                            .update(|cx| {
453                                Room::create(
454                                    called_user_id,
455                                    initial_project,
456                                    client,
457                                    user_store,
458                                    cx,
459                                )
460                            })
461                            .await?;
462
463                        this.update(cx, |this, cx| this.set_room(Some(room.clone()), cx))?
464                            .await?;
465
466                        anyhow::Ok(room)
467                    };
468
469                    let room = create_room.await;
470                    this.update(cx, |this, _| this.pending_room_creation = None)?;
471                    room.map_err(Arc::new)
472                })
473                .shared();
474            self.pending_room_creation = Some(room.clone());
475            cx.background_spawn(async move {
476                room.await.map_err(|err| anyhow!("{err:?}"))?;
477                anyhow::Ok(())
478            })
479        };
480
481        cx.spawn(async move |this, cx| {
482            let result = invite.await;
483            if result.is_ok() {
484                this.update(cx, |this, cx| {
485                    this.report_call_event("Participant Invited", cx)
486                })?;
487            } else {
488                //TODO: report collaboration error
489                log::error!("invite failed: {:?}", result);
490            }
491
492            this.update(cx, |this, cx| {
493                this.pending_invites.remove(&called_user_id);
494                cx.notify();
495            })?;
496            result
497        })
498    }
499
500    pub fn cancel_invite(
501        &mut self,
502        called_user_id: u64,
503        cx: &mut Context<Self>,
504    ) -> Task<Result<()>> {
505        let room_id = if let Some(room) = self.room() {
506            room.read(cx).id()
507        } else {
508            return Task::ready(Err(anyhow!("no active call")));
509        };
510
511        let client = self.client.clone();
512        cx.background_spawn(async move {
513            client
514                .request(proto::CancelCall {
515                    room_id,
516                    called_user_id,
517                })
518                .await?;
519            anyhow::Ok(())
520        })
521    }
522
523    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
524        self.incoming_call.1.clone()
525    }
526
527    pub fn accept_incoming(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
528        if self.room.is_some() {
529            return Task::ready(Err(anyhow!("cannot join while on another call")));
530        }
531
532        let call = if let Some(call) = self.incoming_call.0.borrow_mut().take() {
533            call
534        } else {
535            return Task::ready(Err(anyhow!("no incoming call")));
536        };
537
538        if self.pending_room_creation.is_some() {
539            return Task::ready(Ok(()));
540        }
541
542        let room_id = call.room_id;
543        let client = self.client.clone();
544        let user_store = self.user_store.clone();
545        let join = self
546            ._join_debouncer
547            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
548
549        cx.spawn(async move |this, cx| {
550            let room = join.await?;
551            this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
552                .await?;
553            this.update(cx, |this, cx| {
554                this.report_call_event("Incoming Call Accepted", cx)
555            })?;
556            Ok(())
557        })
558    }
559
560    pub fn decline_incoming(&mut self, _: &mut Context<Self>) -> Result<()> {
561        let call = self
562            .incoming_call
563            .0
564            .borrow_mut()
565            .take()
566            .context("no incoming call")?;
567        telemetry::event!("Incoming Call Declined", room_id = call.room_id);
568        self.client.send(proto::DeclineCall {
569            room_id: call.room_id,
570        })?;
571        Ok(())
572    }
573
574    pub fn join_channel(
575        &mut self,
576        channel_id: ChannelId,
577        cx: &mut Context<Self>,
578    ) -> Task<Result<Option<Entity<Room>>>> {
579        if let Some(room) = self.room().cloned() {
580            if room.read(cx).channel_id() == Some(channel_id) {
581                return Task::ready(Ok(Some(room)));
582            } else {
583                room.update(cx, |room, cx| room.clear_state(cx));
584            }
585        }
586
587        if self.pending_room_creation.is_some() {
588            return Task::ready(Ok(None));
589        }
590
591        let client = self.client.clone();
592        let user_store = self.user_store.clone();
593        let join = self._join_debouncer.spawn(cx, move |cx| async move {
594            Room::join_channel(channel_id, client, user_store, cx).await
595        });
596
597        cx.spawn(async move |this, cx| {
598            let room = join.await?;
599            this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
600                .await?;
601            this.update(cx, |this, cx| this.report_call_event("Channel Joined", cx))?;
602            Ok(room)
603        })
604    }
605
606    pub fn hang_up(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
607        cx.notify();
608        self.report_call_event("Call Ended", cx);
609
610        Audio::end_call(cx);
611
612        let channel_id = self.channel_id(cx);
613        if let Some((room, _)) = self.room.take() {
614            cx.emit(Event::RoomLeft { channel_id });
615            room.update(cx, |room, cx| room.leave(cx))
616        } else {
617            Task::ready(Ok(()))
618        }
619    }
620
621    pub fn share_project(
622        &mut self,
623        project: Entity<Project>,
624        cx: &mut Context<Self>,
625    ) -> Task<Result<u64>> {
626        if let Some((room, _)) = self.room.as_ref() {
627            self.report_call_event("Project Shared", cx);
628            room.update(cx, |room, cx| room.share_project(project, cx))
629        } else {
630            Task::ready(Err(anyhow!("no active call")))
631        }
632    }
633
634    pub fn unshare_project(
635        &mut self,
636        project: Entity<Project>,
637        cx: &mut Context<Self>,
638    ) -> Result<()> {
639        let (room, _) = self.room.as_ref().context("no active call")?;
640        self.report_call_event("Project Unshared", cx);
641        room.update(cx, |room, cx| room.unshare_project(project, cx))
642    }
643
644    pub fn location(&self) -> Option<&WeakEntity<Project>> {
645        self.location.as_ref()
646    }
647
648    pub fn set_location(
649        &mut self,
650        project: Option<&Entity<Project>>,
651        cx: &mut Context<Self>,
652    ) -> Task<Result<()>> {
653        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
654            self.location = project.map(|project| project.downgrade());
655            if let Some((room, _)) = self.room.as_ref() {
656                return room.update(cx, |room, cx| room.set_location(project, cx));
657            }
658        }
659        Task::ready(Ok(()))
660    }
661
662    fn set_room(&mut self, room: Option<Entity<Room>>, cx: &mut Context<Self>) -> Task<Result<()>> {
663        if room.as_ref() == self.room.as_ref().map(|room| &room.0) {
664            Task::ready(Ok(()))
665        } else {
666            cx.notify();
667            if let Some(room) = room {
668                if room.read(cx).status().is_offline() {
669                    self.room = None;
670                    Task::ready(Ok(()))
671                } else {
672                    let subscriptions = vec![
673                        cx.observe(&room, |this, room, cx| {
674                            if room.read(cx).status().is_offline() {
675                                this.set_room(None, cx).detach_and_log_err(cx);
676                            }
677
678                            cx.notify();
679                        }),
680                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
681                    ];
682                    self.room = Some((room.clone(), subscriptions));
683                    let location = self
684                        .location
685                        .as_ref()
686                        .and_then(|location| location.upgrade());
687                    let channel_id = room.read(cx).channel_id();
688                    cx.emit(Event::RoomJoined { channel_id });
689                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
690                }
691            } else {
692                self.room = None;
693                Task::ready(Ok(()))
694            }
695        }
696    }
697
698    pub fn room(&self) -> Option<&Entity<Room>> {
699        self.room.as_ref().map(|(room, _)| room)
700    }
701
702    pub fn client(&self) -> Arc<Client> {
703        self.client.clone()
704    }
705
706    pub fn pending_invites(&self) -> &HashSet<u64> {
707        &self.pending_invites
708    }
709
710    pub fn report_call_event(&self, operation: &'static str, cx: &mut App) {
711        if let Some(room) = self.room() {
712            let room = room.read(cx);
713            telemetry::event!(
714                operation,
715                room_id = room.id(),
716                channel_id = room.channel_id()
717            )
718        }
719    }
720}
721
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Covers the three outcomes of `OneAtATime::spawn`: a lone task
    /// completes, a superseded task yields `None`, and dropping the
    /// `OneAtATime` cancels the task it spawned last.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut debouncer = OneAtATime { cancel: None };

        // A lone task runs to completion.
        assert_eq!(
            cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(1) }))
                .await
                .unwrap(),
            Some(1)
        );

        // Spawning a second task cancels the first before its body runs, so
        // the panic is never reached.
        let (superseded, latest) = cx.update(|cx| {
            (
                debouncer.spawn(cx, |_| async {
                    panic!("");
                }),
                debouncer.spawn(cx, |_| async { Ok(3) }),
            )
        });

        assert_eq!(superseded.await.unwrap(), None::<u32>);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the debouncer cancels the task it spawned last.
        let orphaned = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(4) }));
        drop(debouncer);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}