channel_buffer.rs

  1use crate::{Channel, ChannelStore};
  2use anyhow::Result;
  3use client::{ChannelId, Client, Collaborator, UserStore, ZED_ALWAYS_ACTIVE};
  4use collections::HashMap;
  5use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task};
  6use language::proto::serialize_version;
  7use rpc::{
  8    AnyProtoClient, TypedEnvelope,
  9    proto::{self, PeerId},
 10};
 11use std::{sync::Arc, time::Duration};
 12use text::BufferId;
 13use util::ResultExt;
 14
/// Delay that `ChannelBuffer::acknowledge_buffer_version` waits before it sends a
/// `proto::AckBufferOperation` to the server.
pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
 16
 17pub(crate) fn init(client: &AnyProtoClient) {
 18    client.add_entity_message_handler(ChannelBuffer::handle_update_channel_buffer);
 19    client.add_entity_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
 20}
 21
/// Client-side state for the shared buffer of a channel.
///
/// Built by `ChannelBuffer::new`, which joins the buffer on the server. When the entity
/// is released while still connected, a `proto::LeaveChannelBuffer` is sent.
pub struct ChannelBuffer {
    /// Id of the channel this buffer belongs to.
    pub channel_id: ChannelId,
    /// Set to `true` on construction and to `false` by `disconnect`.
    connected: bool,
    /// Current collaborators, keyed by peer id.
    collaborators: HashMap<PeerId, Collaborator>,
    /// User store handle, exposed through `user_store()`.
    user_store: Entity<UserStore>,
    /// Channel store; read by `channel()` to resolve `channel_id`.
    channel_store: Entity<ChannelStore>,
    /// The underlying remote text buffer.
    buffer: Entity<language::Buffer>,
    /// Epoch taken from the join response; sent back with acknowledgements.
    buffer_epoch: u64,
    /// Client used to send buffer operations, acknowledgements and the leave message.
    client: Arc<Client>,
    /// Entity subscription for the channel id; taken (dropped) by `disconnect`.
    subscription: Option<client::Subscription>,
    /// Task stored by the most recent `acknowledge_buffer_version` call, if any.
    acknowledge_task: Option<Task<Result<()>>>,
}
 34
/// Events emitted by a [`ChannelBuffer`] entity.
pub enum ChannelBufferEvent {
    /// The collaborator set was replaced (emitted by `replace_collaborators`).
    CollaboratorsChanged,
    /// The buffer went from connected to disconnected (emitted by `disconnect`).
    Disconnected,
    /// The underlying `language::Buffer` emitted `BufferEvent::Edited`.
    BufferEdited,
    /// Emitted by `channel_changed`.
    ChannelChanged,
}
 41
