1use crate::{Channel, ChannelStore};
2use anyhow::Result;
3use client::{ChannelId, Client, Collaborator, UserStore, ZED_ALWAYS_ACTIVE};
4use collections::HashMap;
5use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task};
6use language::proto::serialize_version;
7use rpc::{
8 AnyProtoClient, TypedEnvelope,
9 proto::{self, PeerId},
10};
11use std::{sync::Arc, time::Duration};
12use text::{BufferId, ReplicaId};
13use util::ResultExt;
14
15pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
16
17pub(crate) fn init(client: &AnyProtoClient) {
18 client.add_entity_message_handler(ChannelBuffer::handle_update_channel_buffer);
19 client.add_entity_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
20}
21
22pub struct ChannelBuffer {
23 pub channel_id: ChannelId,
24 connected: bool,
25 rejoining: bool,
26 collaborators: HashMap<PeerId, Collaborator>,
27 user_store: Entity<UserStore>,
28 channel_store: Entity<ChannelStore>,
29 buffer: Entity<language::Buffer>,
30 buffer_epoch: u64,
31 client: Arc<Client>,
32 subscription: Option<client::Subscription>,
33 acknowledge_task: Option<Task<Result<()>>>,
34}
35
36pub enum ChannelBufferEvent {
37 CollaboratorsChanged,
38 Disconnected,
39 Connected,
40 BufferEdited,
41 ChannelChanged,
42}
43
44impl EventEmitter<ChannelBufferEvent> for ChannelBuffer {}
45
46impl ChannelBuffer {
47 pub(crate) async fn new(
48 channel: Arc<Channel>,
49 client: Arc<Client>,
50 user_store: Entity<UserStore>,
51 channel_store: Entity<ChannelStore>,
52 cx: &mut AsyncApp,
53 ) -> Result<Entity<Self>> {
54 let response = client
55 .request(proto::JoinChannelBuffer {
56 channel_id: channel.id.0,
57 })
58 .await?;
59 let buffer_id = BufferId::new(response.buffer_id)?;
60 let base_text = response.base_text;
61 let operations = response
62 .operations
63 .into_iter()
64 .map(language::proto::deserialize_operation)
65 .collect::<Result<Vec<_>, _>>()?;
66
67 let buffer = cx.new(|cx| {
68 let capability = channel_store.read(cx).channel_capability(channel.id);
69 language::Buffer::remote(
70 buffer_id,
71 ReplicaId::new(response.replica_id as u16),
72 capability,
73 base_text,
74 )
75 });
76 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
77
78 let subscription = client.subscribe_to_entity(channel.id.0)?;
79
80 anyhow::Ok(cx.new(|cx| {
81 cx.subscribe(&buffer, Self::on_buffer_update).detach();
82 cx.on_release(Self::release).detach();
83 let mut this = Self {
84 buffer,
85 buffer_epoch: response.epoch,
86 client,
87 connected: true,
88 rejoining: false,
89 collaborators: Default::default(),
90 acknowledge_task: None,
91 channel_id: channel.id,
92 subscription: Some(subscription.set_entity(&cx.entity(), &cx.to_async())),
93 user_store,
94 channel_store,
95 };
96 this.replace_collaborators(response.collaborators, cx);
97 this
98 }))
99 }
100
101 fn release(&mut self, _: &mut App) {
102 if self.connected {
103 if let Some(task) = self.acknowledge_task.take() {
104 task.detach();
105 }
106 self.client
107 .send(proto::LeaveChannelBuffer {
108 channel_id: self.channel_id.0,
109 })
110 .log_err();
111 }
112 }
113
114 pub fn connected(&mut self, cx: &mut Context<Self>) {
115 self.connected = true;
116 self.rejoining = false;
117 if self.subscription.is_none() {
118 let Ok(subscription) = self.client.subscribe_to_entity(self.channel_id.0) else {
119 return;
120 };
121 self.subscription = Some(subscription.set_entity(&cx.entity(), &cx.to_async()));
122 cx.emit(ChannelBufferEvent::Connected);
123 }
124 }
125
126 pub(crate) fn set_rejoining(&mut self, rejoining: bool) {
127 self.rejoining = rejoining;
128 }
129
130 pub fn remote_id(&self, cx: &App) -> BufferId {
131 self.buffer.read(cx).remote_id()
132 }
133
134 pub fn user_store(&self) -> &Entity<UserStore> {
135 &self.user_store
136 }
137
138 pub(crate) fn replace_collaborators(
139 &mut self,
140 collaborators: Vec<proto::Collaborator>,
141 cx: &mut Context<Self>,
142 ) {
143 let mut new_collaborators = HashMap::default();
144 for collaborator in collaborators {
145 if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
146 new_collaborators.insert(collaborator.peer_id, collaborator);
147 }
148 }
149
150 for old_collaborator in self.collaborators.values() {
151 if !new_collaborators.contains_key(&old_collaborator.peer_id) {
152 self.buffer.update(cx, |buffer, cx| {
153 buffer.remove_peer(old_collaborator.replica_id, cx)
154 });
155 }
156 }
157 self.collaborators = new_collaborators;
158 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
159 cx.notify();
160 }
161
162 async fn handle_update_channel_buffer(
163 this: Entity<Self>,
164 update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
165 mut cx: AsyncApp,
166 ) -> Result<()> {
167 let ops = update_channel_buffer
168 .payload
169 .operations
170 .into_iter()
171 .map(language::proto::deserialize_operation)
172 .collect::<Result<Vec<_>, _>>()?;
173
174 this.update(&mut cx, |this, cx| {
175 cx.notify();
176 this.buffer
177 .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
178 });
179
180 Ok(())
181 }
182
183 async fn handle_update_channel_buffer_collaborators(
184 this: Entity<Self>,
185 message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
186 mut cx: AsyncApp,
187 ) -> Result<()> {
188 this.update(&mut cx, |this, cx| {
189 this.replace_collaborators(message.payload.collaborators, cx);
190 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
191 cx.notify();
192 });
193 Ok(())
194 }
195
196 fn on_buffer_update(
197 &mut self,
198 _: Entity<language::Buffer>,
199 event: &language::BufferEvent,
200 cx: &mut Context<Self>,
201 ) {
202 match event {
203 language::BufferEvent::Operation {
204 operation,
205 is_local: true,
206 } => {
207 if *ZED_ALWAYS_ACTIVE
208 && let language::Operation::UpdateSelections { selections, .. } = operation
209 && selections.is_empty()
210 {
211 return;
212 }
213 let operation = language::proto::serialize_operation(operation);
214 if self.rejoining {
215 return;
216 }
217 self.client
218 .send(proto::UpdateChannelBuffer {
219 channel_id: self.channel_id.0,
220 operations: vec![operation],
221 })
222 .log_err();
223 }
224 language::BufferEvent::Edited => {
225 cx.emit(ChannelBufferEvent::BufferEdited);
226 }
227 _ => {}
228 }
229 }
230
231 pub fn acknowledge_buffer_version(&mut self, cx: &mut Context<ChannelBuffer>) {
232 let buffer = self.buffer.read(cx);
233 let version = buffer.version();
234 let buffer_id = buffer.remote_id().into();
235 let client = self.client.clone();
236 let epoch = self.epoch();
237
238 self.acknowledge_task = Some(cx.spawn(async move |_, cx| {
239 cx.background_executor()
240 .timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL)
241 .await;
242 client
243 .send(proto::AckBufferOperation {
244 buffer_id,
245 epoch,
246 version: serialize_version(&version),
247 })
248 .ok();
249 Ok(())
250 }));
251 }
252
253 pub fn epoch(&self) -> u64 {
254 self.buffer_epoch
255 }
256
257 pub fn buffer(&self) -> Entity<language::Buffer> {
258 self.buffer.clone()
259 }
260
261 pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
262 &self.collaborators
263 }
264
265 pub fn channel(&self, cx: &App) -> Option<Arc<Channel>> {
266 self.channel_store
267 .read(cx)
268 .channel_for_id(self.channel_id)
269 .cloned()
270 }
271
272 pub(crate) fn disconnect(&mut self, cx: &mut Context<Self>) {
273 log::info!("channel buffer {} disconnected", self.channel_id);
274 if self.connected {
275 self.connected = false;
276 self.rejoining = false;
277 self.subscription.take();
278 cx.emit(ChannelBufferEvent::Disconnected);
279 cx.notify()
280 }
281 }
282
283 pub(crate) fn channel_changed(&mut self, cx: &mut Context<Self>) {
284 cx.emit(ChannelBufferEvent::ChannelChanged);
285 cx.notify()
286 }
287
288 pub fn is_connected(&self) -> bool {
289 self.connected
290 }
291
292 pub fn replica_id(&self, cx: &App) -> ReplicaId {
293 self.buffer.read(cx).replica_id()
294 }
295}