channel_buffer.rs

  1use crate::Channel;
  2use anyhow::Result;
  3use client::{Client, Collaborator, UserStore};
  4use collections::HashMap;
  5use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
  6use language::proto::serialize_version;
  7use rpc::{
  8    proto::{self, PeerId},
  9    TypedEnvelope,
 10};
 11use std::{sync::Arc, time::Duration};
 12use util::ResultExt;
 13
/// Delay that `ChannelBuffer::acknowledge_buffer_version` waits on the
/// background timer before it sends its `AckBufferOperation` message.
pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
 15
/// Registers the `ChannelBuffer` message handlers on `client`: one for
/// `UpdateChannelBuffer` messages and one for
/// `UpdateChannelBufferCollaborators` messages.
pub(crate) fn init(client: &Arc<Client>) {
    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
}
 20
/// Client-side model of a channel's shared buffer: wraps a `language::Buffer`
/// replica together with the connection state needed to exchange operations
/// and collaborator updates with the server.
pub struct ChannelBuffer {
    /// Channel this buffer belongs to; its `id` is used in every message sent.
    pub(crate) channel: Arc<Channel>,
    /// `true` from construction until `disconnect` is called.
    connected: bool,
    /// Current collaborators keyed by peer id; replaced wholesale by
    /// `replace_collaborators`.
    collaborators: HashMap<PeerId, Collaborator>,
    /// User store handed in at construction and exposed via `user_store()`.
    user_store: ModelHandle<UserStore>,
    /// The underlying buffer replica that remote operations are applied to.
    buffer: ModelHandle<language::Buffer>,
    /// Epoch reported by the join response; sent along with acknowledgements.
    buffer_epoch: u64,
    /// Client used for all outgoing messages.
    client: Arc<Client>,
    /// Entity subscription for the channel id; taken (dropped) on `disconnect`.
    subscription: Option<client::Subscription>,
    /// Most recently scheduled acknowledgement task, if any.
    acknowledge_task: Option<Task<Result<()>>>,
}
 32
