channel_buffer.rs

  1use crate::Channel;
  2use anyhow::Result;
  3use client::Client;
  4use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle};
  5use rpc::{proto, TypedEnvelope};
  6use std::sync::Arc;
  7use util::ResultExt;
  8
  9pub(crate) fn init(client: &Arc<Client>) {
 10    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
 11    client.add_model_message_handler(ChannelBuffer::handle_add_channel_buffer_collaborator);
 12    client.add_model_message_handler(ChannelBuffer::handle_remove_channel_buffer_collaborator);
 13}
 14
 15pub struct ChannelBuffer {
 16    pub(crate) channel: Arc<Channel>,
 17    connected: bool,
 18    collaborators: Vec<proto::Collaborator>,
 19    buffer: ModelHandle<language::Buffer>,
 20    buffer_epoch: u64,
 21    client: Arc<Client>,
 22    subscription: Option<client::Subscription>,
 23}
 24
 25pub enum Event {
 26    CollaboratorsChanged,
 27    Disconnected,
 28}
 29
 30impl Entity for ChannelBuffer {
 31    type Event = Event;
 32
 33    fn release(&mut self, _: &mut AppContext) {
 34        if self.connected {
 35            self.client
 36                .send(proto::LeaveChannelBuffer {
 37                    channel_id: self.channel.id,
 38                })
 39                .log_err();
 40        }
 41    }
 42}
 43
 44impl ChannelBuffer {
 45    pub(crate) async fn new(
 46        channel: Arc<Channel>,
 47        client: Arc<Client>,
 48        mut cx: AsyncAppContext,
 49    ) -> Result<ModelHandle<Self>> {
 50        let response = client
 51            .request(proto::JoinChannelBuffer {
 52                channel_id: channel.id,
 53            })
 54            .await?;
 55
 56        let base_text = response.base_text;
 57        let operations = response
 58            .operations
 59            .into_iter()
 60            .map(language::proto::deserialize_operation)
 61            .collect::<Result<Vec<_>, _>>()?;
 62
 63        let collaborators = response.collaborators;
 64
 65        let buffer = cx.add_model(|_| {
 66            language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
 67        });
 68        buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
 69
 70        let subscription = client.subscribe_to_entity(channel.id)?;
 71
 72        anyhow::Ok(cx.add_model(|cx| {
 73            cx.subscribe(&buffer, Self::on_buffer_update).detach();
 74
 75            Self {
 76                buffer,
 77                buffer_epoch: response.epoch,
 78                client,
 79                connected: true,
 80                collaborators,
 81                channel,
 82                subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
 83            }
 84        }))
 85    }
 86
 87    pub(crate) fn replace_collaborators(
 88        &mut self,
 89        collaborators: Vec<proto::Collaborator>,
 90        cx: &mut ModelContext<Self>,
 91    ) {
 92        for old_collaborator in &self.collaborators {
 93            if collaborators
 94                .iter()
 95                .any(|c| c.replica_id == old_collaborator.replica_id)
 96            {
 97                self.buffer.update(cx, |buffer, cx| {
 98                    buffer.remove_peer(old_collaborator.replica_id as u16, cx)
 99                });
100            }
101        }
102        self.collaborators = collaborators;
103        cx.emit(Event::CollaboratorsChanged);
104        cx.notify();
105    }
106
107    async fn handle_update_channel_buffer(
108        this: ModelHandle<Self>,
109        update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
110        _: Arc<Client>,
111        mut cx: AsyncAppContext,
112    ) -> Result<()> {
113        let ops = update_channel_buffer
114            .payload
115            .operations
116            .into_iter()
117            .map(language::proto::deserialize_operation)
118            .collect::<Result<Vec<_>, _>>()?;
119
120        this.update(&mut cx, |this, cx| {
121            cx.notify();
122            this.buffer
123                .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
124        })?;
125
126        Ok(())
127    }
128
129    async fn handle_add_channel_buffer_collaborator(
130        this: ModelHandle<Self>,
131        envelope: TypedEnvelope<proto::AddChannelBufferCollaborator>,
132        _: Arc<Client>,
133        mut cx: AsyncAppContext,
134    ) -> Result<()> {
135        let collaborator = envelope.payload.collaborator.ok_or_else(|| {
136            anyhow::anyhow!(
137                "Should have gotten a collaborator in the AddChannelBufferCollaborator message"
138            )
139        })?;
140
141        this.update(&mut cx, |this, cx| {
142            this.collaborators.push(collaborator);
143            cx.emit(Event::CollaboratorsChanged);
144            cx.notify();
145        });
146
147        Ok(())
148    }
149
150    async fn handle_remove_channel_buffer_collaborator(
151        this: ModelHandle<Self>,
152        message: TypedEnvelope<proto::RemoveChannelBufferCollaborator>,
153        _: Arc<Client>,
154        mut cx: AsyncAppContext,
155    ) -> Result<()> {
156        this.update(&mut cx, |this, cx| {
157            this.collaborators.retain(|collaborator| {
158                if collaborator.peer_id == message.payload.peer_id {
159                    this.buffer.update(cx, |buffer, cx| {
160                        buffer.remove_peer(collaborator.replica_id as u16, cx)
161                    });
162                    false
163                } else {
164                    true
165                }
166            });
167            cx.emit(Event::CollaboratorsChanged);
168            cx.notify();
169        });
170
171        Ok(())
172    }
173
174    fn on_buffer_update(
175        &mut self,
176        _: ModelHandle<language::Buffer>,
177        event: &language::Event,
178        _: &mut ModelContext<Self>,
179    ) {
180        if let language::Event::Operation(operation) = event {
181            let operation = language::proto::serialize_operation(operation);
182            self.client
183                .send(proto::UpdateChannelBuffer {
184                    channel_id: self.channel.id,
185                    operations: vec![operation],
186                })
187                .log_err();
188        }
189    }
190
191    pub fn epoch(&self) -> u64 {
192        self.buffer_epoch
193    }
194
195    pub fn buffer(&self) -> ModelHandle<language::Buffer> {
196        self.buffer.clone()
197    }
198
199    pub fn collaborators(&self) -> &[proto::Collaborator] {
200        &self.collaborators
201    }
202
203    pub fn channel(&self) -> Arc<Channel> {
204        self.channel.clone()
205    }
206
207    pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
208        log::info!("channel buffer {} disconnected", self.channel.id);
209        if self.connected {
210            self.connected = false;
211            self.subscription.take();
212            cx.emit(Event::Disconnected);
213            cx.notify()
214        }
215    }
216
217    pub fn is_connected(&self) -> bool {
218        self.connected
219    }
220
221    pub fn replica_id(&self, cx: &AppContext) -> u16 {
222        self.buffer.read(cx).replica_id()
223    }
224}