channel_buffer.rs

  1use crate::{Channel, ChannelId, ChannelStore};
  2use anyhow::Result;
  3use client::Client;
  4use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
  5use rpc::{proto, TypedEnvelope};
  6use std::sync::Arc;
  7use util::ResultExt;
  8
  9pub(crate) fn init(client: &Arc<Client>) {
 10    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
 11    client.add_model_message_handler(ChannelBuffer::handle_add_channel_buffer_collaborator);
 12    client.add_model_message_handler(ChannelBuffer::handle_remove_channel_buffer_collaborator);
 13}
 14
 15pub struct ChannelBuffer {
 16    channel_id: ChannelId,
 17    collaborators: Vec<proto::Collaborator>,
 18    buffer: ModelHandle<language::Buffer>,
 19    channel_store: ModelHandle<ChannelStore>,
 20    client: Arc<Client>,
 21    _subscription: client::Subscription,
 22}
 23
 24impl Entity for ChannelBuffer {
 25    type Event = ();
 26
 27    fn release(&mut self, _: &mut AppContext) {
 28        self.client
 29            .send(proto::LeaveChannelBuffer {
 30                channel_id: self.channel_id,
 31            })
 32            .log_err();
 33    }
 34}
 35
 36impl ChannelBuffer {
 37    pub(crate) fn new(
 38        channel_store: ModelHandle<ChannelStore>,
 39        channel_id: ChannelId,
 40        client: Arc<Client>,
 41        cx: &mut AppContext,
 42    ) -> Task<Result<ModelHandle<Self>>> {
 43        cx.spawn(|mut cx| async move {
 44            let response = client
 45                .request(proto::JoinChannelBuffer { channel_id })
 46                .await?;
 47
 48            let base_text = response.base_text;
 49            let operations = response
 50                .operations
 51                .into_iter()
 52                .map(language::proto::deserialize_operation)
 53                .collect::<Result<Vec<_>, _>>()?;
 54
 55            let collaborators = response.collaborators;
 56
 57            let buffer =
 58                cx.add_model(|cx| language::Buffer::new(response.replica_id as u16, base_text, cx));
 59            buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
 60
 61            let subscription = client.subscribe_to_entity(channel_id)?;
 62
 63            anyhow::Ok(cx.add_model(|cx| {
 64                cx.subscribe(&buffer, Self::on_buffer_update).detach();
 65
 66                Self {
 67                    buffer,
 68                    client,
 69                    channel_id,
 70                    channel_store,
 71                    collaborators,
 72                    _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()),
 73                }
 74            }))
 75        })
 76    }
 77
 78    async fn handle_update_channel_buffer(
 79        this: ModelHandle<Self>,
 80        update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
 81        _: Arc<Client>,
 82        mut cx: AsyncAppContext,
 83    ) -> Result<()> {
 84        let ops = update_channel_buffer
 85            .payload
 86            .operations
 87            .into_iter()
 88            .map(language::proto::deserialize_operation)
 89            .collect::<Result<Vec<_>, _>>()?;
 90
 91        this.update(&mut cx, |this, cx| {
 92            cx.notify();
 93            this.buffer
 94                .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
 95        })?;
 96
 97        Ok(())
 98    }
 99
100    async fn handle_add_channel_buffer_collaborator(
101        this: ModelHandle<Self>,
102        envelope: TypedEnvelope<proto::AddChannelBufferCollaborator>,
103        _: Arc<Client>,
104        mut cx: AsyncAppContext,
105    ) -> Result<()> {
106        let collaborator = envelope.payload.collaborator.ok_or_else(|| {
107            anyhow::anyhow!(
108                "Should have gotten a collaborator in the AddChannelBufferCollaborator message"
109            )
110        })?;
111
112        this.update(&mut cx, |this, cx| {
113            this.collaborators.push(collaborator);
114            cx.notify();
115        });
116
117        Ok(())
118    }
119
120    async fn handle_remove_channel_buffer_collaborator(
121        this: ModelHandle<Self>,
122        message: TypedEnvelope<proto::RemoveChannelBufferCollaborator>,
123        _: Arc<Client>,
124        mut cx: AsyncAppContext,
125    ) -> Result<()> {
126        this.update(&mut cx, |this, cx| {
127            this.collaborators.retain(|collaborator| {
128                if collaborator.peer_id == message.payload.peer_id {
129                    this.buffer.update(cx, |buffer, cx| {
130                        buffer.remove_peer(collaborator.replica_id as u16, cx)
131                    });
132                    false
133                } else {
134                    true
135                }
136            });
137            cx.notify();
138        });
139
140        Ok(())
141    }
142
143    fn on_buffer_update(
144        &mut self,
145        _: ModelHandle<language::Buffer>,
146        event: &language::Event,
147        _: &mut ModelContext<Self>,
148    ) {
149        if let language::Event::Operation(operation) = event {
150            let operation = language::proto::serialize_operation(operation);
151            self.client
152                .send(proto::UpdateChannelBuffer {
153                    channel_id: self.channel_id,
154                    operations: vec![operation],
155                })
156                .log_err();
157        }
158    }
159
160    pub fn buffer(&self) -> ModelHandle<language::Buffer> {
161        self.buffer.clone()
162    }
163
164    pub fn collaborators(&self) -> &[proto::Collaborator] {
165        &self.collaborators
166    }
167
168    pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
169        self.channel_store
170            .read(cx)
171            .channel_for_id(self.channel_id)
172            .cloned()
173    }
174
175    pub fn replica_id(&self, cx: &AppContext) -> u16 {
176        self.buffer.read(cx).replica_id()
177    }
178}