channel_buffer.rs

  1use crate::Channel;
  2use anyhow::Result;
  3use client::Client;
  4use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle};
  5use rpc::{proto, TypedEnvelope};
  6use std::sync::Arc;
  7use util::ResultExt;
  8
  9pub(crate) fn init(client: &Arc<Client>) {
 10    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
 11    client.add_model_message_handler(ChannelBuffer::handle_add_channel_buffer_collaborator);
 12    client.add_model_message_handler(ChannelBuffer::handle_remove_channel_buffer_collaborator);
 13    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborator);
 14}
 15
 16pub struct ChannelBuffer {
 17    pub(crate) channel: Arc<Channel>,
 18    connected: bool,
 19    collaborators: Vec<proto::Collaborator>,
 20    buffer: ModelHandle<language::Buffer>,
 21    buffer_epoch: u64,
 22    client: Arc<Client>,
 23    subscription: Option<client::Subscription>,
 24}
 25
 26pub enum ChannelBufferEvent {
 27    CollaboratorsChanged,
 28    Disconnected,
 29}
 30
 31impl Entity for ChannelBuffer {
 32    type Event = ChannelBufferEvent;
 33
 34    fn release(&mut self, _: &mut AppContext) {
 35        if self.connected {
 36            self.client
 37                .send(proto::LeaveChannelBuffer {
 38                    channel_id: self.channel.id,
 39                })
 40                .log_err();
 41        }
 42    }
 43}
 44
 45impl ChannelBuffer {
 46    pub(crate) async fn new(
 47        channel: Arc<Channel>,
 48        client: Arc<Client>,
 49        mut cx: AsyncAppContext,
 50    ) -> Result<ModelHandle<Self>> {
 51        let response = client
 52            .request(proto::JoinChannelBuffer {
 53                channel_id: channel.id,
 54            })
 55            .await?;
 56
 57        let base_text = response.base_text;
 58        let operations = response
 59            .operations
 60            .into_iter()
 61            .map(language::proto::deserialize_operation)
 62            .collect::<Result<Vec<_>, _>>()?;
 63
 64        let collaborators = response.collaborators;
 65
 66        let buffer = cx.add_model(|_| {
 67            language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
 68        });
 69        buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
 70
 71        let subscription = client.subscribe_to_entity(channel.id)?;
 72
 73        anyhow::Ok(cx.add_model(|cx| {
 74            cx.subscribe(&buffer, Self::on_buffer_update).detach();
 75
 76            Self {
 77                buffer,
 78                buffer_epoch: response.epoch,
 79                client,
 80                connected: true,
 81                collaborators,
 82                channel,
 83                subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
 84            }
 85        }))
 86    }
 87
 88    pub(crate) fn replace_collaborators(
 89        &mut self,
 90        collaborators: Vec<proto::Collaborator>,
 91        cx: &mut ModelContext<Self>,
 92    ) {
 93        for old_collaborator in &self.collaborators {
 94            if collaborators
 95                .iter()
 96                .any(|c| c.replica_id == old_collaborator.replica_id)
 97            {
 98                self.buffer.update(cx, |buffer, cx| {
 99                    buffer.remove_peer(old_collaborator.replica_id as u16, cx)
100                });
101            }
102        }
103        self.collaborators = collaborators;
104        cx.emit(ChannelBufferEvent::CollaboratorsChanged);
105        cx.notify();
106    }
107
108    async fn handle_update_channel_buffer(
109        this: ModelHandle<Self>,
110        update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
111        _: Arc<Client>,
112        mut cx: AsyncAppContext,
113    ) -> Result<()> {
114        let ops = update_channel_buffer
115            .payload
116            .operations
117            .into_iter()
118            .map(language::proto::deserialize_operation)
119            .collect::<Result<Vec<_>, _>>()?;
120
121        this.update(&mut cx, |this, cx| {
122            cx.notify();
123            this.buffer
124                .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
125        })?;
126
127        Ok(())
128    }
129
130    async fn handle_add_channel_buffer_collaborator(
131        this: ModelHandle<Self>,
132        envelope: TypedEnvelope<proto::AddChannelBufferCollaborator>,
133        _: Arc<Client>,
134        mut cx: AsyncAppContext,
135    ) -> Result<()> {
136        let collaborator = envelope.payload.collaborator.ok_or_else(|| {
137            anyhow::anyhow!(
138                "Should have gotten a collaborator in the AddChannelBufferCollaborator message"
139            )
140        })?;
141
142        this.update(&mut cx, |this, cx| {
143            this.collaborators.push(collaborator);
144            cx.emit(ChannelBufferEvent::CollaboratorsChanged);
145            cx.notify();
146        });
147
148        Ok(())
149    }
150
151    async fn handle_remove_channel_buffer_collaborator(
152        this: ModelHandle<Self>,
153        message: TypedEnvelope<proto::RemoveChannelBufferCollaborator>,
154        _: Arc<Client>,
155        mut cx: AsyncAppContext,
156    ) -> Result<()> {
157        this.update(&mut cx, |this, cx| {
158            this.collaborators.retain(|collaborator| {
159                if collaborator.peer_id == message.payload.peer_id {
160                    this.buffer.update(cx, |buffer, cx| {
161                        buffer.remove_peer(collaborator.replica_id as u16, cx)
162                    });
163                    false
164                } else {
165                    true
166                }
167            });
168            cx.emit(ChannelBufferEvent::CollaboratorsChanged);
169            cx.notify();
170        });
171
172        Ok(())
173    }
174
175    async fn handle_update_channel_buffer_collaborator(
176        this: ModelHandle<Self>,
177        message: TypedEnvelope<proto::UpdateChannelBufferCollaborator>,
178        _: Arc<Client>,
179        mut cx: AsyncAppContext,
180    ) -> Result<()> {
181        this.update(&mut cx, |this, cx| {
182            for collaborator in &mut this.collaborators {
183                if collaborator.peer_id == message.payload.old_peer_id {
184                    collaborator.peer_id = message.payload.new_peer_id;
185                    break;
186                }
187            }
188            cx.emit(ChannelBufferEvent::CollaboratorsChanged);
189            cx.notify();
190        });
191
192        Ok(())
193    }
194
195    fn on_buffer_update(
196        &mut self,
197        _: ModelHandle<language::Buffer>,
198        event: &language::Event,
199        _: &mut ModelContext<Self>,
200    ) {
201        if let language::Event::Operation(operation) = event {
202            let operation = language::proto::serialize_operation(operation);
203            self.client
204                .send(proto::UpdateChannelBuffer {
205                    channel_id: self.channel.id,
206                    operations: vec![operation],
207                })
208                .log_err();
209        }
210    }
211
212    pub fn epoch(&self) -> u64 {
213        self.buffer_epoch
214    }
215
216    pub fn buffer(&self) -> ModelHandle<language::Buffer> {
217        self.buffer.clone()
218    }
219
220    pub fn collaborators(&self) -> &[proto::Collaborator] {
221        &self.collaborators
222    }
223
224    pub fn channel(&self) -> Arc<Channel> {
225        self.channel.clone()
226    }
227
228    pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
229        log::info!("channel buffer {} disconnected", self.channel.id);
230        if self.connected {
231            self.connected = false;
232            self.subscription.take();
233            cx.emit(ChannelBufferEvent::Disconnected);
234            cx.notify()
235        }
236    }
237
238    pub fn is_connected(&self) -> bool {
239        self.connected
240    }
241
242    pub fn replica_id(&self, cx: &AppContext) -> u16 {
243        self.buffer.read(cx).replica_id()
244    }
245}