channel_buffer.rs

  1use crate::{Channel, ChannelStore};
  2use anyhow::Result;
  3use client::{ChannelId, Client, Collaborator, UserStore, ZED_ALWAYS_ACTIVE};
  4use collections::HashMap;
  5use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task};
  6use language::proto::serialize_version;
  7use rpc::{
  8    AnyProtoClient, TypedEnvelope,
  9    proto::{self, PeerId},
 10};
 11use std::{sync::Arc, time::Duration};
 12use text::{BufferId, ReplicaId};
 13use util::ResultExt;
 14
 15pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
 16
 17pub(crate) fn init(client: &AnyProtoClient) {
 18    client.add_entity_message_handler(ChannelBuffer::handle_update_channel_buffer);
 19    client.add_entity_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
 20}
 21
 22pub struct ChannelBuffer {
 23    pub channel_id: ChannelId,
 24    connected: bool,
 25    rejoining: bool,
 26    collaborators: HashMap<PeerId, Collaborator>,
 27    user_store: Entity<UserStore>,
 28    channel_store: Entity<ChannelStore>,
 29    buffer: Entity<language::Buffer>,
 30    buffer_epoch: u64,
 31    client: Arc<Client>,
 32    subscription: Option<client::Subscription>,
 33    acknowledge_task: Option<Task<Result<()>>>,
 34}
 35
 36pub enum ChannelBufferEvent {
 37    CollaboratorsChanged,
 38    Disconnected,
 39    Connected,
 40    BufferEdited,
 41    ChannelChanged,
 42}
 43
 44impl EventEmitter<ChannelBufferEvent> for ChannelBuffer {}
 45
 46impl ChannelBuffer {
 47    pub(crate) async fn new(
 48        channel: Arc<Channel>,
 49        client: Arc<Client>,
 50        user_store: Entity<UserStore>,
 51        channel_store: Entity<ChannelStore>,
 52        cx: &mut AsyncApp,
 53    ) -> Result<Entity<Self>> {
 54        let response = client
 55            .request(proto::JoinChannelBuffer {
 56                channel_id: channel.id.0,
 57            })
 58            .await?;
 59        let buffer_id = BufferId::new(response.buffer_id)?;
 60        let base_text = response.base_text;
 61        let operations = response
 62            .operations
 63            .into_iter()
 64            .map(language::proto::deserialize_operation)
 65            .collect::<Result<Vec<_>, _>>()?;
 66
 67        let buffer = cx.new(|cx| {
 68            let capability = channel_store.read(cx).channel_capability(channel.id);
 69            language::Buffer::remote(
 70                buffer_id,
 71                ReplicaId::new(response.replica_id as u16),
 72                capability,
 73                base_text,
 74            )
 75        });
 76        buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
 77
 78        let subscription = client.subscribe_to_entity(channel.id.0)?;
 79
 80        anyhow::Ok(cx.new(|cx| {
 81            cx.subscribe(&buffer, Self::on_buffer_update).detach();
 82            cx.on_release(Self::release).detach();
 83            let mut this = Self {
 84                buffer,
 85                buffer_epoch: response.epoch,
 86                client,
 87                connected: true,
 88                rejoining: false,
 89                collaborators: Default::default(),
 90                acknowledge_task: None,
 91                channel_id: channel.id,
 92                subscription: Some(subscription.set_entity(&cx.entity(), &cx.to_async())),
 93                user_store,
 94                channel_store,
 95            };
 96            this.replace_collaborators(response.collaborators, cx);
 97            this
 98        }))
 99    }
100
101    fn release(&mut self, _: &mut App) {
102        if self.connected {
103            if let Some(task) = self.acknowledge_task.take() {
104                task.detach();
105            }
106            self.client
107                .send(proto::LeaveChannelBuffer {
108                    channel_id: self.channel_id.0,
109                })
110                .log_err();
111        }
112    }
113
114    pub fn connected(&mut self, cx: &mut Context<Self>) {
115        self.connected = true;
116        self.rejoining = false;
117        if self.subscription.is_none() {
118            let Ok(subscription) = self.client.subscribe_to_entity(self.channel_id.0) else {
119                return;
120            };
121            self.subscription = Some(subscription.set_entity(&cx.entity(), &cx.to_async()));
122            cx.emit(ChannelBufferEvent::Connected);
123        }
124    }
125
126    pub(crate) fn set_rejoining(&mut self, rejoining: bool) {
127        self.rejoining = rejoining;
128    }
129
130    pub fn remote_id(&self, cx: &App) -> BufferId {
131        self.buffer.read(cx).remote_id()
132    }
133
134    pub fn user_store(&self) -> &Entity<UserStore> {
135        &self.user_store
136    }
137
138    pub(crate) fn replace_collaborators(
139        &mut self,
140        collaborators: Vec<proto::Collaborator>,
141        cx: &mut Context<Self>,
142    ) {
143        let mut new_collaborators = HashMap::default();
144        for collaborator in collaborators {
145            if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
146                new_collaborators.insert(collaborator.peer_id, collaborator);
147            }
148        }
149
150        for old_collaborator in self.collaborators.values() {
151            if !new_collaborators.contains_key(&old_collaborator.peer_id) {
152                self.buffer.update(cx, |buffer, cx| {
153                    buffer.remove_peer(old_collaborator.replica_id, cx)
154                });
155            }
156        }
157        self.collaborators = new_collaborators;
158        cx.emit(ChannelBufferEvent::CollaboratorsChanged);
159        cx.notify();
160    }
161
162    async fn handle_update_channel_buffer(
163        this: Entity<Self>,
164        update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
165        mut cx: AsyncApp,
166    ) -> Result<()> {
167        let ops = update_channel_buffer
168            .payload
169            .operations
170            .into_iter()
171            .map(language::proto::deserialize_operation)
172            .collect::<Result<Vec<_>, _>>()?;
173
174        this.update(&mut cx, |this, cx| {
175            cx.notify();
176            this.buffer
177                .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
178        });
179
180        Ok(())
181    }
182
183    async fn handle_update_channel_buffer_collaborators(
184        this: Entity<Self>,
185        message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
186        mut cx: AsyncApp,
187    ) -> Result<()> {
188        this.update(&mut cx, |this, cx| {
189            this.replace_collaborators(message.payload.collaborators, cx);
190            cx.emit(ChannelBufferEvent::CollaboratorsChanged);
191            cx.notify();
192        });
193        Ok(())
194    }
195
196    fn on_buffer_update(
197        &mut self,
198        _: Entity<language::Buffer>,
199        event: &language::BufferEvent,
200        cx: &mut Context<Self>,
201    ) {
202        match event {
203            language::BufferEvent::Operation {
204                operation,
205                is_local: true,
206            } => {
207                if *ZED_ALWAYS_ACTIVE
208                    && let language::Operation::UpdateSelections { selections, .. } = operation
209                    && selections.is_empty()
210                {
211                    return;
212                }
213                let operation = language::proto::serialize_operation(operation);
214                if self.rejoining {
215                    return;
216                }
217                self.client
218                    .send(proto::UpdateChannelBuffer {
219                        channel_id: self.channel_id.0,
220                        operations: vec![operation],
221                    })
222                    .log_err();
223            }
224            language::BufferEvent::Edited => {
225                cx.emit(ChannelBufferEvent::BufferEdited);
226            }
227            _ => {}
228        }
229    }
230
231    pub fn acknowledge_buffer_version(&mut self, cx: &mut Context<ChannelBuffer>) {
232        let buffer = self.buffer.read(cx);
233        let version = buffer.version();
234        let buffer_id = buffer.remote_id().into();
235        let client = self.client.clone();
236        let epoch = self.epoch();
237
238        self.acknowledge_task = Some(cx.spawn(async move |_, cx| {
239            cx.background_executor()
240                .timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL)
241                .await;
242            client
243                .send(proto::AckBufferOperation {
244                    buffer_id,
245                    epoch,
246                    version: serialize_version(&version),
247                })
248                .ok();
249            Ok(())
250        }));
251    }
252
253    pub fn epoch(&self) -> u64 {
254        self.buffer_epoch
255    }
256
257    pub fn buffer(&self) -> Entity<language::Buffer> {
258        self.buffer.clone()
259    }
260
261    pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
262        &self.collaborators
263    }
264
265    pub fn channel(&self, cx: &App) -> Option<Arc<Channel>> {
266        self.channel_store
267            .read(cx)
268            .channel_for_id(self.channel_id)
269            .cloned()
270    }
271
272    pub(crate) fn disconnect(&mut self, cx: &mut Context<Self>) {
273        log::info!("channel buffer {} disconnected", self.channel_id);
274        if self.connected {
275            self.connected = false;
276            self.rejoining = false;
277            self.subscription.take();
278            cx.emit(ChannelBufferEvent::Disconnected);
279            cx.notify()
280        }
281    }
282
283    pub(crate) fn channel_changed(&mut self, cx: &mut Context<Self>) {
284        cx.emit(ChannelBufferEvent::ChannelChanged);
285        cx.notify()
286    }
287
288    pub fn is_connected(&self) -> bool {
289        self.connected
290    }
291
292    pub fn replica_id(&self, cx: &App) -> ReplicaId {
293        self.buffer.read(cx).replica_id()
294    }
295}