channel_buffer.rs

  1use crate::Channel;
  2use anyhow::Result;
  3use client::Client;
  4use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle};
  5use rpc::{proto, TypedEnvelope};
  6use std::sync::Arc;
  7use util::ResultExt;
  8
  9pub(crate) fn init(client: &Arc<Client>) {
 10    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
 11    client.add_model_message_handler(ChannelBuffer::handle_add_channel_buffer_collaborator);
 12    client.add_model_message_handler(ChannelBuffer::handle_remove_channel_buffer_collaborator);
 13}
 14
 15pub struct ChannelBuffer {
 16    pub(crate) channel: Arc<Channel>,
 17    connected: bool,
 18    collaborators: Vec<proto::Collaborator>,
 19    buffer: ModelHandle<language::Buffer>,
 20    client: Arc<Client>,
 21    subscription: Option<client::Subscription>,
 22}
 23
 24pub enum Event {
 25    CollaboratorsChanged,
 26    Disconnected,
 27}
 28
 29impl Entity for ChannelBuffer {
 30    type Event = Event;
 31
 32    fn release(&mut self, _: &mut AppContext) {
 33        if self.connected {
 34            self.client
 35                .send(proto::LeaveChannelBuffer {
 36                    channel_id: self.channel.id,
 37                })
 38                .log_err();
 39        }
 40    }
 41}
 42
 43impl ChannelBuffer {
 44    pub(crate) async fn new(
 45        channel: Arc<Channel>,
 46        client: Arc<Client>,
 47        mut cx: AsyncAppContext,
 48    ) -> Result<ModelHandle<Self>> {
 49        let response = client
 50            .request(proto::JoinChannelBuffer {
 51                channel_id: channel.id,
 52            })
 53            .await?;
 54
 55        let base_text = response.base_text;
 56        let operations = response
 57            .operations
 58            .into_iter()
 59            .map(language::proto::deserialize_operation)
 60            .collect::<Result<Vec<_>, _>>()?;
 61
 62        let collaborators = response.collaborators;
 63
 64        let buffer = cx.add_model(|_| {
 65            language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
 66        });
 67        buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
 68
 69        let subscription = client.subscribe_to_entity(channel.id)?;
 70
 71        anyhow::Ok(cx.add_model(|cx| {
 72            cx.subscribe(&buffer, Self::on_buffer_update).detach();
 73
 74            Self {
 75                buffer,
 76                client,
 77                connected: true,
 78                collaborators,
 79                channel,
 80                subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
 81            }
 82        }))
 83    }
 84
 85    async fn handle_update_channel_buffer(
 86        this: ModelHandle<Self>,
 87        update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
 88        _: Arc<Client>,
 89        mut cx: AsyncAppContext,
 90    ) -> Result<()> {
 91        let ops = update_channel_buffer
 92            .payload
 93            .operations
 94            .into_iter()
 95            .map(language::proto::deserialize_operation)
 96            .collect::<Result<Vec<_>, _>>()?;
 97
 98        this.update(&mut cx, |this, cx| {
 99            cx.notify();
100            this.buffer
101                .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
102        })?;
103
104        Ok(())
105    }
106
107    async fn handle_add_channel_buffer_collaborator(
108        this: ModelHandle<Self>,
109        envelope: TypedEnvelope<proto::AddChannelBufferCollaborator>,
110        _: Arc<Client>,
111        mut cx: AsyncAppContext,
112    ) -> Result<()> {
113        let collaborator = envelope.payload.collaborator.ok_or_else(|| {
114            anyhow::anyhow!(
115                "Should have gotten a collaborator in the AddChannelBufferCollaborator message"
116            )
117        })?;
118
119        this.update(&mut cx, |this, cx| {
120            this.collaborators.push(collaborator);
121            cx.emit(Event::CollaboratorsChanged);
122            cx.notify();
123        });
124
125        Ok(())
126    }
127
128    async fn handle_remove_channel_buffer_collaborator(
129        this: ModelHandle<Self>,
130        message: TypedEnvelope<proto::RemoveChannelBufferCollaborator>,
131        _: Arc<Client>,
132        mut cx: AsyncAppContext,
133    ) -> Result<()> {
134        this.update(&mut cx, |this, cx| {
135            this.collaborators.retain(|collaborator| {
136                if collaborator.peer_id == message.payload.peer_id {
137                    this.buffer.update(cx, |buffer, cx| {
138                        buffer.remove_peer(collaborator.replica_id as u16, cx)
139                    });
140                    false
141                } else {
142                    true
143                }
144            });
145            cx.emit(Event::CollaboratorsChanged);
146            cx.notify();
147        });
148
149        Ok(())
150    }
151
152    fn on_buffer_update(
153        &mut self,
154        _: ModelHandle<language::Buffer>,
155        event: &language::Event,
156        _: &mut ModelContext<Self>,
157    ) {
158        if let language::Event::Operation(operation) = event {
159            let operation = language::proto::serialize_operation(operation);
160            self.client
161                .send(proto::UpdateChannelBuffer {
162                    channel_id: self.channel.id,
163                    operations: vec![operation],
164                })
165                .log_err();
166        }
167    }
168
169    pub fn buffer(&self) -> ModelHandle<language::Buffer> {
170        self.buffer.clone()
171    }
172
173    pub fn collaborators(&self) -> &[proto::Collaborator] {
174        &self.collaborators
175    }
176
177    pub fn channel(&self) -> Arc<Channel> {
178        self.channel.clone()
179    }
180
181    pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
182        if self.connected {
183            self.connected = false;
184            self.subscription.take();
185            cx.emit(Event::Disconnected);
186            cx.notify()
187        }
188    }
189
190    pub fn is_connected(&self) -> bool {
191        self.connected
192    }
193
194    pub fn replica_id(&self, cx: &AppContext) -> u16 {
195        self.buffer.read(cx).replica_id()
196    }
197}