channel_buffer.rs

  1use crate::Channel;
  2use anyhow::Result;
  3use client::{Client, Collaborator, UserStore};
  4use collections::HashMap;
  5use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle};
  6use rpc::{
  7    proto::{self, PeerId},
  8    TypedEnvelope,
  9};
 10use std::sync::Arc;
 11use util::ResultExt;
 12
 13pub(crate) fn init(client: &Arc<Client>) {
 14    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
 15    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
 16}
 17
 18pub struct ChannelBuffer {
 19    pub(crate) channel: Arc<Channel>,
 20    connected: bool,
 21    collaborators: HashMap<PeerId, Collaborator>,
 22    user_store: ModelHandle<UserStore>,
 23    buffer: ModelHandle<language::Buffer>,
 24    buffer_epoch: u64,
 25    client: Arc<Client>,
 26    subscription: Option<client::Subscription>,
 27}
 28
 29pub enum ChannelBufferEvent {
 30    CollaboratorsChanged,
 31    Disconnected,
 32}
 33
 34impl Entity for ChannelBuffer {
 35    type Event = ChannelBufferEvent;
 36
 37    fn release(&mut self, _: &mut AppContext) {
 38        if self.connected {
 39            self.client
 40                .send(proto::LeaveChannelBuffer {
 41                    channel_id: self.channel.id,
 42                })
 43                .log_err();
 44        }
 45    }
 46}
 47
 48impl ChannelBuffer {
 49    pub(crate) async fn new(
 50        channel: Arc<Channel>,
 51        client: Arc<Client>,
 52        user_store: ModelHandle<UserStore>,
 53        mut cx: AsyncAppContext,
 54    ) -> Result<ModelHandle<Self>> {
 55        let response = client
 56            .request(proto::JoinChannelBuffer {
 57                channel_id: channel.id,
 58            })
 59            .await?;
 60
 61        let base_text = response.base_text;
 62        let operations = response
 63            .operations
 64            .into_iter()
 65            .map(language::proto::deserialize_operation)
 66            .collect::<Result<Vec<_>, _>>()?;
 67
 68        let buffer = cx.add_model(|_| {
 69            language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
 70        });
 71        buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
 72
 73        let subscription = client.subscribe_to_entity(channel.id)?;
 74
 75        anyhow::Ok(cx.add_model(|cx| {
 76            cx.subscribe(&buffer, Self::on_buffer_update).detach();
 77
 78            let mut this = Self {
 79                buffer,
 80                buffer_epoch: response.epoch,
 81                client,
 82                connected: true,
 83                collaborators: Default::default(),
 84                channel,
 85                subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
 86                user_store,
 87            };
 88            this.replace_collaborators(response.collaborators, cx);
 89            this
 90        }))
 91    }
 92
 93    pub fn user_store(&self) -> &ModelHandle<UserStore> {
 94        &self.user_store
 95    }
 96
 97    pub(crate) fn replace_collaborators(
 98        &mut self,
 99        collaborators: Vec<proto::Collaborator>,
100        cx: &mut ModelContext<Self>,
101    ) {
102        let mut new_collaborators = HashMap::default();
103        for collaborator in collaborators {
104            if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
105                new_collaborators.insert(collaborator.peer_id, collaborator);
106            }
107        }
108
109        for (_, old_collaborator) in &self.collaborators {
110            if !new_collaborators.contains_key(&old_collaborator.peer_id) {
111                self.buffer.update(cx, |buffer, cx| {
112                    buffer.remove_peer(old_collaborator.replica_id as u16, cx)
113                });
114            }
115        }
116        self.collaborators = new_collaborators;
117        cx.emit(ChannelBufferEvent::CollaboratorsChanged);
118        cx.notify();
119    }
120
121    async fn handle_update_channel_buffer(
122        this: ModelHandle<Self>,
123        update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
124        _: Arc<Client>,
125        mut cx: AsyncAppContext,
126    ) -> Result<()> {
127        let ops = update_channel_buffer
128            .payload
129            .operations
130            .into_iter()
131            .map(language::proto::deserialize_operation)
132            .collect::<Result<Vec<_>, _>>()?;
133
134        this.update(&mut cx, |this, cx| {
135            cx.notify();
136            this.buffer
137                .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
138        })?;
139
140        Ok(())
141    }
142
143    async fn handle_update_channel_buffer_collaborators(
144        this: ModelHandle<Self>,
145        message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
146        _: Arc<Client>,
147        mut cx: AsyncAppContext,
148    ) -> Result<()> {
149        this.update(&mut cx, |this, cx| {
150            this.replace_collaborators(message.payload.collaborators, cx);
151            cx.emit(ChannelBufferEvent::CollaboratorsChanged);
152            cx.notify();
153        });
154
155        Ok(())
156    }
157
158    fn on_buffer_update(
159        &mut self,
160        _: ModelHandle<language::Buffer>,
161        event: &language::Event,
162        _: &mut ModelContext<Self>,
163    ) {
164        if let language::Event::Operation(operation) = event {
165            let operation = language::proto::serialize_operation(operation);
166            self.client
167                .send(proto::UpdateChannelBuffer {
168                    channel_id: self.channel.id,
169                    operations: vec![operation],
170                })
171                .log_err();
172        }
173    }
174
175    pub fn epoch(&self) -> u64 {
176        self.buffer_epoch
177    }
178
179    pub fn buffer(&self) -> ModelHandle<language::Buffer> {
180        self.buffer.clone()
181    }
182
183    pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
184        &self.collaborators
185    }
186
187    pub fn channel(&self) -> Arc<Channel> {
188        self.channel.clone()
189    }
190
191    pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
192        log::info!("channel buffer {} disconnected", self.channel.id);
193        if self.connected {
194            self.connected = false;
195            self.subscription.take();
196            cx.emit(ChannelBufferEvent::Disconnected);
197            cx.notify()
198        }
199    }
200
201    pub fn is_connected(&self) -> bool {
202        self.connected
203    }
204
205    pub fn replica_id(&self, cx: &AppContext) -> u16 {
206        self.buffer.read(cx).replica_id()
207    }
208}