1use crate::Channel;
2use anyhow::Result;
3use client::{Client, Collaborator, UserStore};
4use collections::HashMap;
5use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
6use language::proto::serialize_version;
7use rpc::{
8 proto::{self, PeerId},
9 TypedEnvelope,
10};
11use std::{sync::Arc, time::Duration};
12use util::ResultExt;
13
14pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
15
16pub(crate) fn init(client: &Arc<Client>) {
17 client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
18 client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
19}
20
21pub struct ChannelBuffer {
22 pub(crate) channel: Arc<Channel>,
23 connected: bool,
24 collaborators: HashMap<PeerId, Collaborator>,
25 user_store: ModelHandle<UserStore>,
26 buffer: ModelHandle<language::Buffer>,
27 buffer_epoch: u64,
28 client: Arc<Client>,
29 subscription: Option<client::Subscription>,
30 acknowledge_task: Option<Task<Result<()>>>,
31}
32
33pub enum ChannelBufferEvent {
34 CollaboratorsChanged,
35 Disconnected,
36 BufferEdited,
37}
38
39impl Entity for ChannelBuffer {
40 type Event = ChannelBufferEvent;
41
42 fn release(&mut self, _: &mut AppContext) {
43 if self.connected {
44 if let Some(task) = self.acknowledge_task.take() {
45 task.detach();
46 }
47 self.client
48 .send(proto::LeaveChannelBuffer {
49 channel_id: self.channel.id,
50 })
51 .log_err();
52 }
53 }
54}
55
56impl ChannelBuffer {
57 pub(crate) async fn new(
58 channel: Arc<Channel>,
59 client: Arc<Client>,
60 user_store: ModelHandle<UserStore>,
61 mut cx: AsyncAppContext,
62 ) -> Result<ModelHandle<Self>> {
63 let response = client
64 .request(proto::JoinChannelBuffer {
65 channel_id: channel.id,
66 })
67 .await?;
68
69 let base_text = response.base_text;
70 let operations = response
71 .operations
72 .into_iter()
73 .map(language::proto::deserialize_operation)
74 .collect::<Result<Vec<_>, _>>()?;
75
76 let buffer = cx.add_model(|_| {
77 language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
78 });
79 buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
80
81 let subscription = client.subscribe_to_entity(channel.id)?;
82
83 anyhow::Ok(cx.add_model(|cx| {
84 cx.subscribe(&buffer, Self::on_buffer_update).detach();
85
86 let mut this = Self {
87 buffer,
88 buffer_epoch: response.epoch,
89 client,
90 connected: true,
91 collaborators: Default::default(),
92 acknowledge_task: None,
93 channel,
94 subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
95 user_store,
96 };
97 this.replace_collaborators(response.collaborators, cx);
98 this
99 }))
100 }
101
102 pub fn remote_id(&self, cx: &AppContext) -> u64 {
103 self.buffer.read(cx).remote_id()
104 }
105
106 pub fn user_store(&self) -> &ModelHandle<UserStore> {
107 &self.user_store
108 }
109
110 pub(crate) fn replace_collaborators(
111 &mut self,
112 collaborators: Vec<proto::Collaborator>,
113 cx: &mut ModelContext<Self>,
114 ) {
115 let mut new_collaborators = HashMap::default();
116 for collaborator in collaborators {
117 if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
118 new_collaborators.insert(collaborator.peer_id, collaborator);
119 }
120 }
121
122 for (_, old_collaborator) in &self.collaborators {
123 if !new_collaborators.contains_key(&old_collaborator.peer_id) {
124 self.buffer.update(cx, |buffer, cx| {
125 buffer.remove_peer(old_collaborator.replica_id as u16, cx)
126 });
127 }
128 }
129 self.collaborators = new_collaborators;
130 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
131 cx.notify();
132 }
133
134 async fn handle_update_channel_buffer(
135 this: ModelHandle<Self>,
136 update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
137 _: Arc<Client>,
138 mut cx: AsyncAppContext,
139 ) -> Result<()> {
140 let ops = update_channel_buffer
141 .payload
142 .operations
143 .into_iter()
144 .map(language::proto::deserialize_operation)
145 .collect::<Result<Vec<_>, _>>()?;
146
147 this.update(&mut cx, |this, cx| {
148 cx.notify();
149 this.buffer
150 .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
151 })?;
152
153 Ok(())
154 }
155
156 async fn handle_update_channel_buffer_collaborators(
157 this: ModelHandle<Self>,
158 message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
159 _: Arc<Client>,
160 mut cx: AsyncAppContext,
161 ) -> Result<()> {
162 this.update(&mut cx, |this, cx| {
163 this.replace_collaborators(message.payload.collaborators, cx);
164 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
165 cx.notify();
166 });
167
168 Ok(())
169 }
170
171 fn on_buffer_update(
172 &mut self,
173 _: ModelHandle<language::Buffer>,
174 event: &language::Event,
175 cx: &mut ModelContext<Self>,
176 ) {
177 match event {
178 language::Event::Operation(operation) => {
179 let operation = language::proto::serialize_operation(operation);
180 self.client
181 .send(proto::UpdateChannelBuffer {
182 channel_id: self.channel.id,
183 operations: vec![operation],
184 })
185 .log_err();
186 }
187 language::Event::Edited => {
188 cx.emit(ChannelBufferEvent::BufferEdited);
189 }
190 _ => {}
191 }
192 }
193
194 pub fn acknowledge_buffer_version(&mut self, cx: &mut ModelContext<'_, ChannelBuffer>) {
195 let buffer = self.buffer.read(cx);
196 let version = buffer.version();
197 let buffer_id = buffer.remote_id();
198 let client = self.client.clone();
199 let epoch = self.epoch();
200
201 self.acknowledge_task = Some(cx.spawn_weak(|_, cx| async move {
202 cx.background().timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL).await;
203 client
204 .send(proto::AckBufferOperation {
205 buffer_id,
206 epoch,
207 version: serialize_version(&version),
208 })
209 .ok();
210 Ok(())
211 }));
212 }
213
214 pub fn epoch(&self) -> u64 {
215 self.buffer_epoch
216 }
217
218 pub fn buffer(&self) -> ModelHandle<language::Buffer> {
219 self.buffer.clone()
220 }
221
222 pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
223 &self.collaborators
224 }
225
226 pub fn channel(&self) -> Arc<Channel> {
227 self.channel.clone()
228 }
229
230 pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
231 log::info!("channel buffer {} disconnected", self.channel.id);
232 if self.connected {
233 self.connected = false;
234 self.subscription.take();
235 cx.emit(ChannelBufferEvent::Disconnected);
236 cx.notify()
237 }
238 }
239
240 pub fn is_connected(&self) -> bool {
241 self.connected
242 }
243
244 pub fn replica_id(&self, cx: &AppContext) -> u16 {
245 self.buffer.read(cx).replica_id()
246 }
247}