channel_buffer.rs

  1use crate::{Channel, ChannelId, ChannelStore};
  2use anyhow::Result;
  3use client::Client;
  4use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle};
  5use rpc::{proto, TypedEnvelope};
  6use std::sync::Arc;
  7use util::ResultExt;
  8
  9pub(crate) fn init(client: &Arc<Client>) {
 10    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
 11    client.add_model_message_handler(ChannelBuffer::handle_add_channel_buffer_collaborator);
 12    client.add_model_message_handler(ChannelBuffer::handle_remove_channel_buffer_collaborator);
 13}
 14
 15pub struct ChannelBuffer {
 16    channel_id: ChannelId,
 17    collaborators: Vec<proto::Collaborator>,
 18    buffer: ModelHandle<language::Buffer>,
 19    channel_store: ModelHandle<ChannelStore>,
 20    client: Arc<Client>,
 21    _subscription: client::Subscription,
 22}
 23
 24pub enum Event {
 25    CollaboratorsChanged,
 26}
 27
 28impl Entity for ChannelBuffer {
 29    type Event = Event;
 30
 31    fn release(&mut self, _: &mut AppContext) {
 32        self.client
 33            .send(proto::LeaveChannelBuffer {
 34                channel_id: self.channel_id,
 35            })
 36            .log_err();
 37    }
 38}
 39
 40impl ChannelBuffer {
 41    pub(crate) async fn new(
 42        channel_store: ModelHandle<ChannelStore>,
 43        channel_id: ChannelId,
 44        client: Arc<Client>,
 45        mut cx: AsyncAppContext,
 46    ) -> Result<ModelHandle<Self>> {
 47        let response = client
 48            .request(proto::JoinChannelBuffer { channel_id })
 49            .await?;
 50
 51        let base_text = response.base_text;
 52        let operations = response
 53            .operations
 54            .into_iter()
 55            .map(language::proto::deserialize_operation)
 56            .collect::<Result<Vec<_>, _>>()?;
 57
 58        let collaborators = response.collaborators;
 59
 60        let buffer = cx.add_model(|_| {
 61            language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
 62        });
 63        buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
 64
 65        let subscription = client.subscribe_to_entity(channel_id)?;
 66
 67        anyhow::Ok(cx.add_model(|cx| {
 68            cx.subscribe(&buffer, Self::on_buffer_update).detach();
 69
 70            Self {
 71                buffer,
 72                client,
 73                channel_id,
 74                channel_store,
 75                collaborators,
 76                _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()),
 77            }
 78        }))
 79    }
 80
 81    async fn handle_update_channel_buffer(
 82        this: ModelHandle<Self>,
 83        update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
 84        _: Arc<Client>,
 85        mut cx: AsyncAppContext,
 86    ) -> Result<()> {
 87        let ops = update_channel_buffer
 88            .payload
 89            .operations
 90            .into_iter()
 91            .map(language::proto::deserialize_operation)
 92            .collect::<Result<Vec<_>, _>>()?;
 93
 94        this.update(&mut cx, |this, cx| {
 95            cx.notify();
 96            this.buffer
 97                .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
 98        })?;
 99
100        Ok(())
101    }
102
103    async fn handle_add_channel_buffer_collaborator(
104        this: ModelHandle<Self>,
105        envelope: TypedEnvelope<proto::AddChannelBufferCollaborator>,
106        _: Arc<Client>,
107        mut cx: AsyncAppContext,
108    ) -> Result<()> {
109        let collaborator = envelope.payload.collaborator.ok_or_else(|| {
110            anyhow::anyhow!(
111                "Should have gotten a collaborator in the AddChannelBufferCollaborator message"
112            )
113        })?;
114
115        this.update(&mut cx, |this, cx| {
116            this.collaborators.push(collaborator);
117            cx.emit(Event::CollaboratorsChanged);
118            cx.notify();
119        });
120
121        Ok(())
122    }
123
124    async fn handle_remove_channel_buffer_collaborator(
125        this: ModelHandle<Self>,
126        message: TypedEnvelope<proto::RemoveChannelBufferCollaborator>,
127        _: Arc<Client>,
128        mut cx: AsyncAppContext,
129    ) -> Result<()> {
130        this.update(&mut cx, |this, cx| {
131            this.collaborators.retain(|collaborator| {
132                if collaborator.peer_id == message.payload.peer_id {
133                    this.buffer.update(cx, |buffer, cx| {
134                        buffer.remove_peer(collaborator.replica_id as u16, cx)
135                    });
136                    false
137                } else {
138                    true
139                }
140            });
141            cx.emit(Event::CollaboratorsChanged);
142            cx.notify();
143        });
144
145        Ok(())
146    }
147
148    fn on_buffer_update(
149        &mut self,
150        _: ModelHandle<language::Buffer>,
151        event: &language::Event,
152        _: &mut ModelContext<Self>,
153    ) {
154        if let language::Event::Operation(operation) = event {
155            let operation = language::proto::serialize_operation(operation);
156            self.client
157                .send(proto::UpdateChannelBuffer {
158                    channel_id: self.channel_id,
159                    operations: vec![operation],
160                })
161                .log_err();
162        }
163    }
164
165    pub fn buffer(&self) -> ModelHandle<language::Buffer> {
166        self.buffer.clone()
167    }
168
169    pub fn collaborators(&self) -> &[proto::Collaborator] {
170        &self.collaborators
171    }
172
173    pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
174        self.channel_store
175            .read(cx)
176            .channel_for_id(self.channel_id)
177            .cloned()
178    }
179
180    pub fn replica_id(&self, cx: &AppContext) -> u16 {
181        self.buffer.read(cx).replica_id()
182    }
183}