1pub mod call_settings;
2pub mod participant;
3pub mod room;
4mod shared_screen;
5
6use anyhow::{anyhow, Result};
7use async_trait::async_trait;
8use audio::Audio;
9use call_settings::CallSettings;
10use client::{
11 proto::{self, PeerId},
12 Client, TelemetrySettings, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE,
13};
14use collections::HashSet;
15use futures::{channel::oneshot, future::Shared, Future, FutureExt};
16use gpui::{
17 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, PromptLevel,
18 Subscription, Task, View, ViewContext, VisualContext, WeakModel, WeakView, WindowHandle,
19};
20pub use participant::ParticipantLocation;
21use postage::watch;
22use project::Project;
23use room::Event;
24pub use room::Room;
25use settings::Settings;
26use shared_screen::SharedScreen;
27use std::sync::Arc;
28use util::ResultExt;
29use workspace::{item::ItemHandle, CallHandler, Pane, Workspace};
30
31pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
32 CallSettings::register(cx);
33
34 let active_call = cx.build_model(|cx| ActiveCall::new(client, user_store, cx));
35 cx.set_global(active_call);
36}
37
/// Runs at most one task at a time: spawning a new task cancels the previous one.
pub struct OneAtATime {
    /// Cancellation handle for the most recently spawned task, if any.
    /// Dropping (or replacing) this sender cancels that task.
    cancel: Option<oneshot::Sender<()>>,
}
41
42impl OneAtATime {
43 /// spawn a task in the given context.
44 /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
45 /// otherwise you'll see the result of the task.
46 fn spawn<F, Fut, R>(&mut self, cx: &mut AppContext, f: F) -> Task<Result<Option<R>>>
47 where
48 F: 'static + FnOnce(AsyncAppContext) -> Fut,
49 Fut: Future<Output = Result<R>>,
50 R: 'static,
51 {
52 let (tx, rx) = oneshot::channel();
53 self.cancel.replace(tx);
54 cx.spawn(|cx| async move {
55 futures::select_biased! {
56 _ = rx.fuse() => Ok(None),
57 result = f(cx).fuse() => result.map(Some),
58 }
59 })
60 }
61
62 fn running(&self) -> bool {
63 self.cancel
64 .as_ref()
65 .is_some_and(|cancel| !cancel.is_canceled())
66 }
67}
68
/// A call that another user has placed to the current user and that has not
/// yet been accepted, declined, or cancelled.
#[derive(Clone)]
pub struct IncomingCall {
    /// Id of the room the caller is inviting us into.
    pub room_id: u64,
    /// The user who placed the call.
    pub calling_user: Arc<User>,
    /// Users resolved from the `participant_user_ids` of the incoming call message.
    pub participants: Vec<Arc<User>>,
    /// Project attached to the call by the caller, if any.
    pub initial_project: Option<proto::ParticipantProject>,
}
76
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
    /// The room we are currently in, together with the subscriptions that
    /// observe it and forward its events.
    room: Option<(Model<Room>, Vec<Subscription>)>,
    /// Shared task for a room that is being created by `invite`; cleared once
    /// creation finishes (successfully or not).
    pending_room_creation: Option<Shared<Task<Result<Model<Room>, Arc<anyhow::Error>>>>>,
    /// The project last passed to `set_location`, held weakly.
    location: Option<WeakModel<Project>>,
    /// Ensures only one join (incoming call or channel) is in flight at a time.
    _join_debouncer: OneAtATime,
    /// Ids of users for whom an `invite` is currently in flight.
    pending_invites: HashSet<u64>,
    /// Watch channel carrying the currently ringing incoming call, if any.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    client: Arc<Client>,
    user_store: Model<UserStore>,
    /// Keeps the client's incoming-call and call-cancelled handlers registered.
    _subscriptions: Vec<client::Subscription>,
}
92
// `ActiveCall` re-emits the events of the room it is currently in (see `set_room`).
impl EventEmitter<Event> for ActiveCall {}
94
95impl ActiveCall {
96 fn new(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut ModelContext<Self>) -> Self {
97 Self {
98 room: None,
99 pending_room_creation: None,
100 location: None,
101 pending_invites: Default::default(),
102 incoming_call: watch::channel(),
103 _join_debouncer: OneAtATime { cancel: None },
104 _subscriptions: vec![
105 client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
106 client.add_message_handler(cx.weak_model(), Self::handle_call_canceled),
107 ],
108 client,
109 user_store,
110 }
111 }
112
113 pub fn channel_id(&self, cx: &AppContext) -> Option<u64> {
114 self.room()?.read(cx).channel_id()
115 }
116
117 async fn handle_incoming_call(
118 this: Model<Self>,
119 envelope: TypedEnvelope<proto::IncomingCall>,
120 _: Arc<Client>,
121 mut cx: AsyncAppContext,
122 ) -> Result<proto::Ack> {
123 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
124 let call = IncomingCall {
125 room_id: envelope.payload.room_id,
126 participants: user_store
127 .update(&mut cx, |user_store, cx| {
128 user_store.get_users(envelope.payload.participant_user_ids, cx)
129 })?
130 .await?,
131 calling_user: user_store
132 .update(&mut cx, |user_store, cx| {
133 user_store.get_user(envelope.payload.calling_user_id, cx)
134 })?
135 .await?,
136 initial_project: envelope.payload.initial_project,
137 };
138 this.update(&mut cx, |this, _| {
139 *this.incoming_call.0.borrow_mut() = Some(call);
140 })?;
141
142 Ok(proto::Ack {})
143 }
144
145 async fn handle_call_canceled(
146 this: Model<Self>,
147 envelope: TypedEnvelope<proto::CallCanceled>,
148 _: Arc<Client>,
149 mut cx: AsyncAppContext,
150 ) -> Result<()> {
151 this.update(&mut cx, |this, _| {
152 let mut incoming_call = this.incoming_call.0.borrow_mut();
153 if incoming_call
154 .as_ref()
155 .map_or(false, |call| call.room_id == envelope.payload.room_id)
156 {
157 incoming_call.take();
158 }
159 })?;
160 Ok(())
161 }
162
163 pub fn global(cx: &AppContext) -> Model<Self> {
164 cx.global::<Model<Self>>().clone()
165 }
166
167 pub fn invite(
168 &mut self,
169 called_user_id: u64,
170 initial_project: Option<Model<Project>>,
171 cx: &mut ModelContext<Self>,
172 ) -> Task<Result<()>> {
173 if !self.pending_invites.insert(called_user_id) {
174 return Task::ready(Err(anyhow!("user was already invited")));
175 }
176 cx.notify();
177
178 if self._join_debouncer.running() {
179 return Task::ready(Ok(()));
180 }
181
182 let room = if let Some(room) = self.room().cloned() {
183 Some(Task::ready(Ok(room)).shared())
184 } else {
185 self.pending_room_creation.clone()
186 };
187
188 let invite = if let Some(room) = room {
189 cx.spawn(move |_, mut cx| async move {
190 let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
191
192 let initial_project_id = if let Some(initial_project) = initial_project {
193 Some(
194 room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))?
195 .await?,
196 )
197 } else {
198 None
199 };
200
201 room.update(&mut cx, move |room, cx| {
202 room.call(called_user_id, initial_project_id, cx)
203 })?
204 .await?;
205
206 anyhow::Ok(())
207 })
208 } else {
209 let client = self.client.clone();
210 let user_store = self.user_store.clone();
211 let room = cx
212 .spawn(move |this, mut cx| async move {
213 let create_room = async {
214 let room = cx
215 .update(|cx| {
216 Room::create(
217 called_user_id,
218 initial_project,
219 client,
220 user_store,
221 cx,
222 )
223 })?
224 .await?;
225
226 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))?
227 .await?;
228
229 anyhow::Ok(room)
230 };
231
232 let room = create_room.await;
233 this.update(&mut cx, |this, _| this.pending_room_creation = None)?;
234 room.map_err(Arc::new)
235 })
236 .shared();
237 self.pending_room_creation = Some(room.clone());
238 cx.background_executor().spawn(async move {
239 room.await.map_err(|err| anyhow!("{:?}", err))?;
240 anyhow::Ok(())
241 })
242 };
243
244 cx.spawn(move |this, mut cx| async move {
245 let result = invite.await;
246 if result.is_ok() {
247 this.update(&mut cx, |this, cx| this.report_call_event("invite", cx))?;
248 } else {
249 // TODO: Resport collaboration error
250 }
251
252 this.update(&mut cx, |this, cx| {
253 this.pending_invites.remove(&called_user_id);
254 cx.notify();
255 })?;
256 result
257 })
258 }
259
260 pub fn cancel_invite(
261 &mut self,
262 called_user_id: u64,
263 cx: &mut ModelContext<Self>,
264 ) -> Task<Result<()>> {
265 let room_id = if let Some(room) = self.room() {
266 room.read(cx).id()
267 } else {
268 return Task::ready(Err(anyhow!("no active call")));
269 };
270
271 let client = self.client.clone();
272 cx.background_executor().spawn(async move {
273 client
274 .request(proto::CancelCall {
275 room_id,
276 called_user_id,
277 })
278 .await?;
279 anyhow::Ok(())
280 })
281 }
282
283 pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
284 self.incoming_call.1.clone()
285 }
286
287 pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
288 if self.room.is_some() {
289 return Task::ready(Err(anyhow!("cannot join while on another call")));
290 }
291
292 let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
293 call
294 } else {
295 return Task::ready(Err(anyhow!("no incoming call")));
296 };
297
298 if self.pending_room_creation.is_some() {
299 return Task::ready(Ok(()));
300 }
301
302 let room_id = call.room_id.clone();
303 let client = self.client.clone();
304 let user_store = self.user_store.clone();
305 let join = self
306 ._join_debouncer
307 .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
308
309 cx.spawn(|this, mut cx| async move {
310 let room = join.await?;
311 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
312 .await?;
313 this.update(&mut cx, |this, cx| {
314 this.report_call_event("accept incoming", cx)
315 })?;
316 Ok(())
317 })
318 }
319
320 pub fn decline_incoming(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
321 let call = self
322 .incoming_call
323 .0
324 .borrow_mut()
325 .take()
326 .ok_or_else(|| anyhow!("no incoming call"))?;
327 report_call_event_for_room("decline incoming", call.room_id, None, &self.client, cx);
328 self.client.send(proto::DeclineCall {
329 room_id: call.room_id,
330 })?;
331 Ok(())
332 }
333
334 pub fn join_channel(
335 &mut self,
336 channel_id: u64,
337 requesting_window: Option<WindowHandle<Workspace>>,
338 cx: &mut ModelContext<Self>,
339 ) -> Task<Result<Option<Model<Room>>>> {
340 if let Some(room) = self.room().cloned() {
341 if room.read(cx).channel_id() == Some(channel_id) {
342 return cx.spawn(|_, _| async move {
343 todo!();
344 // let future = room.update(&mut cx, |room, cx| {
345 // room.most_active_project(cx).map(|(host, project)| {
346 // room.join_project(project, host, app_state.clone(), cx)
347 // })
348 // })
349
350 // if let Some(future) = future {
351 // future.await?;
352 // }
353
354 // Ok(Some(room))
355 });
356 }
357
358 let should_prompt = room.update(cx, |room, _| {
359 room.channel_id().is_some()
360 && room.is_sharing_project()
361 && room.remote_participants().len() > 0
362 });
363 if should_prompt && requesting_window.is_some() {
364 return cx.spawn(|this, mut cx| async move {
365 let answer = requesting_window.unwrap().update(&mut cx, |_, cx| {
366 cx.prompt(
367 PromptLevel::Warning,
368 "Leaving this call will unshare your current project.\nDo you want to switch channels?",
369 &["Yes, Join Channel", "Cancel"],
370 )
371 })?;
372 if answer.await? == 1 {
373 return Ok(None);
374 }
375
376 room.update(&mut cx, |room, cx| room.clear_state(cx))?;
377
378 this.update(&mut cx, |this, cx| {
379 this.join_channel(channel_id, requesting_window, cx)
380 })?
381 .await
382 });
383 }
384
385 if room.read(cx).channel_id().is_some() {
386 room.update(cx, |room, cx| room.clear_state(cx));
387 }
388 }
389
390 if self.pending_room_creation.is_some() {
391 return Task::ready(Ok(None));
392 }
393
394 let client = self.client.clone();
395 let user_store = self.user_store.clone();
396 let join = self._join_debouncer.spawn(cx, move |cx| async move {
397 Room::join_channel(channel_id, client, user_store, cx).await
398 });
399
400 cx.spawn(|this, mut cx| async move {
401 let room = join.await?;
402 this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
403 .await?;
404 this.update(&mut cx, |this, cx| {
405 this.report_call_event("join channel", cx)
406 })?;
407 Ok(room)
408 })
409 }
410
411 pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
412 cx.notify();
413 self.report_call_event("hang up", cx);
414
415 Audio::end_call(cx);
416 if let Some((room, _)) = self.room.take() {
417 room.update(cx, |room, cx| room.leave(cx))
418 } else {
419 Task::ready(Ok(()))
420 }
421 }
422
423 pub fn share_project(
424 &mut self,
425 project: Model<Project>,
426 cx: &mut ModelContext<Self>,
427 ) -> Task<Result<u64>> {
428 if let Some((room, _)) = self.room.as_ref() {
429 self.report_call_event("share project", cx);
430 room.update(cx, |room, cx| room.share_project(project, cx))
431 } else {
432 Task::ready(Err(anyhow!("no active call")))
433 }
434 }
435
436 pub fn unshare_project(
437 &mut self,
438 project: Model<Project>,
439 cx: &mut ModelContext<Self>,
440 ) -> Result<()> {
441 if let Some((room, _)) = self.room.as_ref() {
442 self.report_call_event("unshare project", cx);
443 room.update(cx, |room, cx| room.unshare_project(project, cx))
444 } else {
445 Err(anyhow!("no active call"))
446 }
447 }
448
449 pub fn location(&self) -> Option<&WeakModel<Project>> {
450 self.location.as_ref()
451 }
452
453 pub fn set_location(
454 &mut self,
455 project: Option<&Model<Project>>,
456 cx: &mut ModelContext<Self>,
457 ) -> Task<Result<()>> {
458 if project.is_some() || !*ZED_ALWAYS_ACTIVE {
459 self.location = project.map(|project| project.downgrade());
460 if let Some((room, _)) = self.room.as_ref() {
461 return room.update(cx, |room, cx| room.set_location(project, cx));
462 }
463 }
464 Task::ready(Ok(()))
465 }
466
467 fn set_room(
468 &mut self,
469 room: Option<Model<Room>>,
470 cx: &mut ModelContext<Self>,
471 ) -> Task<Result<()>> {
472 if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
473 cx.notify();
474 if let Some(room) = room {
475 if room.read(cx).status().is_offline() {
476 self.room = None;
477 Task::ready(Ok(()))
478 } else {
479 let subscriptions = vec![
480 cx.observe(&room, |this, room, cx| {
481 if room.read(cx).status().is_offline() {
482 this.set_room(None, cx).detach_and_log_err(cx);
483 }
484
485 cx.notify();
486 }),
487 cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
488 ];
489 self.room = Some((room.clone(), subscriptions));
490 let location = self
491 .location
492 .as_ref()
493 .and_then(|location| location.upgrade());
494 room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
495 }
496 } else {
497 self.room = None;
498 Task::ready(Ok(()))
499 }
500 } else {
501 Task::ready(Ok(()))
502 }
503 }
504
505 pub fn room(&self) -> Option<&Model<Room>> {
506 self.room.as_ref().map(|(room, _)| room)
507 }
508
509 pub fn client(&self) -> Arc<Client> {
510 self.client.clone()
511 }
512
513 pub fn pending_invites(&self) -> &HashSet<u64> {
514 &self.pending_invites
515 }
516
517 pub fn report_call_event(&self, operation: &'static str, cx: &mut AppContext) {
518 if let Some(room) = self.room() {
519 let room = room.read(cx);
520 report_call_event_for_room(operation, room.id(), room.channel_id(), &self.client, cx);
521 }
522 }
523}
524
525pub fn report_call_event_for_room(
526 operation: &'static str,
527 room_id: u64,
528 channel_id: Option<u64>,
529 client: &Arc<Client>,
530 cx: &mut AppContext,
531) {
532 let telemetry = client.telemetry();
533 let telemetry_settings = *TelemetrySettings::get_global(cx);
534
535 telemetry.report_call_event(telemetry_settings, operation, Some(room_id), channel_id)
536}
537
538pub fn report_call_event_for_channel(
539 operation: &'static str,
540 channel_id: u64,
541 client: &Arc<Client>,
542 cx: &AppContext,
543) {
544 let room = ActiveCall::global(cx).read(cx).room();
545
546 let telemetry = client.telemetry();
547
548 let telemetry_settings = *TelemetrySettings::get_global(cx);
549
550 telemetry.report_call_event(
551 telemetry_settings,
552 operation,
553 room.map(|r| r.read(cx).id()),
554 Some(channel_id),
555 )
556}
557
/// Per-workspace [`CallHandler`] implementation backed by the global [`ActiveCall`].
pub struct Call {
    /// The global `ActiveCall` model and the workspace's subscription to its
    /// events; `None` when no `ActiveCall` global existed at construction time.
    active_call: Option<(Model<ActiveCall>, Vec<Subscription>)>,
    /// The workspace this handler was created for.
    parent_workspace: WeakView<Workspace>,
}
562
563impl Call {
564 pub fn new(
565 parent_workspace: WeakView<Workspace>,
566 cx: &mut ViewContext<'_, Workspace>,
567 ) -> Box<dyn CallHandler> {
568 let mut active_call = None;
569 if cx.has_global::<Model<ActiveCall>>() {
570 let call = cx.global::<Model<ActiveCall>>().clone();
571 let subscriptions = vec![cx.subscribe(&call, Self::on_active_call_event)];
572 active_call = Some((call, subscriptions));
573 }
574 Box::new(Self {
575 active_call,
576 parent_workspace,
577 })
578 }
579 fn on_active_call_event(
580 workspace: &mut Workspace,
581 _: Model<ActiveCall>,
582 event: &room::Event,
583 cx: &mut ViewContext<Workspace>,
584 ) {
585 match event {
586 room::Event::ParticipantLocationChanged { participant_id }
587 | room::Event::RemoteVideoTracksChanged { participant_id } => {
588 workspace.leader_updated(*participant_id, cx);
589 }
590 _ => {}
591 }
592 }
593}
594
595#[async_trait(?Send)]
596impl CallHandler for Call {
597 fn peer_state(
598 &mut self,
599 leader_id: PeerId,
600 cx: &mut ViewContext<Workspace>,
601 ) -> Option<(bool, bool)> {
602 let (call, _) = self.active_call.as_ref()?;
603 let room = call.read(cx).room()?.read(cx);
604 let participant = room.remote_participant_for_peer_id(leader_id)?;
605
606 let leader_in_this_app;
607 let leader_in_this_project;
608 match participant.location {
609 ParticipantLocation::SharedProject { project_id } => {
610 leader_in_this_app = true;
611 leader_in_this_project = Some(project_id)
612 == self
613 .parent_workspace
614 .update(cx, |this, cx| this.project().read(cx).remote_id())
615 .log_err()
616 .flatten();
617 }
618 ParticipantLocation::UnsharedProject => {
619 leader_in_this_app = true;
620 leader_in_this_project = false;
621 }
622 ParticipantLocation::External => {
623 leader_in_this_app = false;
624 leader_in_this_project = false;
625 }
626 };
627
628 Some((leader_in_this_project, leader_in_this_app))
629 }
630
631 fn shared_screen_for_peer(
632 &self,
633 peer_id: PeerId,
634 pane: &View<Pane>,
635 cx: &mut ViewContext<Workspace>,
636 ) -> Option<Box<dyn ItemHandle>> {
637 let (call, _) = self.active_call.as_ref()?;
638 let room = call.read(cx).room()?.read(cx);
639 let participant = room.remote_participant_for_peer_id(peer_id)?;
640 let track = participant.video_tracks.values().next()?.clone();
641 let user = participant.user.clone();
642 for item in pane.read(cx).items_of_type::<SharedScreen>() {
643 if item.read(cx).peer_id == peer_id {
644 return Some(Box::new(item));
645 }
646 }
647
648 Some(Box::new(cx.build_view(|cx| {
649 SharedScreen::new(&track, peer_id, user.clone(), cx)
650 })))
651 }
652 fn room_id(&self, cx: &AppContext) -> Option<u64> {
653 Some(self.active_call.as_ref()?.0.read(cx).room()?.read(cx).id())
654 }
655 fn hang_up(&self, cx: &mut AppContext) -> Task<Result<()>> {
656 let Some((call, _)) = self.active_call.as_ref() else {
657 return Task::ready(Err(anyhow!("Cannot exit a call; not in a call")));
658 };
659
660 call.update(cx, |this, cx| this.hang_up(cx))
661 }
662 fn active_project(&self, cx: &AppContext) -> Option<WeakModel<Project>> {
663 ActiveCall::global(cx).read(cx).location().cloned()
664 }
665 fn invite(
666 &mut self,
667 called_user_id: u64,
668 initial_project: Option<Model<Project>>,
669 cx: &mut AppContext,
670 ) -> Task<Result<()>> {
671 ActiveCall::global(cx).update(cx, |this, cx| {
672 this.invite(called_user_id, initial_project, cx)
673 })
674 }
675 fn remote_participants(&self, cx: &AppContext) -> Option<Vec<(Arc<User>, PeerId)>> {
676 self.active_call
677 .as_ref()
678 .map(|call| {
679 call.0.read(cx).room().map(|room| {
680 room.read(cx)
681 .remote_participants()
682 .iter()
683 .map(|participant| {
684 (participant.1.user.clone(), participant.1.peer_id.clone())
685 })
686 .collect()
687 })
688 })
689 .flatten()
690 }
691 fn is_muted(&self, cx: &AppContext) -> Option<bool> {
692 self.active_call
693 .as_ref()
694 .map(|call| {
695 call.0
696 .read(cx)
697 .room()
698 .map(|room| room.read(cx).is_muted(cx))
699 })
700 .flatten()
701 }
702 fn toggle_mute(&self, cx: &mut AppContext) {
703 self.active_call.as_ref().map(|call| {
704 call.0.update(cx, |this, cx| {
705 this.room().map(|room| {
706 let room = room.clone();
707 cx.spawn(|_, mut cx| async move {
708 room.update(&mut cx, |this, cx| this.toggle_mute(cx))??
709 .await
710 })
711 .detach_and_log_err(cx);
712 })
713 })
714 });
715 }
716 fn toggle_screen_share(&self, cx: &mut AppContext) {
717 self.active_call.as_ref().map(|call| {
718 call.0.update(cx, |this, cx| {
719 this.room().map(|room| {
720 room.update(cx, |this, cx| {
721 if this.is_screen_sharing() {
722 this.unshare_screen(cx).log_err();
723 } else {
724 let t = this.share_screen(cx);
725 cx.spawn(move |_, _| async move {
726 t.await.log_err();
727 })
728 .detach();
729 }
730 })
731 })
732 })
733 });
734 }
735 fn toggle_deafen(&self, cx: &mut AppContext) {
736 self.active_call.as_ref().map(|call| {
737 call.0.update(cx, |this, cx| {
738 this.room().map(|room| {
739 room.update(cx, |this, cx| {
740 this.toggle_deafen(cx).log_err();
741 })
742 })
743 })
744 });
745 }
746 fn is_deafened(&self, cx: &AppContext) -> Option<bool> {
747 self.active_call
748 .as_ref()
749 .map(|call| {
750 call.0
751 .read(cx)
752 .room()
753 .map(|room| room.read(cx).is_deafened())
754 })
755 .flatten()
756 .flatten()
757 }
758}
759
#[cfg(test)]
mod test {
    use gpui::TestAppContext;

    use crate::OneAtATime;

    /// Exercises the three outcomes of `OneAtATime::spawn`: a task that runs to
    /// completion, a task superseded by a newer one, and a task orphaned by
    /// dropping the `OneAtATime`.
    #[gpui::test]
    async fn test_one_at_a_time(cx: &mut TestAppContext) {
        let mut debouncer = OneAtATime { cancel: None };

        // A lone task runs to completion and yields its value.
        let lone = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(1) }));
        assert_eq!(lone.await.unwrap(), Some(1));

        // Spawning a second task before the first is polled cancels the first,
        // so its body must never execute.
        let (superseded, latest) = cx.update(|cx| {
            let superseded = debouncer.spawn(cx, |_| async {
                assert!(false);
                Ok(2)
            });
            let latest = debouncer.spawn(cx, |_| async { Ok(3) });
            (superseded, latest)
        });

        assert_eq!(superseded.await.unwrap(), None);
        assert_eq!(latest.await.unwrap(), Some(3));

        // Dropping the `OneAtATime` cancels the task that is still pending.
        let orphaned = cx.update(|cx| debouncer.spawn(cx, |_| async { Ok(4) }));
        drop(debouncer);

        assert_eq!(orphaned.await.unwrap(), None);
    }
}