1use crate::Channel;
2use anyhow::Result;
3use client::Client;
4use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle};
5use rpc::{proto, TypedEnvelope};
6use std::sync::Arc;
7use util::ResultExt;
8
9pub(crate) fn init(client: &Arc<Client>) {
10 client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
11 client.add_model_message_handler(ChannelBuffer::handle_add_channel_buffer_collaborator);
12 client.add_model_message_handler(ChannelBuffer::handle_remove_channel_buffer_collaborator);
13 client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborator);
14}
15
16pub struct ChannelBuffer {
17 pub(crate) channel: Arc<Channel>,
18 connected: bool,
19 collaborators: Vec<proto::Collaborator>,
20 buffer: ModelHandle<language::Buffer>,
21 buffer_epoch: u64,
22 client: Arc<Client>,
23 subscription: Option<client::Subscription>,
24}
25
/// Events emitted by a [`ChannelBuffer`] model.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// The list of collaborators was modified or replaced.
    CollaboratorsChanged,
    /// The buffer transitioned from connected to disconnected.
    Disconnected,
}
30
31impl Entity for ChannelBuffer {
32 type Event = Event;
33
34 fn release(&mut self, _: &mut AppContext) {
35 if self.connected {
36 self.client
37 .send(proto::LeaveChannelBuffer {
38 channel_id: self.channel.id,
39 })
40 .log_err();
41 }
42 }
43}
44
45impl ChannelBuffer {
46 pub(crate) async fn new(
47 channel: Arc<Channel>,
48 client: Arc<Client>,
49 mut cx: AsyncAppContext,
50 ) -> Result<ModelHandle<Self>> {
51 let response = client
52 .request(proto::JoinChannelBuffer {
53 channel_id: channel.id,
54 })
55 .await?;
56
57 let base_text = response.base_text;
58 let operations = response
59 .operations
60 .into_iter()
61 .map(language::proto::deserialize_operation)
62 .collect::<Result<Vec<_>, _>>()?;
63
64 let collaborators = response.collaborators;
65
66 let buffer = cx.add_model(|_| {
67 language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
68 });
69 buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
70
71 let subscription = client.subscribe_to_entity(channel.id)?;
72
73 anyhow::Ok(cx.add_model(|cx| {
74 cx.subscribe(&buffer, Self::on_buffer_update).detach();
75
76 Self {
77 buffer,
78 buffer_epoch: response.epoch,
79 client,
80 connected: true,
81 collaborators,
82 channel,
83 subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
84 }
85 }))
86 }
87
88 pub(crate) fn replace_collaborators(
89 &mut self,
90 collaborators: Vec<proto::Collaborator>,
91 cx: &mut ModelContext<Self>,
92 ) {
93 for old_collaborator in &self.collaborators {
94 if collaborators
95 .iter()
96 .any(|c| c.replica_id == old_collaborator.replica_id)
97 {
98 self.buffer.update(cx, |buffer, cx| {
99 buffer.remove_peer(old_collaborator.replica_id as u16, cx)
100 });
101 }
102 }
103 self.collaborators = collaborators;
104 cx.emit(Event::CollaboratorsChanged);
105 cx.notify();
106 }
107
108 async fn handle_update_channel_buffer(
109 this: ModelHandle<Self>,
110 update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
111 _: Arc<Client>,
112 mut cx: AsyncAppContext,
113 ) -> Result<()> {
114 let ops = update_channel_buffer
115 .payload
116 .operations
117 .into_iter()
118 .map(language::proto::deserialize_operation)
119 .collect::<Result<Vec<_>, _>>()?;
120
121 this.update(&mut cx, |this, cx| {
122 cx.notify();
123 this.buffer
124 .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
125 })?;
126
127 Ok(())
128 }
129
130 async fn handle_add_channel_buffer_collaborator(
131 this: ModelHandle<Self>,
132 envelope: TypedEnvelope<proto::AddChannelBufferCollaborator>,
133 _: Arc<Client>,
134 mut cx: AsyncAppContext,
135 ) -> Result<()> {
136 let collaborator = envelope.payload.collaborator.ok_or_else(|| {
137 anyhow::anyhow!(
138 "Should have gotten a collaborator in the AddChannelBufferCollaborator message"
139 )
140 })?;
141
142 this.update(&mut cx, |this, cx| {
143 this.collaborators.push(collaborator);
144 cx.emit(Event::CollaboratorsChanged);
145 cx.notify();
146 });
147
148 Ok(())
149 }
150
151 async fn handle_remove_channel_buffer_collaborator(
152 this: ModelHandle<Self>,
153 message: TypedEnvelope<proto::RemoveChannelBufferCollaborator>,
154 _: Arc<Client>,
155 mut cx: AsyncAppContext,
156 ) -> Result<()> {
157 this.update(&mut cx, |this, cx| {
158 this.collaborators.retain(|collaborator| {
159 if collaborator.peer_id == message.payload.peer_id {
160 this.buffer.update(cx, |buffer, cx| {
161 buffer.remove_peer(collaborator.replica_id as u16, cx)
162 });
163 false
164 } else {
165 true
166 }
167 });
168 cx.emit(Event::CollaboratorsChanged);
169 cx.notify();
170 });
171
172 Ok(())
173 }
174
175 async fn handle_update_channel_buffer_collaborator(
176 this: ModelHandle<Self>,
177 message: TypedEnvelope<proto::UpdateChannelBufferCollaborator>,
178 _: Arc<Client>,
179 mut cx: AsyncAppContext,
180 ) -> Result<()> {
181 this.update(&mut cx, |this, cx| {
182 for collaborator in &mut this.collaborators {
183 if collaborator.peer_id == message.payload.old_peer_id {
184 collaborator.peer_id = message.payload.new_peer_id;
185 break;
186 }
187 }
188 cx.emit(Event::CollaboratorsChanged);
189 cx.notify();
190 });
191
192 Ok(())
193 }
194
195 fn on_buffer_update(
196 &mut self,
197 _: ModelHandle<language::Buffer>,
198 event: &language::Event,
199 _: &mut ModelContext<Self>,
200 ) {
201 if let language::Event::Operation(operation) = event {
202 let operation = language::proto::serialize_operation(operation);
203 self.client
204 .send(proto::UpdateChannelBuffer {
205 channel_id: self.channel.id,
206 operations: vec![operation],
207 })
208 .log_err();
209 }
210 }
211
212 pub fn epoch(&self) -> u64 {
213 self.buffer_epoch
214 }
215
216 pub fn buffer(&self) -> ModelHandle<language::Buffer> {
217 self.buffer.clone()
218 }
219
220 pub fn collaborators(&self) -> &[proto::Collaborator] {
221 &self.collaborators
222 }
223
224 pub fn channel(&self) -> Arc<Channel> {
225 self.channel.clone()
226 }
227
228 pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
229 log::info!("channel buffer {} disconnected", self.channel.id);
230 if self.connected {
231 self.connected = false;
232 self.subscription.take();
233 cx.emit(Event::Disconnected);
234 cx.notify()
235 }
236 }
237
238 pub fn is_connected(&self) -> bool {
239 self.connected
240 }
241
242 pub fn replica_id(&self, cx: &AppContext) -> u16 {
243 self.buffer.read(cx).replica_id()
244 }
245}