channel_buffer.rs

  1use crate::{Channel, ChannelId, ChannelStore};
  2use anyhow::Result;
  3use client::{Client, Collaborator, UserStore};
  4use collections::HashMap;
  5use gpui::{AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task};
  6use language::proto::serialize_version;
  7use rpc::{
  8    proto::{self, PeerId},
  9    TypedEnvelope,
 10};
 11use std::{sync::Arc, time::Duration};
 12use util::ResultExt;
 13
 14pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
 15
 16pub(crate) fn init(client: &Arc<Client>) {
 17    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
 18    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
 19}
 20
 21pub struct ChannelBuffer {
 22    pub channel_id: ChannelId,
 23    connected: bool,
 24    collaborators: HashMap<PeerId, Collaborator>,
 25    user_store: Model<UserStore>,
 26    channel_store: Model<ChannelStore>,
 27    buffer: Model<language::Buffer>,
 28    buffer_epoch: u64,
 29    client: Arc<Client>,
 30    subscription: Option<client::Subscription>,
 31    acknowledge_task: Option<Task<Result<()>>>,
 32}
 33
 34pub enum ChannelBufferEvent {
 35    CollaboratorsChanged,
 36    Disconnected,
 37    BufferEdited,
 38    ChannelChanged,
 39}
 40
 41impl EventEmitter for ChannelBuffer {
 42    type Event = ChannelBufferEvent;
 43}
 44
 45impl ChannelBuffer {
 46    pub(crate) async fn new(
 47        channel: Arc<Channel>,
 48        client: Arc<Client>,
 49        user_store: Model<UserStore>,
 50        channel_store: Model<ChannelStore>,
 51        mut cx: AsyncAppContext,
 52    ) -> Result<Model<Self>> {
 53        let response = client
 54            .request(proto::JoinChannelBuffer {
 55                channel_id: channel.id,
 56            })
 57            .await?;
 58
 59        let base_text = response.base_text;
 60        let operations = response
 61            .operations
 62            .into_iter()
 63            .map(language::proto::deserialize_operation)
 64            .collect::<Result<Vec<_>, _>>()?;
 65
 66        let buffer = cx.build_model(|_| {
 67            language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
 68        })?;
 69        buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))??;
 70
 71        let subscription = client.subscribe_to_entity(channel.id)?;
 72
 73        anyhow::Ok(cx.build_model(|cx| {
 74            cx.subscribe(&buffer, Self::on_buffer_update).detach();
 75            cx.on_release(Self::release).detach();
 76            let mut this = Self {
 77                buffer,
 78                buffer_epoch: response.epoch,
 79                client,
 80                connected: true,
 81                collaborators: Default::default(),
 82                acknowledge_task: None,
 83                channel_id: channel.id,
 84                subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
 85                user_store,
 86                channel_store,
 87            };
 88            this.replace_collaborators(response.collaborators, cx);
 89            this
 90        })?)
 91    }
 92
 93    fn release(&mut self, _: &mut AppContext) {
 94        if self.connected {
 95            if let Some(task) = self.acknowledge_task.take() {
 96                task.detach();
 97            }
 98            self.client
 99                .send(proto::LeaveChannelBuffer {
100                    channel_id: self.channel_id,
101                })
102                .log_err();
103        }
104    }
105
106    pub fn remote_id(&self, cx: &AppContext) -> u64 {
107        self.buffer.read(cx).remote_id()
108    }
109
110    pub fn user_store(&self) -> &Model<UserStore> {
111        &self.user_store
112    }
113
114    pub(crate) fn replace_collaborators(
115        &mut self,
116        collaborators: Vec<proto::Collaborator>,
117        cx: &mut ModelContext<Self>,
118    ) {
119        let mut new_collaborators = HashMap::default();
120        for collaborator in collaborators {
121            if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
122                new_collaborators.insert(collaborator.peer_id, collaborator);
123            }
124        }
125
126        for (_, old_collaborator) in &self.collaborators {
127            if !new_collaborators.contains_key(&old_collaborator.peer_id) {
128                self.buffer.update(cx, |buffer, cx| {
129                    buffer.remove_peer(old_collaborator.replica_id as u16, cx)
130                });
131            }
132        }
133        self.collaborators = new_collaborators;
134        cx.emit(ChannelBufferEvent::CollaboratorsChanged);
135        cx.notify();
136    }
137
138    async fn handle_update_channel_buffer(
139        this: Model<Self>,
140        update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
141        _: Arc<Client>,
142        mut cx: AsyncAppContext,
143    ) -> Result<()> {
144        let ops = update_channel_buffer
145            .payload
146            .operations
147            .into_iter()
148            .map(language::proto::deserialize_operation)
149            .collect::<Result<Vec<_>, _>>()?;
150
151        this.update(&mut cx, |this, cx| {
152            cx.notify();
153            this.buffer
154                .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
155        })??;
156
157        Ok(())
158    }
159
160    async fn handle_update_channel_buffer_collaborators(
161        this: Model<Self>,
162        message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
163        _: Arc<Client>,
164        mut cx: AsyncAppContext,
165    ) -> Result<()> {
166        this.update(&mut cx, |this, cx| {
167            this.replace_collaborators(message.payload.collaborators, cx);
168            cx.emit(ChannelBufferEvent::CollaboratorsChanged);
169            cx.notify();
170        })
171    }
172
173    fn on_buffer_update(
174        &mut self,
175        _: Model<language::Buffer>,
176        event: &language::Event,
177        cx: &mut ModelContext<Self>,
178    ) {
179        match event {
180            language::Event::Operation(operation) => {
181                let operation = language::proto::serialize_operation(operation);
182                self.client
183                    .send(proto::UpdateChannelBuffer {
184                        channel_id: self.channel_id,
185                        operations: vec![operation],
186                    })
187                    .log_err();
188            }
189            language::Event::Edited => {
190                cx.emit(ChannelBufferEvent::BufferEdited);
191            }
192            _ => {}
193        }
194    }
195
196    pub fn acknowledge_buffer_version(&mut self, cx: &mut ModelContext<'_, ChannelBuffer>) {
197        let buffer = self.buffer.read(cx);
198        let version = buffer.version();
199        let buffer_id = buffer.remote_id();
200        let client = self.client.clone();
201        let epoch = self.epoch();
202
203        self.acknowledge_task = Some(cx.spawn(move |_, cx| async move {
204            cx.background_executor()
205                .timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL)
206                .await;
207            client
208                .send(proto::AckBufferOperation {
209                    buffer_id,
210                    epoch,
211                    version: serialize_version(&version),
212                })
213                .ok();
214            Ok(())
215        }));
216    }
217
218    pub fn epoch(&self) -> u64 {
219        self.buffer_epoch
220    }
221
222    pub fn buffer(&self) -> Model<language::Buffer> {
223        self.buffer.clone()
224    }
225
226    pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
227        &self.collaborators
228    }
229
230    pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
231        self.channel_store
232            .read(cx)
233            .channel_for_id(self.channel_id)
234            .cloned()
235    }
236
237    pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
238        log::info!("channel buffer {} disconnected", self.channel_id);
239        if self.connected {
240            self.connected = false;
241            self.subscription.take();
242            cx.emit(ChannelBufferEvent::Disconnected);
243            cx.notify()
244        }
245    }
246
247    pub(crate) fn channel_changed(&mut self, cx: &mut ModelContext<Self>) {
248        cx.emit(ChannelBufferEvent::ChannelChanged);
249        cx.notify()
250    }
251
252    pub fn is_connected(&self) -> bool {
253        self.connected
254    }
255
256    pub fn replica_id(&self, cx: &AppContext) -> u16 {
257        self.buffer.read(cx).replica_id()
258    }
259}