channel_buffer.rs

  1use crate::{Channel, ChannelStore};
  2use anyhow::Result;
  3use client::{ChannelId, Client, Collaborator, UserStore, ZED_ALWAYS_ACTIVE};
  4use collections::HashMap;
  5use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task};
  6use language::proto::serialize_version;
  7use rpc::{
  8    AnyProtoClient, TypedEnvelope,
  9    proto::{self, PeerId},
 10};
 11use std::{sync::Arc, time::Duration};
 12use text::{BufferId, ReplicaId};
 13use util::ResultExt;
 14
 15pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
 16
 17pub(crate) fn init(client: &AnyProtoClient) {
 18    client.add_entity_message_handler(ChannelBuffer::handle_update_channel_buffer);
 19    client.add_entity_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
 20}
 21
 22pub struct ChannelBuffer {
 23    pub channel_id: ChannelId,
 24    connected: bool,
 25    collaborators: HashMap<PeerId, Collaborator>,
 26    user_store: Entity<UserStore>,
 27    channel_store: Entity<ChannelStore>,
 28    buffer: Entity<language::Buffer>,
 29    buffer_epoch: u64,
 30    client: Arc<Client>,
 31    subscription: Option<client::Subscription>,
 32    acknowledge_task: Option<Task<Result<()>>>,
 33}
 34
 35pub enum ChannelBufferEvent {
 36    CollaboratorsChanged,
 37    Disconnected,
 38    Connected,
 39    BufferEdited,
 40    ChannelChanged,
 41}
 42
 43impl EventEmitter<ChannelBufferEvent> for ChannelBuffer {}
 44
 45impl ChannelBuffer {
 46    pub(crate) async fn new(
 47        channel: Arc<Channel>,
 48        client: Arc<Client>,
 49        user_store: Entity<UserStore>,
 50        channel_store: Entity<ChannelStore>,
 51        cx: &mut AsyncApp,
 52    ) -> Result<Entity<Self>> {
 53        let response = client
 54            .request(proto::JoinChannelBuffer {
 55                channel_id: channel.id.0,
 56            })
 57            .await?;
 58        let buffer_id = BufferId::new(response.buffer_id)?;
 59        let base_text = response.base_text;
 60        let operations = response
 61            .operations
 62            .into_iter()
 63            .map(language::proto::deserialize_operation)
 64            .collect::<Result<Vec<_>, _>>()?;
 65
 66        let buffer = cx.new(|cx| {
 67            let capability = channel_store.read(cx).channel_capability(channel.id);
 68            language::Buffer::remote(
 69                buffer_id,
 70                ReplicaId::new(response.replica_id as u16),
 71                capability,
 72                base_text,
 73                cx.background_executor(),
 74            )
 75        })?;
 76        buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
 77
 78        let subscription = client.subscribe_to_entity(channel.id.0)?;
 79
 80        anyhow::Ok(cx.new(|cx| {
 81            cx.subscribe(&buffer, Self::on_buffer_update).detach();
 82            cx.on_release(Self::release).detach();
 83            let mut this = Self {
 84                buffer,
 85                buffer_epoch: response.epoch,
 86                client,
 87                connected: true,
 88                collaborators: Default::default(),
 89                acknowledge_task: None,
 90                channel_id: channel.id,
 91                subscription: Some(subscription.set_entity(&cx.entity(), &cx.to_async())),
 92                user_store,
 93                channel_store,
 94            };
 95            this.replace_collaborators(response.collaborators, cx);
 96            this
 97        })?)
 98    }
 99
100    fn release(&mut self, _: &mut App) {
101        if self.connected {
102            if let Some(task) = self.acknowledge_task.take() {
103                task.detach();
104            }
105            self.client
106                .send(proto::LeaveChannelBuffer {
107                    channel_id: self.channel_id.0,
108                })
109                .log_err();
110        }
111    }
112
113    pub fn connected(&mut self, cx: &mut Context<Self>) {
114        self.connected = true;
115        if self.subscription.is_none() {
116            let Ok(subscription) = self.client.subscribe_to_entity(self.channel_id.0) else {
117                return;
118            };
119            self.subscription = Some(subscription.set_entity(&cx.entity(), &cx.to_async()));
120            cx.emit(ChannelBufferEvent::Connected);
121        }
122    }
123
124    pub fn remote_id(&self, cx: &App) -> BufferId {
125        self.buffer.read(cx).remote_id()
126    }
127
128    pub fn user_store(&self) -> &Entity<UserStore> {
129        &self.user_store
130    }
131
132    pub(crate) fn replace_collaborators(
133        &mut self,
134        collaborators: Vec<proto::Collaborator>,
135        cx: &mut Context<Self>,
136    ) {
137        let mut new_collaborators = HashMap::default();
138        for collaborator in collaborators {
139            if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
140                new_collaborators.insert(collaborator.peer_id, collaborator);
141            }
142        }
143
144        for old_collaborator in self.collaborators.values() {
145            if !new_collaborators.contains_key(&old_collaborator.peer_id) {
146                self.buffer.update(cx, |buffer, cx| {
147                    buffer.remove_peer(old_collaborator.replica_id, cx)
148                });
149            }
150        }
151        self.collaborators = new_collaborators;
152        cx.emit(ChannelBufferEvent::CollaboratorsChanged);
153        cx.notify();
154    }
155
156    async fn handle_update_channel_buffer(
157        this: Entity<Self>,
158        update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
159        mut cx: AsyncApp,
160    ) -> Result<()> {
161        let ops = update_channel_buffer
162            .payload
163            .operations
164            .into_iter()
165            .map(language::proto::deserialize_operation)
166            .collect::<Result<Vec<_>, _>>()?;
167
168        this.update(&mut cx, |this, cx| {
169            cx.notify();
170            this.buffer
171                .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
172        })?;
173
174        Ok(())
175    }
176
177    async fn handle_update_channel_buffer_collaborators(
178        this: Entity<Self>,
179        message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
180        mut cx: AsyncApp,
181    ) -> Result<()> {
182        this.update(&mut cx, |this, cx| {
183            this.replace_collaborators(message.payload.collaborators, cx);
184            cx.emit(ChannelBufferEvent::CollaboratorsChanged);
185            cx.notify();
186        })
187    }
188
189    fn on_buffer_update(
190        &mut self,
191        _: Entity<language::Buffer>,
192        event: &language::BufferEvent,
193        cx: &mut Context<Self>,
194    ) {
195        match event {
196            language::BufferEvent::Operation {
197                operation,
198                is_local: true,
199            } => {
200                if *ZED_ALWAYS_ACTIVE
201                    && let language::Operation::UpdateSelections { selections, .. } = operation
202                    && selections.is_empty()
203                {
204                    return;
205                }
206                let operation = language::proto::serialize_operation(operation);
207                self.client
208                    .send(proto::UpdateChannelBuffer {
209                        channel_id: self.channel_id.0,
210                        operations: vec![operation],
211                    })
212                    .log_err();
213            }
214            language::BufferEvent::Edited => {
215                cx.emit(ChannelBufferEvent::BufferEdited);
216            }
217            _ => {}
218        }
219    }
220
221    pub fn acknowledge_buffer_version(&mut self, cx: &mut Context<ChannelBuffer>) {
222        let buffer = self.buffer.read(cx);
223        let version = buffer.version();
224        let buffer_id = buffer.remote_id().into();
225        let client = self.client.clone();
226        let epoch = self.epoch();
227
228        self.acknowledge_task = Some(cx.spawn(async move |_, cx| {
229            cx.background_executor()
230                .timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL)
231                .await;
232            client
233                .send(proto::AckBufferOperation {
234                    buffer_id,
235                    epoch,
236                    version: serialize_version(&version),
237                })
238                .ok();
239            Ok(())
240        }));
241    }
242
243    pub fn epoch(&self) -> u64 {
244        self.buffer_epoch
245    }
246
247    pub fn buffer(&self) -> Entity<language::Buffer> {
248        self.buffer.clone()
249    }
250
251    pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
252        &self.collaborators
253    }
254
255    pub fn channel(&self, cx: &App) -> Option<Arc<Channel>> {
256        self.channel_store
257            .read(cx)
258            .channel_for_id(self.channel_id)
259            .cloned()
260    }
261
262    pub(crate) fn disconnect(&mut self, cx: &mut Context<Self>) {
263        log::info!("channel buffer {} disconnected", self.channel_id);
264        if self.connected {
265            self.connected = false;
266            self.subscription.take();
267            cx.emit(ChannelBufferEvent::Disconnected);
268            cx.notify()
269        }
270    }
271
272    pub(crate) fn channel_changed(&mut self, cx: &mut Context<Self>) {
273        cx.emit(ChannelBufferEvent::ChannelChanged);
274        cx.notify()
275    }
276
277    pub fn is_connected(&self) -> bool {
278        self.connected
279    }
280
281    pub fn replica_id(&self, cx: &App) -> ReplicaId {
282        self.buffer.read(cx).replica_id()
283    }
284}