1use crate::{Channel, ChannelId, ChannelStore};
2use anyhow::Result;
3use client::{Client, Collaborator, UserStore};
4use collections::HashMap;
5use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
6use language::proto::serialize_version;
7use rpc::{
8 proto::{self, PeerId},
9 TypedEnvelope,
10};
11use std::{sync::Arc, time::Duration};
12use util::ResultExt;
13
14pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
15
16pub(crate) fn init(client: &Arc<Client>) {
17 client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
18 client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
19}
20
21pub struct ChannelBuffer {
22 pub channel_id: ChannelId,
23 connected: bool,
24 collaborators: HashMap<PeerId, Collaborator>,
25 user_store: ModelHandle<UserStore>,
26 channel_store: ModelHandle<ChannelStore>,
27 buffer: ModelHandle<language::Buffer>,
28 buffer_epoch: u64,
29 client: Arc<Client>,
30 subscription: Option<client::Subscription>,
31 acknowledge_task: Option<Task<Result<()>>>,
32}
33
34pub enum ChannelBufferEvent {
35 CollaboratorsChanged,
36 Disconnected,
37 BufferEdited,
38 ChannelChanged,
39}
40
41impl Entity for ChannelBuffer {
42 type Event = ChannelBufferEvent;
43
44 fn release(&mut self, _: &mut AppContext) {
45 if self.connected {
46 if let Some(task) = self.acknowledge_task.take() {
47 task.detach();
48 }
49 self.client
50 .send(proto::LeaveChannelBuffer {
51 channel_id: self.channel_id,
52 })
53 .log_err();
54 }
55 }
56}
57
58impl ChannelBuffer {
59 pub(crate) async fn new(
60 channel: Arc<Channel>,
61 client: Arc<Client>,
62 user_store: ModelHandle<UserStore>,
63 channel_store: ModelHandle<ChannelStore>,
64 mut cx: AsyncAppContext,
65 ) -> Result<ModelHandle<Self>> {
66 let response = client
67 .request(proto::JoinChannelBuffer {
68 channel_id: channel.id,
69 })
70 .await?;
71
72 let base_text = response.base_text;
73 let operations = response
74 .operations
75 .into_iter()
76 .map(language::proto::deserialize_operation)
77 .collect::<Result<Vec<_>, _>>()?;
78
79 let buffer = cx.add_model(|_| {
80 language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
81 });
82 buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
83
84 let subscription = client.subscribe_to_entity(channel.id)?;
85
86 anyhow::Ok(cx.add_model(|cx| {
87 cx.subscribe(&buffer, Self::on_buffer_update).detach();
88
89 let mut this = Self {
90 buffer,
91 buffer_epoch: response.epoch,
92 client,
93 connected: true,
94 collaborators: Default::default(),
95 acknowledge_task: None,
96 channel_id: channel.id,
97 subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
98 user_store,
99 channel_store,
100 };
101 this.replace_collaborators(response.collaborators, cx);
102 this
103 }))
104 }
105
106 pub fn remote_id(&self, cx: &AppContext) -> u64 {
107 self.buffer.read(cx).remote_id()
108 }
109
110 pub fn user_store(&self) -> &ModelHandle<UserStore> {
111 &self.user_store
112 }
113
114 pub(crate) fn replace_collaborators(
115 &mut self,
116 collaborators: Vec<proto::Collaborator>,
117 cx: &mut ModelContext<Self>,
118 ) {
119 let mut new_collaborators = HashMap::default();
120 for collaborator in collaborators {
121 if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
122 new_collaborators.insert(collaborator.peer_id, collaborator);
123 }
124 }
125
126 for (_, old_collaborator) in &self.collaborators {
127 if !new_collaborators.contains_key(&old_collaborator.peer_id) {
128 self.buffer.update(cx, |buffer, cx| {
129 buffer.remove_peer(old_collaborator.replica_id as u16, cx)
130 });
131 }
132 }
133 self.collaborators = new_collaborators;
134 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
135 cx.notify();
136 }
137
138 async fn handle_update_channel_buffer(
139 this: ModelHandle<Self>,
140 update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
141 _: Arc<Client>,
142 mut cx: AsyncAppContext,
143 ) -> Result<()> {
144 let ops = update_channel_buffer
145 .payload
146 .operations
147 .into_iter()
148 .map(language::proto::deserialize_operation)
149 .collect::<Result<Vec<_>, _>>()?;
150
151 this.update(&mut cx, |this, cx| {
152 cx.notify();
153 this.buffer
154 .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
155 })?;
156
157 Ok(())
158 }
159
160 async fn handle_update_channel_buffer_collaborators(
161 this: ModelHandle<Self>,
162 message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
163 _: Arc<Client>,
164 mut cx: AsyncAppContext,
165 ) -> Result<()> {
166 this.update(&mut cx, |this, cx| {
167 this.replace_collaborators(message.payload.collaborators, cx);
168 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
169 cx.notify();
170 });
171
172 Ok(())
173 }
174
175 fn on_buffer_update(
176 &mut self,
177 _: ModelHandle<language::Buffer>,
178 event: &language::Event,
179 cx: &mut ModelContext<Self>,
180 ) {
181 match event {
182 language::Event::Operation(operation) => {
183 let operation = language::proto::serialize_operation(operation);
184 self.client
185 .send(proto::UpdateChannelBuffer {
186 channel_id: self.channel_id,
187 operations: vec![operation],
188 })
189 .log_err();
190 }
191 language::Event::Edited => {
192 cx.emit(ChannelBufferEvent::BufferEdited);
193 }
194 _ => {}
195 }
196 }
197
198 pub fn acknowledge_buffer_version(&mut self, cx: &mut ModelContext<'_, ChannelBuffer>) {
199 let buffer = self.buffer.read(cx);
200 let version = buffer.version();
201 let buffer_id = buffer.remote_id();
202 let client = self.client.clone();
203 let epoch = self.epoch();
204
205 self.acknowledge_task = Some(cx.spawn_weak(|_, cx| async move {
206 cx.background().timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL).await;
207 client
208 .send(proto::AckBufferOperation {
209 buffer_id,
210 epoch,
211 version: serialize_version(&version),
212 })
213 .ok();
214 Ok(())
215 }));
216 }
217
218 pub fn epoch(&self) -> u64 {
219 self.buffer_epoch
220 }
221
222 pub fn buffer(&self) -> ModelHandle<language::Buffer> {
223 self.buffer.clone()
224 }
225
226 pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
227 &self.collaborators
228 }
229
230 pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
231 self.channel_store
232 .read(cx)
233 .channel_for_id(self.channel_id)
234 .cloned()
235 }
236
237 pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
238 log::info!("channel buffer {} disconnected", self.channel_id);
239 if self.connected {
240 self.connected = false;
241 self.subscription.take();
242 cx.emit(ChannelBufferEvent::Disconnected);
243 cx.notify()
244 }
245 }
246
247 pub(crate) fn channel_changed(&mut self, cx: &mut ModelContext<Self>) {
248 cx.emit(ChannelBufferEvent::ChannelChanged);
249 cx.notify()
250 }
251
252 pub fn is_connected(&self) -> bool {
253 self.connected
254 }
255
256 pub fn replica_id(&self, cx: &AppContext) -> u16 {
257 self.buffer.read(cx).replica_id()
258 }
259}