call.rs

  1mod participant;
  2pub mod room;
  3
  4use anyhow::{anyhow, Result};
  5use client::{proto, Client, TypedEnvelope, User, UserStore};
  6use gpui::{
  7    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
  8    Subscription, Task,
  9};
 10pub use participant::ParticipantLocation;
 11use postage::watch;
 12use project::Project;
 13pub use room::Room;
 14use std::sync::Arc;
 15
 16pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut MutableAppContext) {
 17    let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx));
 18    cx.set_global(active_call);
 19}
 20
 21#[derive(Clone)]
 22pub struct IncomingCall {
 23    pub room_id: u64,
 24    pub caller: Arc<User>,
 25    pub participants: Vec<Arc<User>>,
 26    pub initial_project_id: Option<u64>,
 27}
 28
 29pub struct ActiveCall {
 30    room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
 31    incoming_call: (
 32        watch::Sender<Option<IncomingCall>>,
 33        watch::Receiver<Option<IncomingCall>>,
 34    ),
 35    client: Arc<Client>,
 36    user_store: ModelHandle<UserStore>,
 37    _subscriptions: Vec<client::Subscription>,
 38}
 39
 40impl Entity for ActiveCall {
 41    type Event = room::Event;
 42}
 43
 44impl ActiveCall {
 45    fn new(
 46        client: Arc<Client>,
 47        user_store: ModelHandle<UserStore>,
 48        cx: &mut ModelContext<Self>,
 49    ) -> Self {
 50        Self {
 51            room: None,
 52            incoming_call: watch::channel(),
 53            _subscriptions: vec![
 54                client.add_request_handler(cx.handle(), Self::handle_incoming_call),
 55                client.add_message_handler(cx.handle(), Self::handle_cancel_call),
 56            ],
 57            client,
 58            user_store,
 59        }
 60    }
 61
 62    async fn handle_incoming_call(
 63        this: ModelHandle<Self>,
 64        envelope: TypedEnvelope<proto::IncomingCall>,
 65        _: Arc<Client>,
 66        mut cx: AsyncAppContext,
 67    ) -> Result<proto::Ack> {
 68        let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
 69        let call = IncomingCall {
 70            room_id: envelope.payload.room_id,
 71            participants: user_store
 72                .update(&mut cx, |user_store, cx| {
 73                    user_store.get_users(envelope.payload.participant_user_ids, cx)
 74                })
 75                .await?,
 76            caller: user_store
 77                .update(&mut cx, |user_store, cx| {
 78                    user_store.get_user(envelope.payload.caller_user_id, cx)
 79                })
 80                .await?,
 81            initial_project_id: envelope.payload.initial_project_id,
 82        };
 83        this.update(&mut cx, |this, _| {
 84            *this.incoming_call.0.borrow_mut() = Some(call);
 85        });
 86
 87        Ok(proto::Ack {})
 88    }
 89
 90    async fn handle_cancel_call(
 91        this: ModelHandle<Self>,
 92        _: TypedEnvelope<proto::CancelCall>,
 93        _: Arc<Client>,
 94        mut cx: AsyncAppContext,
 95    ) -> Result<()> {
 96        this.update(&mut cx, |this, _| {
 97            *this.incoming_call.0.borrow_mut() = None;
 98        });
 99        Ok(())
100    }
101
102    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
103        cx.global::<ModelHandle<Self>>().clone()
104    }
105
106    pub fn invite(
107        &mut self,
108        recipient_user_id: u64,
109        initial_project: Option<ModelHandle<Project>>,
110        cx: &mut ModelContext<Self>,
111    ) -> Task<Result<()>> {
112        let room = self.room.as_ref().map(|(room, _)| room.clone());
113        let client = self.client.clone();
114        let user_store = self.user_store.clone();
115        cx.spawn(|this, mut cx| async move {
116            let room = if let Some(room) = room {
117                room
118            } else {
119                cx.update(|cx| Room::create(client, user_store, cx)).await?
120            };
121
122            let initial_project_id = if let Some(initial_project) = initial_project {
123                let room_id = room.read_with(&cx, |room, _| room.id());
124                Some(
125                    initial_project
126                        .update(&mut cx, |project, cx| project.share(room_id, cx))
127                        .await?,
128                )
129            } else {
130                None
131            };
132
133            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx));
134            room.update(&mut cx, |room, cx| {
135                room.call(recipient_user_id, initial_project_id, cx)
136            })
137            .await?;
138
139            Ok(())
140        })
141    }
142
143    pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
144        self.incoming_call.1.clone()
145    }
146
147    pub fn accept_incoming(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
148        if self.room.is_some() {
149            return Task::ready(Err(anyhow!("cannot join while on another call")));
150        }
151
152        let call = if let Some(call) = self.incoming_call.1.borrow().clone() {
153            call
154        } else {
155            return Task::ready(Err(anyhow!("no incoming call")));
156        };
157
158        let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
159        cx.spawn(|this, mut cx| async move {
160            let room = join.await?;
161            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx));
162            Ok(())
163        })
164    }
165
166    pub fn decline_incoming(&mut self) -> Result<()> {
167        *self.incoming_call.0.borrow_mut() = None;
168        self.client.send(proto::DeclineCall {})?;
169        Ok(())
170    }
171
172    pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
173        if let Some((room, _)) = self.room.take() {
174            room.update(cx, |room, cx| room.leave(cx))?;
175        }
176        Ok(())
177    }
178
179    fn set_room(&mut self, room: Option<ModelHandle<Room>>, cx: &mut ModelContext<Self>) {
180        if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
181            if let Some(room) = room {
182                let subscriptions = vec![
183                    cx.observe(&room, |_, _, cx| cx.notify()),
184                    cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
185                ];
186                self.room = Some((room, subscriptions));
187            } else {
188                self.room = None;
189            }
190            cx.notify();
191        }
192    }
193
194    pub fn room(&self) -> Option<&ModelHandle<Room>> {
195        self.room.as_ref().map(|(room, _)| room)
196    }
197}