// Lets `ChannelBuffer` entities emit `ChannelBufferEvent`s via `cx.emit`.
impl EventEmitter<ChannelBufferEvent> for ChannelBuffer {}
 43
 44impl ChannelBuffer {
 45    pub(crate) async fn new(
 46        channel: Arc<Channel>,
 47        client: Arc<Client>,
 48        user_store: Entity<UserStore>,
 49        channel_store: Entity<ChannelStore>,
 50        cx: &mut AsyncApp,
 51    ) -> Result<Entity<Self>> {
 52        let response = client
 53            .request(proto::JoinChannelBuffer {
 54                channel_id: channel.id.0,
 55            })
 56            .await?;
 57        let buffer_id = BufferId::new(response.buffer_id)?;
 58        let base_text = response.base_text;
 59        let operations = response
 60            .operations
 61            .into_iter()
 62            .map(language::proto::deserialize_operation)
 63            .collect::<Result<Vec<_>, _>>()?;
 64
 65        let buffer = cx.new(|cx| {
 66            let capability = channel_store.read(cx).channel_capability(channel.id);
 67            language::Buffer::remote(buffer_id, response.replica_id as u16, capability, base_text)
 68        })?;
 69        buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
 70
 71        let subscription = client.subscribe_to_entity(channel.id.0)?;
 72
 73        anyhow::Ok(cx.new(|cx| {
 74            cx.subscribe(&buffer, Self::on_buffer_update).detach();
 75            cx.on_release(Self::release).detach();
 76            let mut this = Self {
 77                buffer,
 78                buffer_epoch: response.epoch,
 79                client,
 80                connected: true,
 81                collaborators: Default::default(),
 82                acknowledge_task: None,
 83                channel_id: channel.id,
 84                subscription: Some(subscription.set_entity(&cx.entity(), &mut cx.to_async())),
 85                user_store,
 86                channel_store,
 87            };
 88            this.replace_collaborators(response.collaborators, cx);
 89            this
 90        })?)
 91    }
 92
 93    fn release(&mut self, _: &mut App) {
 94        if self.connected {
 95            if let Some(task) = self.acknowledge_task.take() {
 96                task.detach();
 97            }
 98            self.client
 99                .send(proto::LeaveChannelBuffer {
100                    channel_id: self.channel_id.0,
101                })
102                .log_err();
103        }
104    }
105
106    pub fn remote_id(&self, cx: &App) -> BufferId {
107        self.buffer.read(cx).remote_id()
108    }
109
110    pub fn user_store(&self) -> &Entity<UserStore> {
111        &self.user_store
112    }
113
114    pub(crate) fn replace_collaborators(
115        &mut self,
116        collaborators: Vec<proto::Collaborator>,
117        cx: &mut Context<Self>,
118    ) {
119        let mut new_collaborators = HashMap::default();
120        for collaborator in collaborators {
121            if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
122                new_collaborators.insert(collaborator.peer_id, collaborator);
123            }
124        }
125
126        for (_, old_collaborator) in &self.collaborators {
127            if !new_collaborators.contains_key(&old_collaborator.peer_id) {
128                self.buffer.update(cx, |buffer, cx| {
129                    buffer.remove_peer(old_collaborator.replica_id, cx)
130                });
131            }
132        }
133        self.collaborators = new_collaborators;
134        cx.emit(ChannelBufferEvent::CollaboratorsChanged);
135        cx.notify();
136    }
137
138    async fn handle_update_channel_buffer(
139        this: Entity<Self>,
140        update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
141        mut cx: AsyncApp,
142    ) -> Result<()> {
143        let ops = update_channel_buffer
144            .payload
145            .operations
146            .into_iter()
147            .map(language::proto::deserialize_operation)
148            .collect::<Result<Vec<_>, _>>()?;
149
150        this.update(&mut cx, |this, cx| {
151            cx.notify();
152            this.buffer
153                .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
154        })?;
155
156        Ok(())
157    }
158
159    async fn handle_update_channel_buffer_collaborators(
160        this: Entity<Self>,
161        message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
162        mut cx: AsyncApp,
163    ) -> Result<()> {
164        this.update(&mut cx, |this, cx| {
165            this.replace_collaborators(message.payload.collaborators, cx);
166            cx.emit(ChannelBufferEvent::CollaboratorsChanged);
167            cx.notify();
168        })
169    }
170
171    fn on_buffer_update(
172        &mut self,
173        _: Entity<language::Buffer>,
174        event: &language::BufferEvent,
175        cx: &mut Context<Self>,
176    ) {
177        match event {
178            language::BufferEvent::Operation {
179                operation,
180                is_local: true,
181            } => {
182                if *ZED_ALWAYS_ACTIVE {
183                    if let language::Operation::UpdateSelections { selections, .. } = operation {
184                        if selections.is_empty() {
185                            return;
186                        }
187                    }
188                }
189                let operation = language::proto::serialize_operation(operation);
190                self.client
191                    .send(proto::UpdateChannelBuffer {
192                        channel_id: self.channel_id.0,
193                        operations: vec![operation],
194                    })
195                    .log_err();
196            }
197            language::BufferEvent::Edited => {
198                cx.emit(ChannelBufferEvent::BufferEdited);
199            }
200            _ => {}
201        }
202    }
203
204    pub fn acknowledge_buffer_version(&mut self, cx: &mut Context<ChannelBuffer>) {
205        let buffer = self.buffer.read(cx);
206        let version = buffer.version();
207        let buffer_id = buffer.remote_id().into();
208        let client = self.client.clone();
209        let epoch = self.epoch();
210
211        self.acknowledge_task = Some(cx.spawn(async move |_, cx| {
212            cx.background_executor()
213                .timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL)
214                .await;
215            client
216                .send(proto::AckBufferOperation {
217                    buffer_id,
218                    epoch,
219                    version: serialize_version(&version),
220                })
221                .ok();
222            Ok(())
223        }));
224    }
225
226    pub fn epoch(&self) -> u64 {
227        self.buffer_epoch
228    }
229
230    pub fn buffer(&self) -> Entity<language::Buffer> {
231        self.buffer.clone()
232    }
233
234    pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
235        &self.collaborators
236    }
237
238    pub fn channel(&self, cx: &App) -> Option<Arc<Channel>> {
239        self.channel_store
240            .read(cx)
241            .channel_for_id(self.channel_id)
242            .cloned()
243    }
244
245    pub(crate) fn disconnect(&mut self, cx: &mut Context<Self>) {
246        log::info!("channel buffer {} disconnected", self.channel_id);
247        if self.connected {
248            self.connected = false;
249            self.subscription.take();
250            cx.emit(ChannelBufferEvent::Disconnected);
251            cx.notify()
252        }
253    }
254
255    pub(crate) fn channel_changed(&mut self, cx: &mut Context<Self>) {
256        cx.emit(ChannelBufferEvent::ChannelChanged);
257        cx.notify()
258    }
259
260    pub fn is_connected(&self) -> bool {
261        self.connected
262    }
263
264    pub fn replica_id(&self, cx: &App) -> u16 {
265        self.buffer.read(cx).replica_id()
266    }
267}