1use crate::{Channel, ChannelId, ChannelStore};
2use anyhow::Result;
3use client::{Client, Collaborator, UserStore};
4use collections::HashMap;
5use gpui::{AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task};
6use language::proto::serialize_version;
7use rpc::{
8 proto::{self, PeerId},
9 TypedEnvelope,
10};
11use std::{sync::Arc, time::Duration};
12use util::ResultExt;
13
14pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
15
16pub(crate) fn init(client: &Arc<Client>) {
17 client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
18 client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
19}
20
21pub struct ChannelBuffer {
22 pub channel_id: ChannelId,
23 connected: bool,
24 collaborators: HashMap<PeerId, Collaborator>,
25 user_store: Model<UserStore>,
26 channel_store: Model<ChannelStore>,
27 buffer: Model<language::Buffer>,
28 buffer_epoch: u64,
29 client: Arc<Client>,
30 subscription: Option<client::Subscription>,
31 acknowledge_task: Option<Task<Result<()>>>,
32}
33
34pub enum ChannelBufferEvent {
35 CollaboratorsChanged,
36 Disconnected,
37 BufferEdited,
38 ChannelChanged,
39}
40
41impl EventEmitter<ChannelBufferEvent> for ChannelBuffer {}
42
43impl ChannelBuffer {
44 pub(crate) async fn new(
45 channel: Arc<Channel>,
46 client: Arc<Client>,
47 user_store: Model<UserStore>,
48 channel_store: Model<ChannelStore>,
49 mut cx: AsyncAppContext,
50 ) -> Result<Model<Self>> {
51 let response = client
52 .request(proto::JoinChannelBuffer {
53 channel_id: channel.id,
54 })
55 .await?;
56
57 let base_text = response.base_text;
58 let operations = response
59 .operations
60 .into_iter()
61 .map(language::proto::deserialize_operation)
62 .collect::<Result<Vec<_>, _>>()?;
63
64 let buffer = cx.build_model(|_| {
65 language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
66 })?;
67 buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))??;
68
69 let subscription = client.subscribe_to_entity(channel.id)?;
70
71 anyhow::Ok(cx.build_model(|cx| {
72 cx.subscribe(&buffer, Self::on_buffer_update).detach();
73 cx.on_release(Self::release).detach();
74 let mut this = Self {
75 buffer,
76 buffer_epoch: response.epoch,
77 client,
78 connected: true,
79 collaborators: Default::default(),
80 acknowledge_task: None,
81 channel_id: channel.id,
82 subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
83 user_store,
84 channel_store,
85 };
86 this.replace_collaborators(response.collaborators, cx);
87 this
88 })?)
89 }
90
91 fn release(&mut self, _: &mut AppContext) {
92 if self.connected {
93 if let Some(task) = self.acknowledge_task.take() {
94 task.detach();
95 }
96 self.client
97 .send(proto::LeaveChannelBuffer {
98 channel_id: self.channel_id,
99 })
100 .log_err();
101 }
102 }
103
104 pub fn remote_id(&self, cx: &AppContext) -> u64 {
105 self.buffer.read(cx).remote_id()
106 }
107
108 pub fn user_store(&self) -> &Model<UserStore> {
109 &self.user_store
110 }
111
112 pub(crate) fn replace_collaborators(
113 &mut self,
114 collaborators: Vec<proto::Collaborator>,
115 cx: &mut ModelContext<Self>,
116 ) {
117 let mut new_collaborators = HashMap::default();
118 for collaborator in collaborators {
119 if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
120 new_collaborators.insert(collaborator.peer_id, collaborator);
121 }
122 }
123
124 for (_, old_collaborator) in &self.collaborators {
125 if !new_collaborators.contains_key(&old_collaborator.peer_id) {
126 self.buffer.update(cx, |buffer, cx| {
127 buffer.remove_peer(old_collaborator.replica_id as u16, cx)
128 });
129 }
130 }
131 self.collaborators = new_collaborators;
132 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
133 cx.notify();
134 }
135
136 async fn handle_update_channel_buffer(
137 this: Model<Self>,
138 update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
139 _: Arc<Client>,
140 mut cx: AsyncAppContext,
141 ) -> Result<()> {
142 let ops = update_channel_buffer
143 .payload
144 .operations
145 .into_iter()
146 .map(language::proto::deserialize_operation)
147 .collect::<Result<Vec<_>, _>>()?;
148
149 this.update(&mut cx, |this, cx| {
150 cx.notify();
151 this.buffer
152 .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
153 })??;
154
155 Ok(())
156 }
157
158 async fn handle_update_channel_buffer_collaborators(
159 this: Model<Self>,
160 message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
161 _: Arc<Client>,
162 mut cx: AsyncAppContext,
163 ) -> Result<()> {
164 this.update(&mut cx, |this, cx| {
165 this.replace_collaborators(message.payload.collaborators, cx);
166 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
167 cx.notify();
168 })
169 }
170
171 fn on_buffer_update(
172 &mut self,
173 _: Model<language::Buffer>,
174 event: &language::Event,
175 cx: &mut ModelContext<Self>,
176 ) {
177 match event {
178 language::Event::Operation(operation) => {
179 let operation = language::proto::serialize_operation(operation);
180 self.client
181 .send(proto::UpdateChannelBuffer {
182 channel_id: self.channel_id,
183 operations: vec![operation],
184 })
185 .log_err();
186 }
187 language::Event::Edited => {
188 cx.emit(ChannelBufferEvent::BufferEdited);
189 }
190 _ => {}
191 }
192 }
193
194 pub fn acknowledge_buffer_version(&mut self, cx: &mut ModelContext<'_, ChannelBuffer>) {
195 let buffer = self.buffer.read(cx);
196 let version = buffer.version();
197 let buffer_id = buffer.remote_id();
198 let client = self.client.clone();
199 let epoch = self.epoch();
200
201 self.acknowledge_task = Some(cx.spawn(move |_, cx| async move {
202 cx.background_executor()
203 .timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL)
204 .await;
205 client
206 .send(proto::AckBufferOperation {
207 buffer_id,
208 epoch,
209 version: serialize_version(&version),
210 })
211 .ok();
212 Ok(())
213 }));
214 }
215
216 pub fn epoch(&self) -> u64 {
217 self.buffer_epoch
218 }
219
220 pub fn buffer(&self) -> Model<language::Buffer> {
221 self.buffer.clone()
222 }
223
224 pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
225 &self.collaborators
226 }
227
228 pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
229 self.channel_store
230 .read(cx)
231 .channel_for_id(self.channel_id)
232 .cloned()
233 }
234
235 pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
236 log::info!("channel buffer {} disconnected", self.channel_id);
237 if self.connected {
238 self.connected = false;
239 self.subscription.take();
240 cx.emit(ChannelBufferEvent::Disconnected);
241 cx.notify()
242 }
243 }
244
245 pub(crate) fn channel_changed(&mut self, cx: &mut ModelContext<Self>) {
246 cx.emit(ChannelBufferEvent::ChannelChanged);
247 cx.notify()
248 }
249
250 pub fn is_connected(&self) -> bool {
251 self.connected
252 }
253
254 pub fn replica_id(&self, cx: &AppContext) -> u16 {
255 self.buffer.read(cx).replica_id()
256 }
257}