/// Events emitted by a `ChannelBuffer` model.
pub enum ChannelBufferEvent {
    /// Emitted by `replace_collaborators` after the collaborator map is swapped.
    CollaboratorsChanged,
    /// Emitted by `disconnect` the first time the buffer transitions to
    /// disconnected.
    Disconnected,
    /// Emitted when the underlying buffer reports `language::Event::Edited`.
    BufferEdited,
}
 38
 39impl Entity for ChannelBuffer {
 40    type Event = ChannelBufferEvent;
 41
 42    fn release(&mut self, _: &mut AppContext) {
 43        if self.connected {
 44            if let Some(task) = self.acknowledge_task.take() {
 45                task.detach();
 46            }
 47            self.client
 48                .send(proto::LeaveChannelBuffer {
 49                    channel_id: self.channel.id,
 50                })
 51                .log_err();
 52        }
 53    }
 54}
 55
 56impl ChannelBuffer {
 57    pub(crate) async fn new(
 58        channel: Arc<Channel>,
 59        client: Arc<Client>,
 60        user_store: ModelHandle<UserStore>,
 61        mut cx: AsyncAppContext,
 62    ) -> Result<ModelHandle<Self>> {
 63        let response = client
 64            .request(proto::JoinChannelBuffer {
 65                channel_id: channel.id,
 66            })
 67            .await?;
 68
 69        let base_text = response.base_text;
 70        let operations = response
 71            .operations
 72            .into_iter()
 73            .map(language::proto::deserialize_operation)
 74            .collect::<Result<Vec<_>, _>>()?;
 75
 76        let buffer = cx.add_model(|_| {
 77            language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
 78        });
 79        buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
 80
 81        let subscription = client.subscribe_to_entity(channel.id)?;
 82
 83        anyhow::Ok(cx.add_model(|cx| {
 84            cx.subscribe(&buffer, Self::on_buffer_update).detach();
 85
 86            let mut this = Self {
 87                buffer,
 88                buffer_epoch: response.epoch,
 89                client,
 90                connected: true,
 91                collaborators: Default::default(),
 92                acknowledge_task: None,
 93                channel,
 94                subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
 95                user_store,
 96            };
 97            this.replace_collaborators(response.collaborators, cx);
 98            this
 99        }))
100    }
101
102    pub fn remote_id(&self, cx: &AppContext) -> u64 {
103        self.buffer.read(cx).remote_id()
104    }
105
    /// Returns the user store handle this channel buffer was created with.
    pub fn user_store(&self) -> &ModelHandle<UserStore> {
        &self.user_store
    }
109
110    pub(crate) fn replace_collaborators(
111        &mut self,
112        collaborators: Vec<proto::Collaborator>,
113        cx: &mut ModelContext<Self>,
114    ) {
115        let mut new_collaborators = HashMap::default();
116        for collaborator in collaborators {
117            if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
118                new_collaborators.insert(collaborator.peer_id, collaborator);
119            }
120        }
121
122        for (_, old_collaborator) in &self.collaborators {
123            if !new_collaborators.contains_key(&old_collaborator.peer_id) {
124                self.buffer.update(cx, |buffer, cx| {
125                    buffer.remove_peer(old_collaborator.replica_id as u16, cx)
126                });
127            }
128        }
129        self.collaborators = new_collaborators;
130        cx.emit(ChannelBufferEvent::CollaboratorsChanged);
131        cx.notify();
132    }
133
134    async fn handle_update_channel_buffer(
135        this: ModelHandle<Self>,
136        update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
137        _: Arc<Client>,
138        mut cx: AsyncAppContext,
139    ) -> Result<()> {
140        let ops = update_channel_buffer
141            .payload
142            .operations
143            .into_iter()
144            .map(language::proto::deserialize_operation)
145            .collect::<Result<Vec<_>, _>>()?;
146
147        this.update(&mut cx, |this, cx| {
148            cx.notify();
149            this.buffer
150                .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
151        })?;
152
153        Ok(())
154    }
155
156    async fn handle_update_channel_buffer_collaborators(
157        this: ModelHandle<Self>,
158        message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
159        _: Arc<Client>,
160        mut cx: AsyncAppContext,
161    ) -> Result<()> {
162        this.update(&mut cx, |this, cx| {
163            this.replace_collaborators(message.payload.collaborators, cx);
164            cx.emit(ChannelBufferEvent::CollaboratorsChanged);
165            cx.notify();
166        });
167
168        Ok(())
169    }
170
171    fn on_buffer_update(
172        &mut self,
173        _: ModelHandle<language::Buffer>,
174        event: &language::Event,
175        cx: &mut ModelContext<Self>,
176    ) {
177        match event {
178            language::Event::Operation(operation) => {
179                let operation = language::proto::serialize_operation(operation);
180                self.client
181                    .send(proto::UpdateChannelBuffer {
182                        channel_id: self.channel.id,
183                        operations: vec![operation],
184                    })
185                    .log_err();
186            }
187            language::Event::Edited => {
188                cx.emit(ChannelBufferEvent::BufferEdited);
189            }
190            _ => {}
191        }
192    }
193
194    pub fn acknowledge_buffer_version(&mut self, cx: &mut ModelContext<'_, ChannelBuffer>) {
195        let buffer = self.buffer.read(cx);
196        let version = buffer.version();
197        let buffer_id = buffer.remote_id();
198        let client = self.client.clone();
199        let epoch = self.epoch();
200
201        self.acknowledge_task = Some(cx.spawn_weak(|_, cx| async move {
202            cx.background().timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL).await;
203            client
204                .send(proto::AckBufferOperation {
205                    buffer_id,
206                    epoch,
207                    version: serialize_version(&version),
208                })
209                .ok();
210            Ok(())
211        }));
212    }
213
    /// Returns the buffer epoch recorded from the join response.
    pub fn epoch(&self) -> u64 {
        self.buffer_epoch
    }
217
218    pub fn buffer(&self) -> ModelHandle<language::Buffer> {
219        self.buffer.clone()
220    }
221
    /// Returns the current collaborators, keyed by peer id.
    pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
        &self.collaborators
    }
225
226    pub fn channel(&self) -> Arc<Channel> {
227        self.channel.clone()
228    }
229
230    pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
231        log::info!("channel buffer {} disconnected", self.channel.id);
232        if self.connected {
233            self.connected = false;
234            self.subscription.take();
235            cx.emit(ChannelBufferEvent::Disconnected);
236            cx.notify()
237        }
238    }
239
    /// Returns `true` until `disconnect` has been called.
    pub fn is_connected(&self) -> bool {
        self.connected
    }
243
244    pub fn replica_id(&self, cx: &AppContext) -> u16 {
245        self.buffer.read(cx).replica_id()
246    }
247}