mod.rs

  1pub mod diagnostics;
  2pub mod participant;
  3pub mod room;
  4
  5use anyhow::{Context as _, Result, anyhow};
  6use audio::Audio;
  7use client::{ChannelId, Client, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE, proto};
  8use collections::HashSet;
  9use futures::{Future, FutureExt, channel::oneshot, future::Shared};
 10use gpui::{
 11    AnyView, App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task,
 12    WeakEntity, Window,
 13};
 14use postage::watch;
 15use project::Project;
 16use room::Event;
 17use settings::Settings;
 18use std::sync::Arc;
 19use workspace::{
 20    ActiveCallEvent, AnyActiveCall, GlobalAnyActiveCall, MultiWorkspace, MultiWorkspaceEvent, Pane,
 21    RemoteCollaborator, SharedScreen, Workspace,
 22};
 23
 24pub use livekit_client::{RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent};
 25pub use room::Room;
 26
 27use crate::call_settings::CallSettings;
 28
 29pub fn init(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
 30    let active_call = cx.new(|cx| ActiveCall::new(client, user_store, cx));
 31    let active_call_handle = active_call.downgrade();
 32
 33    cx.observe_new(move |_multi_workspace: &mut MultiWorkspace, window, cx| {
 34        let Some(window) = window else {
 35            return;
 36        };
 37
 38        let active_call_handle = active_call_handle.clone();
 39        cx.subscribe_in(
 40            &cx.entity(),
 41            window,
 42            move |multi_workspace, _, event: &MultiWorkspaceEvent, window, cx| {
 43                if !matches!(event, MultiWorkspaceEvent::ActiveWorkspaceChanged)
 44                    && window.is_window_active()
 45                {
 46                    return;
 47                }
 48
 49                let project = multi_workspace.workspace().read(cx).project().clone();
 50                if let Ok(task) = active_call_handle.update(cx, |active_call, cx| {
 51                    active_call.set_location(Some(&project), cx)
 52                }) {
 53                    task.detach_and_log_err(cx);
 54                }
 55            },
 56        )
 57        .detach();
 58    })
 59    .detach();
 60
 61    cx.set_global(GlobalAnyActiveCall(Arc::new(ActiveCallEntity(active_call))))
 62}
 63
 64#[derive(Clone)]
 65struct ActiveCallEntity(Entity<ActiveCall>);
 66
 67impl AnyActiveCall for ActiveCallEntity {
 68    fn entity(&self) -> gpui::AnyEntity {
 69        self.0.clone().into_any()
 70    }
 71
 72    fn is_in_room(&self, cx: &App) -> bool {
 73        self.0.read(cx).room().is_some()
 74    }
 75
 76    fn room_id(&self, cx: &App) -> Option<u64> {
 77        Some(self.0.read(cx).room()?.read(cx).id())
 78    }
 79
 80    fn channel_id(&self, cx: &App) -> Option<ChannelId> {
 81        self.0.read(cx).room()?.read(cx).channel_id()
 82    }
 83
 84    fn hang_up(&self, cx: &mut App) -> Task<Result<()>> {
 85        self.0.update(cx, |this, cx| this.hang_up(cx))
 86    }
 87
 88    fn unshare_project(&self, project: Entity<Project>, cx: &mut App) -> Result<()> {
 89        self.0
 90            .update(cx, |this, cx| this.unshare_project(project, cx))
 91    }
 92
 93    fn remote_participant_for_peer_id(
 94        &self,
 95        peer_id: proto::PeerId,
 96        cx: &App,
 97    ) -> Option<workspace::RemoteCollaborator> {
 98        let room = self.0.read(cx).room()?.read(cx);
 99        let participant = room.remote_participant_for_peer_id(peer_id)?;
100        Some(RemoteCollaborator {
101            user: participant.user.clone(),
102            peer_id: participant.peer_id,
103            location: participant.location,
104            participant_index: participant.participant_index,
105        })
106    }
107
108    fn is_sharing_project(&self, cx: &App) -> bool {
109        self.0
110            .read(cx)
111            .room()
112            .map_or(false, |room| room.read(cx).is_sharing_project())
113    }
114
115    fn has_remote_participants(&self, cx: &App) -> bool {
116        self.0.read(cx).room().map_or(false, |room| {
117            !room.read(cx).remote_participants().is_empty()
118        })
119    }
120
121    fn local_participant_is_guest(&self, cx: &App) -> bool {
122        self.0
123            .read(cx)
124            .room()
125            .map_or(false, |room| room.read(cx).local_participant_is_guest())
126    }
127
128    fn client(&self, cx: &App) -> Arc<Client> {
129        self.0.read(cx).client()
130    }
131
132    fn share_on_join(&self, cx: &App) -> bool {
133        CallSettings::get_global(cx).share_on_join
134    }
135
136    fn join_channel(&self, channel_id: ChannelId, cx: &mut App) -> Task<Result<bool>> {
137        let task = self
138            .0
139            .update(cx, |this, cx| this.join_channel(channel_id, cx));
140        cx.spawn(async move |_cx| {
141            let result = task.await?;
142            Ok(result.is_some())
143        })
144    }
145
146    fn room_update_completed(&self, cx: &mut App) -> Task<()> {
147        let Some(room) = self.0.read(cx).room().cloned() else {
148            return Task::ready(());
149        };
150        let future = room.update(cx, |room, _cx| room.room_update_completed());
151        cx.spawn(async move |_cx| {
152            future.await;
153        })
154    }
155
156    fn most_active_project(&self, cx: &App) -> Option<(u64, u64)> {
157        let room = self.0.read(cx).room()?;
158        room.read(cx).most_active_project(cx)
159    }
160
161    fn share_project(&self, project: Entity<Project>, cx: &mut App) -> Task<Result<u64>> {
162        self.0
163            .update(cx, |this, cx| this.share_project(project, cx))
164    }
165
166    fn join_project(
167        &self,
168        project_id: u64,
169        language_registry: Arc<language::LanguageRegistry>,
170        fs: Arc<dyn fs::Fs>,
171        cx: &mut App,
172    ) -> Task<Result<Entity<Project>>> {
173        let Some(room) = self.0.read(cx).room().cloned() else {
174            return Task::ready(Err(anyhow::anyhow!("not in a call")));
175        };
176        room.update(cx, |room, cx| {
177            room.join_project(project_id, language_registry, fs, cx)
178        })
179    }
180
181    fn peer_id_for_user_in_room(&self, user_id: u64, cx: &App) -> Option<proto::PeerId> {
182        let room = self.0.read(cx).room()?.read(cx);
183        room.remote_participants()
184            .values()
185            .find(|p| p.user.id == user_id)
186            .map(|p| p.peer_id)
187    }
188
189    fn subscribe(
190        &self,
191        window: &mut Window,
192        cx: &mut Context<Workspace>,
193        handler: Box<
194            dyn Fn(&mut Workspace, &ActiveCallEvent, &mut Window, &mut Context<Workspace>),
195        >,
196    ) -> Subscription {
197        cx.subscribe_in(
198            &self.0,
199            window,
200            move |workspace, _, event: &room::Event, window, cx| {
201                let mapped = match event {
202                    room::Event::ParticipantLocationChanged { participant_id } => {
203                        Some(ActiveCallEvent::ParticipantLocationChanged {
204                            participant_id: *participant_id,
205                        })
206                    }
207                    room::Event::RemoteVideoTracksChanged { participant_id } => {
208                        Some(ActiveCallEvent::RemoteVideoTracksChanged {
209                            participant_id: *participant_id,
210                        })
211                    }
212                    _ => None,
213                };
214                if let Some(event) = mapped {
215                    handler(workspace, &event, window, cx);
216                }
217            },
218        )
219    }
220
221    fn create_shared_screen(
222        &self,
223        peer_id: client::proto::PeerId,
224        pane: &Entity<Pane>,
225        window: &mut Window,
226        cx: &mut App,
227    ) -> Option<Entity<workspace::SharedScreen>> {
228        let room = self.0.read(cx).room()?.clone();
229        let participant = room.read(cx).remote_participant_for_peer_id(peer_id)?;
230        let track = participant.video_tracks.values().next()?.clone();
231        let user = participant.user.clone();
232
233        for item in pane.read(cx).items_of_type::<SharedScreen>() {
234            if item.read(cx).peer_id == peer_id {
235                return Some(item);
236            }
237        }
238
239        Some(cx.new(|cx: &mut Context<SharedScreen>| {
240            let my_sid = track.sid();
241            cx.subscribe(
242                &room,
243                move |_: &mut SharedScreen,
244                      _: Entity<Room>,
245                      ev: &room::Event,
246                      cx: &mut Context<SharedScreen>| {
247                    if let room::Event::RemoteVideoTrackUnsubscribed { sid } = ev
248                        && *sid == my_sid
249                    {
250                        cx.emit(workspace::shared_screen::Event::Close);
251                    }
252                },
253            )
254            .detach();
255
256            cx.observe_release(
257                &room,
258                |_: &mut SharedScreen, _: &mut Room, cx: &mut Context<SharedScreen>| {
259                    cx.emit(workspace::shared_screen::Event::Close);
260                },
261            )
262            .detach();
263
264            let view = cx.new(|cx| RemoteVideoTrackView::new(track.clone(), window, cx));
265            cx.subscribe(
266                &view,
267                |_: &mut SharedScreen,
268                 _: Entity<RemoteVideoTrackView>,
269                 ev: &RemoteVideoTrackViewEvent,
270                 cx: &mut Context<SharedScreen>| match ev {
271                    RemoteVideoTrackViewEvent::Close => {
272                        cx.emit(workspace::shared_screen::Event::Close);
273                    }
274                },
275            )
276            .detach();
277
278            pub(super) fn clone_remote_video_track_view(
279                view: &AnyView,
280                window: &mut Window,
281                cx: &mut App,
282            ) -> AnyView {
283                let view = view
284                    .clone()
285                    .downcast::<RemoteVideoTrackView>()
286                    .expect("SharedScreen view must be a RemoteVideoTrackView");
287                let cloned = view.update(cx, |view, cx| view.clone(window, cx));
288                AnyView::from(cloned)
289            }
290
291            SharedScreen::new(
292                peer_id,
293                user,
294                AnyView::from(view),
295                clone_remote_video_track_view,
296                cx,
297            )
298        }))
299    }
300}
301
302pub struct OneAtATime {
303    cancel: Option<oneshot::Sender<()>>,
304}
305
306impl OneAtATime {
307    /// spawn a task in the given context.
308    /// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
309    /// otherwise you'll see the result of the task.
310    fn spawn<F, Fut, R>(&mut self, cx: &mut App, f: F) -> Task<Result<Option<R>>>
311    where
312        F: 'static + FnOnce(AsyncApp) -> Fut,
313        Fut: Future<Output = Result<R>>,
314        R: 'static,
315    {
316        let (tx, rx) = oneshot::channel();
317        self.cancel.replace(tx);
318        cx.spawn(async move |cx| {
319            futures::select_biased! {
320                _ = rx.fuse() => Ok(None),
321                result = f(cx.clone()).fuse() => result.map(Some),
322            }
323        })
324    }
325
326    fn running(&self) -> bool {
327        self.cancel
328            .as_ref()
329            .is_some_and(|cancel| !cancel.is_canceled())
330    }
331}
332
333#[derive(Clone)]
334pub struct IncomingCall {
335    pub room_id: u64,
336    pub calling_user: Arc<User>,
337    pub participants: Vec<Arc<User>>,
338    pub initial_project: Option<proto::ParticipantProject>,
339}
340
341/// Singleton global maintaining the user's participation in a room across workspaces.
342pub struct ActiveCall {
343    room: Option<(Entity<Room>, Vec<Subscription>)>,
344    pending_room_creation: Option<Shared<Task<Result<Entity<Room>, Arc<anyhow::Error>>>>>,
345    location: Option<WeakEntity<Project>>,
346    _join_debouncer: OneAtATime,
347    pending_invites: HashSet<u64>,
348    incoming_call: (
349        watch::Sender<Option<IncomingCall>>,
350        watch::Receiver<Option<IncomingCall>>,
351    ),
352    client: Arc<Client>,
353    user_store: Entity<UserStore>,
354    _subscriptions: Vec<client::Subscription>,
355}
356
357impl EventEmitter<Event> for ActiveCall {}
358
359impl ActiveCall {
360    fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
361        Self {
362            room: None,
363            pending_room_creation: None,
364            location: None,
365            pending_invites: Default::default(),
366            incoming_call: watch::channel(),
367            _join_debouncer: OneAtATime { cancel: None },
368            _subscriptions: vec![
369                client.add_request_handler(cx.weak_entity(), Self::handle_incoming_call),
370                client.add_message_handler(cx.weak_entity(), Self::handle_call_canceled),
371            ],
372            client,
373            user_store,
374        }
375    }
376
377    pub fn channel_id(&self, cx: &App) -> Option<ChannelId> {
378        self.room()?.read(cx).channel_id()
379    }
380
381    async fn handle_incoming_call(
382        this: Entity<Self>,
383        envelope: TypedEnvelope<proto::IncomingCall>,
384        mut cx: AsyncApp,
385    ) -> Result<proto::Ack> {
386        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
387        let call = IncomingCall {
388            room_id: envelope.payload.room_id,
389            participants: user_store
390                .update(&mut cx, |user_store, cx| {
391                    user_store.get_users(envelope.payload.participant_user_ids, cx)
392                })
393                .await?,
394            calling_user: user_store
395                .update(&mut cx, |user_store, cx| {
396                    user_store.get_user(envelope.payload.calling_user_id, cx)
397                })
398                .await?,
399            initial_project: envelope.payload.initial_project,
400        };
401        this.update(&mut cx, |this, _| {
402            *this.incoming_call.0.borrow_mut() = Some(call);
403        });
404
405        Ok(proto::Ack {})
406    }
407
408    async fn handle_call_canceled(
409        this: Entity<Self>,
410        envelope: TypedEnvelope<proto::CallCanceled>,
411        mut cx: AsyncApp,
412    ) -> Result<()> {
413        this.update(&mut cx, |this, _| {
414            let mut incoming_call = this.incoming_call.0.borrow_mut();
415            if incoming_call
416                .as_ref()
417                .is_some_and(|call| call.room_id == envelope.payload.room_id)
418            {
419                incoming_call.take();
420            }
421        });
422        Ok(())
423    }
424
425    pub fn global(cx: &App) -> Entity<Self> {
426        Self::try_global(cx).unwrap()
427    }
428
429    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
430        let any = cx.try_global::<GlobalAnyActiveCall>()?;
431        any.0.entity().downcast::<Self>().ok()
432    }
433
434    pub fn invite(
435        &mut self,
436        called_user_id: u64,
437        initial_project: Option<Entity<Project>>,
438        cx: &mut Context<Self>,
439    ) -> Task<Result<()>> {
440        if !self.pending_invites.insert(called_user_id) {
441            return Task::ready(Err(anyhow!("user was already invited")));
442        }
443        cx.notify();
444
445        if self._join_debouncer.running() {
446            return Task::ready(Ok(()));
447        }
448
449        let room = if let Some(room) = self.room().cloned() {
450            Some(Task::ready(Ok(room)).shared())
451        } else {
452            self.pending_room_creation.clone()
453        };
454
455        let invite = if let Some(room) = room {
456            cx.spawn(async move |_, cx| {
457                let room = room.await.map_err(|err| anyhow!("{err:?}"))?;
458
459                let initial_project_id = if let Some(initial_project) = initial_project {
460                    Some(
461                        room.update(cx, |room, cx| room.share_project(initial_project, cx))
462                            .await?,
463                    )
464                } else {
465                    None
466                };
467
468                room.update(cx, move |room, cx| {
469                    room.call(called_user_id, initial_project_id, cx)
470                })
471                .await?;
472
473                anyhow::Ok(())
474            })
475        } else {
476            let client = self.client.clone();
477            let user_store = self.user_store.clone();
478            let room = cx
479                .spawn(async move |this, cx| {
480                    let create_room = async {
481                        let room = cx
482                            .update(|cx| {
483                                Room::create(
484                                    called_user_id,
485                                    initial_project,
486                                    client,
487                                    user_store,
488                                    cx,
489                                )
490                            })
491                            .await?;
492
493                        this.update(cx, |this, cx| this.set_room(Some(room.clone()), cx))?
494                            .await?;
495
496                        anyhow::Ok(room)
497                    };
498
499                    let room = create_room.await;
500                    this.update(cx, |this, _| this.pending_room_creation = None)?;
501                    room.map_err(Arc::new)
502                })
503                .shared();
504            self.pending_room_creation = Some(room.clone());
505            cx.background_spawn(async move {
506                room.await.map_err(|err| anyhow!("{err:?}"))?;
507                anyhow::Ok(())
508            })
509        };
510
511        cx.spawn(async move |this, cx| {
512            let result = invite.await;
513            if result.is_ok() {
514                this.update(cx, |this, cx| {
515                    this.report_call_event("Participant Invited", cx)
516                })?;
517            } else {
518                //TODO: report collaboration error
519                log::error!("invite failed: {:?}", result);
520            }
521
522            this.update(cx, |this, cx| {
523                this.pending_invites.remove(&called_user_id);
524                cx.notify();
525            })?;
526            result
527        })
528    }
529
530    pub fn cancel_invite(
531        &mut self,
532        called_user_id: u64,
533        cx: &mut Context<Self>,
534    ) -> Task<Result<()>> {
535        let room_id = if let Some(room) = self.room() {
536            room.read(cx).id()
537        } else {
538            return Task::ready(Err(anyhow!("no active call")));
539        };
540
541        let client = self.client.clone();
542        cx.background_spawn(async move {
543            client
544                .request(proto::CancelCall {
545                    room_id,
546                    called_user_id,
547                })
548                .await?;
549            anyhow::Ok(())
550        })
551    }
552
553    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
554        self.incoming_call.1.clone()
555    }
556
557    pub fn accept_incoming(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
558        if self.room.is_some() {
559            return Task::ready(Err(anyhow!("cannot join while on another call")));
560        }
561
562        let call = if let Some(call) = self.incoming_call.0.borrow_mut().take() {
563            call
564        } else {
565            return Task::ready(Err(anyhow!("no incoming call")));
566        };
567
568        if self.pending_room_creation.is_some() {
569            return Task::ready(Ok(()));
570        }
571
572        let room_id = call.room_id;
573        let client = self.client.clone();
574        let user_store = self.user_store.clone();
575        let join = self
576            ._join_debouncer
577            .spawn(cx, move |cx| Room::join(room_id, client, user_store, cx));
578
579        cx.spawn(async move |this, cx| {
580            let room = join.await?;
581            this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
582                .await?;
583            this.update(cx, |this, cx| {
584                this.report_call_event("Incoming Call Accepted", cx)
585            })?;
586            Ok(())
587        })
588    }
589
590    pub fn decline_incoming(&mut self, _: &mut Context<Self>) -> Result<()> {
591        let call = self
592            .incoming_call
593            .0
594            .borrow_mut()
595            .take()
596            .context("no incoming call")?;
597        telemetry::event!("Incoming Call Declined", room_id = call.room_id);
598        self.client.send(proto::DeclineCall {
599            room_id: call.room_id,
600        })?;
601        Ok(())
602    }
603
604    pub fn join_channel(
605        &mut self,
606        channel_id: ChannelId,
607        cx: &mut Context<Self>,
608    ) -> Task<Result<Option<Entity<Room>>>> {
609        if let Some(room) = self.room().cloned() {
610            if room.read(cx).channel_id() == Some(channel_id) {
611                return Task::ready(Ok(Some(room)));
612            } else {
613                room.update(cx, |room, cx| room.clear_state(cx));
614            }
615        }
616
617        if self.pending_room_creation.is_some() {
618            return Task::ready(Ok(None));
619        }
620
621        let client = self.client.clone();
622        let user_store = self.user_store.clone();
623        let join = self._join_debouncer.spawn(cx, move |cx| async move {
624            Room::join_channel(channel_id, client, user_store, cx).await
625        });
626
627        cx.spawn(async move |this, cx| {
628            let room = join.await?;
629            this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
630                .await?;
631            this.update(cx, |this, cx| this.report_call_event("Channel Joined", cx))?;
632            Ok(room)
633        })
634    }
635
636    pub fn hang_up(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
637        cx.notify();
638        self.report_call_event("Call Ended", cx);
639
640        Audio::end_call(cx);
641
642        let channel_id = self.channel_id(cx);
643        if let Some((room, _)) = self.room.take() {
644            cx.emit(Event::RoomLeft { channel_id });
645            room.update(cx, |room, cx| room.leave(cx))
646        } else {
647            Task::ready(Ok(()))
648        }
649    }
650
651    pub fn share_project(
652        &mut self,
653        project: Entity<Project>,
654        cx: &mut Context<Self>,
655    ) -> Task<Result<u64>> {
656        if let Some((room, _)) = self.room.as_ref() {
657            self.report_call_event("Project Shared", cx);
658            room.update(cx, |room, cx| room.share_project(project, cx))
659        } else {
660            Task::ready(Err(anyhow!("no active call")))
661        }
662    }
663
664    pub fn unshare_project(
665        &mut self,
666        project: Entity<Project>,
667        cx: &mut Context<Self>,
668    ) -> Result<()> {
669        let (room, _) = self.room.as_ref().context("no active call")?;
670        self.report_call_event("Project Unshared", cx);
671        room.update(cx, |room, cx| room.unshare_project(project, cx))
672    }
673
674    pub fn location(&self) -> Option<&WeakEntity<Project>> {
675        self.location.as_ref()
676    }
677
678    pub fn set_location(
679        &mut self,
680        project: Option<&Entity<Project>>,
681        cx: &mut Context<Self>,
682    ) -> Task<Result<()>> {
683        if project.is_some() || !*ZED_ALWAYS_ACTIVE {
684            self.location = project.map(|project| project.downgrade());
685            if let Some((room, _)) = self.room.as_ref() {
686                return room.update(cx, |room, cx| room.set_location(project, cx));
687            }
688        }
689        Task::ready(Ok(()))
690    }
691
692    fn set_room(&mut self, room: Option<Entity<Room>>, cx: &mut Context<Self>) -> Task<Result<()>> {
693        if room.as_ref() == self.room.as_ref().map(|room| &room.0) {
694            Task::ready(Ok(()))
695        } else {
696            cx.notify();
697            if let Some(room) = room {
698                if room.read(cx).status().is_offline() {
699                    self.room = None;
700                    Task::ready(Ok(()))
701                } else {
702                    let subscriptions = vec![
703                        cx.observe(&room, |this, room, cx| {
704                            if room.read(cx).status().is_offline() {
705                                this.set_room(None, cx).detach_and_log_err(cx);
706                            }
707
708                            cx.notify();
709                        }),
710                        cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
711                    ];
712                    self.room = Some((room.clone(), subscriptions));
713                    let location = self
714                        .location
715                        .as_ref()
716                        .and_then(|location| location.upgrade());
717                    let channel_id = room.read(cx).channel_id();
718                    cx.emit(Event::RoomJoined { channel_id });
719                    room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
720                }
721            } else {
722                self.room = None;
723                Task::ready(Ok(()))
724            }
725        }
726    }
727
728    pub fn room(&self) -> Option<&Entity<Room>> {
729        self.room.as_ref().map(|(room, _)| room)
730    }
731
732    pub fn client(&self) -> Arc<Client> {
733        self.client.clone()
734    }
735
736    pub fn pending_invites(&self) -> &HashSet<u64> {
737        &self.pending_invites
738    }
739
740    pub fn report_call_event(&self, operation: &'static str, cx: &mut App) {
741        if let Some(room) = self.room() {
742            let room = room.read(cx);
743            telemetry::event!(
744                operation,
745                room_id = room.id(),
746                channel_id = room.channel_id()
747            )
748        }
749    }
750}
751
752#[cfg(test)]
753mod test {
754    use gpui::TestAppContext;
755
756    use crate::OneAtATime;
757
758    #[gpui::test]
759    async fn test_one_at_a_time(cx: &mut TestAppContext) {
760        let mut one_at_a_time = OneAtATime { cancel: None };
761
762        assert_eq!(
763            cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }))
764                .await
765                .unwrap(),
766            Some(1)
767        );
768
769        let (a, b) = cx.update(|cx| {
770            (
771                one_at_a_time.spawn(cx, |_| async {
772                    panic!("");
773                }),
774                one_at_a_time.spawn(cx, |_| async { Ok(3) }),
775            )
776        });
777
778        assert_eq!(a.await.unwrap(), None::<u32>);
779        assert_eq!(b.await.unwrap(), Some(3));
780
781        let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
782        drop(one_at_a_time);
783
784        assert_eq!(promise.await.unwrap(), None);
785    }
786}