channel_buffer.rs

  1use crate::{Channel, ChannelId, ChannelStore};
  2use anyhow::Result;
  3use client::{Client, Collaborator, UserStore, ZED_ALWAYS_ACTIVE};
  4use collections::HashMap;
  5use gpui::{AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task};
  6use language::proto::serialize_version;
  7use rpc::{
  8    proto::{self, PeerId},
  9    TypedEnvelope,
 10};
 11use std::{sync::Arc, time::Duration};
 12use util::ResultExt;
 13
 14pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
 15
 16pub(crate) fn init(client: &Arc<Client>) {
 17    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
 18    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
 19}
 20
 21pub struct ChannelBuffer {
 22    pub channel_id: ChannelId,
 23    connected: bool,
 24    collaborators: HashMap<PeerId, Collaborator>,
 25    user_store: Model<UserStore>,
 26    channel_store: Model<ChannelStore>,
 27    buffer: Model<language::Buffer>,
 28    buffer_epoch: u64,
 29    client: Arc<Client>,
 30    subscription: Option<client::Subscription>,
 31    acknowledge_task: Option<Task<Result<()>>>,
 32}
 33
 34pub enum ChannelBufferEvent {
 35    CollaboratorsChanged,
 36    Disconnected,
 37    BufferEdited,
 38    ChannelChanged,
 39}
 40
 41impl EventEmitter<ChannelBufferEvent> for ChannelBuffer {}
 42
 43impl ChannelBuffer {
 44    pub(crate) async fn new(
 45        channel: Arc<Channel>,
 46        client: Arc<Client>,
 47        user_store: Model<UserStore>,
 48        channel_store: Model<ChannelStore>,
 49        mut cx: AsyncAppContext,
 50    ) -> Result<Model<Self>> {
 51        let response = client
 52            .request(proto::JoinChannelBuffer {
 53                channel_id: channel.id,
 54            })
 55            .await?;
 56
 57        let base_text = response.base_text;
 58        let operations = response
 59            .operations
 60            .into_iter()
 61            .map(language::proto::deserialize_operation)
 62            .collect::<Result<Vec<_>, _>>()?;
 63
 64        let buffer = cx.new_model(|cx| {
 65            let capability = channel_store.read(cx).channel_capability(channel.id);
 66            language::Buffer::remote(
 67                response.buffer_id,
 68                response.replica_id as u16,
 69                capability,
 70                base_text,
 71            )
 72        })?;
 73        buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))??;
 74
 75        let subscription = client.subscribe_to_entity(channel.id)?;
 76
 77        anyhow::Ok(cx.new_model(|cx| {
 78            cx.subscribe(&buffer, Self::on_buffer_update).detach();
 79            cx.on_release(Self::release).detach();
 80            let mut this = Self {
 81                buffer,
 82                buffer_epoch: response.epoch,
 83                client,
 84                connected: true,
 85                collaborators: Default::default(),
 86                acknowledge_task: None,
 87                channel_id: channel.id,
 88                subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
 89                user_store,
 90                channel_store,
 91            };
 92            this.replace_collaborators(response.collaborators, cx);
 93            this
 94        })?)
 95    }
 96
 97    fn release(&mut self, _: &mut AppContext) {
 98        if self.connected {
 99            if let Some(task) = self.acknowledge_task.take() {
100                task.detach();
101            }
102            self.client
103                .send(proto::LeaveChannelBuffer {
104                    channel_id: self.channel_id,
105                })
106                .log_err();
107        }
108    }
109
110    pub fn remote_id(&self, cx: &AppContext) -> u64 {
111        self.buffer.read(cx).remote_id()
112    }
113
114    pub fn user_store(&self) -> &Model<UserStore> {
115        &self.user_store
116    }
117
118    pub(crate) fn replace_collaborators(
119        &mut self,
120        collaborators: Vec<proto::Collaborator>,
121        cx: &mut ModelContext<Self>,
122    ) {
123        let mut new_collaborators = HashMap::default();
124        for collaborator in collaborators {
125            if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
126                new_collaborators.insert(collaborator.peer_id, collaborator);
127            }
128        }
129
130        for (_, old_collaborator) in &self.collaborators {
131            if !new_collaborators.contains_key(&old_collaborator.peer_id) {
132                self.buffer.update(cx, |buffer, cx| {
133                    buffer.remove_peer(old_collaborator.replica_id as u16, cx)
134                });
135            }
136        }
137        self.collaborators = new_collaborators;
138        cx.emit(ChannelBufferEvent::CollaboratorsChanged);
139        cx.notify();
140    }
141
142    async fn handle_update_channel_buffer(
143        this: Model<Self>,
144        update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
145        _: Arc<Client>,
146        mut cx: AsyncAppContext,
147    ) -> Result<()> {
148        let ops = update_channel_buffer
149            .payload
150            .operations
151            .into_iter()
152            .map(language::proto::deserialize_operation)
153            .collect::<Result<Vec<_>, _>>()?;
154
155        this.update(&mut cx, |this, cx| {
156            cx.notify();
157            this.buffer
158                .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
159        })??;
160
161        Ok(())
162    }
163
164    async fn handle_update_channel_buffer_collaborators(
165        this: Model<Self>,
166        message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
167        _: Arc<Client>,
168        mut cx: AsyncAppContext,
169    ) -> Result<()> {
170        this.update(&mut cx, |this, cx| {
171            this.replace_collaborators(message.payload.collaborators, cx);
172            cx.emit(ChannelBufferEvent::CollaboratorsChanged);
173            cx.notify();
174        })
175    }
176
177    fn on_buffer_update(
178        &mut self,
179        _: Model<language::Buffer>,
180        event: &language::Event,
181        cx: &mut ModelContext<Self>,
182    ) {
183        match event {
184            language::Event::Operation(operation) => {
185                if *ZED_ALWAYS_ACTIVE {
186                    match operation {
187                        language::Operation::UpdateSelections { selections, .. } => {
188                            if selections.is_empty() {
189                                return;
190                            }
191                        }
192                        _ => {}
193                    }
194                }
195                let operation = language::proto::serialize_operation(operation);
196                self.client
197                    .send(proto::UpdateChannelBuffer {
198                        channel_id: self.channel_id,
199                        operations: vec![operation],
200                    })
201                    .log_err();
202            }
203            language::Event::Edited => {
204                cx.emit(ChannelBufferEvent::BufferEdited);
205            }
206            _ => {}
207        }
208    }
209
210    pub fn acknowledge_buffer_version(&mut self, cx: &mut ModelContext<'_, ChannelBuffer>) {
211        let buffer = self.buffer.read(cx);
212        let version = buffer.version();
213        let buffer_id = buffer.remote_id();
214        let client = self.client.clone();
215        let epoch = self.epoch();
216
217        self.acknowledge_task = Some(cx.spawn(move |_, cx| async move {
218            cx.background_executor()
219                .timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL)
220                .await;
221            client
222                .send(proto::AckBufferOperation {
223                    buffer_id,
224                    epoch,
225                    version: serialize_version(&version),
226                })
227                .ok();
228            Ok(())
229        }));
230    }
231
232    pub fn epoch(&self) -> u64 {
233        self.buffer_epoch
234    }
235
236    pub fn buffer(&self) -> Model<language::Buffer> {
237        self.buffer.clone()
238    }
239
240    pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
241        &self.collaborators
242    }
243
244    pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
245        self.channel_store
246            .read(cx)
247            .channel_for_id(self.channel_id)
248            .cloned()
249    }
250
251    pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
252        log::info!("channel buffer {} disconnected", self.channel_id);
253        if self.connected {
254            self.connected = false;
255            self.subscription.take();
256            cx.emit(ChannelBufferEvent::Disconnected);
257            cx.notify()
258        }
259    }
260
261    pub(crate) fn channel_changed(&mut self, cx: &mut ModelContext<Self>) {
262        cx.emit(ChannelBufferEvent::ChannelChanged);
263        cx.notify()
264    }
265
266    pub fn is_connected(&self) -> bool {
267        self.connected
268    }
269
270    pub fn replica_id(&self, cx: &AppContext) -> u16 {
271        self.buffer.read(cx).replica_id()
272    }
273}