1use crate::{Channel, ChannelStore};
2use anyhow::Result;
3use client::{ChannelId, Client, Collaborator, UserStore, ZED_ALWAYS_ACTIVE};
4use collections::HashMap;
5use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task};
6use language::proto::serialize_version;
7use rpc::{
8 AnyProtoClient, TypedEnvelope,
9 proto::{self, PeerId},
10};
11use std::{sync::Arc, time::Duration};
12use text::{BufferId, ReplicaId};
13use util::ResultExt;
14
15pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
16
17pub(crate) fn init(client: &AnyProtoClient) {
18 client.add_entity_message_handler(ChannelBuffer::handle_update_channel_buffer);
19 client.add_entity_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
20}
21
22pub struct ChannelBuffer {
23 pub channel_id: ChannelId,
24 connected: bool,
25 collaborators: HashMap<PeerId, Collaborator>,
26 user_store: Entity<UserStore>,
27 channel_store: Entity<ChannelStore>,
28 buffer: Entity<language::Buffer>,
29 buffer_epoch: u64,
30 client: Arc<Client>,
31 subscription: Option<client::Subscription>,
32 acknowledge_task: Option<Task<Result<()>>>,
33}
34
35pub enum ChannelBufferEvent {
36 CollaboratorsChanged,
37 Disconnected,
38 Connected,
39 BufferEdited,
40 ChannelChanged,
41}
42
43impl EventEmitter<ChannelBufferEvent> for ChannelBuffer {}
44
45impl ChannelBuffer {
46 pub(crate) async fn new(
47 channel: Arc<Channel>,
48 client: Arc<Client>,
49 user_store: Entity<UserStore>,
50 channel_store: Entity<ChannelStore>,
51 cx: &mut AsyncApp,
52 ) -> Result<Entity<Self>> {
53 let response = client
54 .request(proto::JoinChannelBuffer {
55 channel_id: channel.id.0,
56 })
57 .await?;
58 let buffer_id = BufferId::new(response.buffer_id)?;
59 let base_text = response.base_text;
60 let operations = response
61 .operations
62 .into_iter()
63 .map(language::proto::deserialize_operation)
64 .collect::<Result<Vec<_>, _>>()?;
65
66 let buffer = cx.new(|cx| {
67 let capability = channel_store.read(cx).channel_capability(channel.id);
68 language::Buffer::remote(
69 buffer_id,
70 ReplicaId::new(response.replica_id as u16),
71 capability,
72 base_text,
73 cx.background_executor(),
74 )
75 })?;
76 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
77
78 let subscription = client.subscribe_to_entity(channel.id.0)?;
79
80 anyhow::Ok(cx.new(|cx| {
81 cx.subscribe(&buffer, Self::on_buffer_update).detach();
82 cx.on_release(Self::release).detach();
83 let mut this = Self {
84 buffer,
85 buffer_epoch: response.epoch,
86 client,
87 connected: true,
88 collaborators: Default::default(),
89 acknowledge_task: None,
90 channel_id: channel.id,
91 subscription: Some(subscription.set_entity(&cx.entity(), &cx.to_async())),
92 user_store,
93 channel_store,
94 };
95 this.replace_collaborators(response.collaborators, cx);
96 this
97 })?)
98 }
99
100 fn release(&mut self, _: &mut App) {
101 if self.connected {
102 if let Some(task) = self.acknowledge_task.take() {
103 task.detach();
104 }
105 self.client
106 .send(proto::LeaveChannelBuffer {
107 channel_id: self.channel_id.0,
108 })
109 .log_err();
110 }
111 }
112
113 pub fn connected(&mut self, cx: &mut Context<Self>) {
114 self.connected = true;
115 if self.subscription.is_none() {
116 let Ok(subscription) = self.client.subscribe_to_entity(self.channel_id.0) else {
117 return;
118 };
119 self.subscription = Some(subscription.set_entity(&cx.entity(), &cx.to_async()));
120 cx.emit(ChannelBufferEvent::Connected);
121 }
122 }
123
124 pub fn remote_id(&self, cx: &App) -> BufferId {
125 self.buffer.read(cx).remote_id()
126 }
127
128 pub fn user_store(&self) -> &Entity<UserStore> {
129 &self.user_store
130 }
131
132 pub(crate) fn replace_collaborators(
133 &mut self,
134 collaborators: Vec<proto::Collaborator>,
135 cx: &mut Context<Self>,
136 ) {
137 let mut new_collaborators = HashMap::default();
138 for collaborator in collaborators {
139 if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
140 new_collaborators.insert(collaborator.peer_id, collaborator);
141 }
142 }
143
144 for old_collaborator in self.collaborators.values() {
145 if !new_collaborators.contains_key(&old_collaborator.peer_id) {
146 self.buffer.update(cx, |buffer, cx| {
147 buffer.remove_peer(old_collaborator.replica_id, cx)
148 });
149 }
150 }
151 self.collaborators = new_collaborators;
152 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
153 cx.notify();
154 }
155
156 async fn handle_update_channel_buffer(
157 this: Entity<Self>,
158 update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
159 mut cx: AsyncApp,
160 ) -> Result<()> {
161 let ops = update_channel_buffer
162 .payload
163 .operations
164 .into_iter()
165 .map(language::proto::deserialize_operation)
166 .collect::<Result<Vec<_>, _>>()?;
167
168 this.update(&mut cx, |this, cx| {
169 cx.notify();
170 this.buffer
171 .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
172 })?;
173
174 Ok(())
175 }
176
177 async fn handle_update_channel_buffer_collaborators(
178 this: Entity<Self>,
179 message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
180 mut cx: AsyncApp,
181 ) -> Result<()> {
182 this.update(&mut cx, |this, cx| {
183 this.replace_collaborators(message.payload.collaborators, cx);
184 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
185 cx.notify();
186 })
187 }
188
189 fn on_buffer_update(
190 &mut self,
191 _: Entity<language::Buffer>,
192 event: &language::BufferEvent,
193 cx: &mut Context<Self>,
194 ) {
195 match event {
196 language::BufferEvent::Operation {
197 operation,
198 is_local: true,
199 } => {
200 if *ZED_ALWAYS_ACTIVE
201 && let language::Operation::UpdateSelections { selections, .. } = operation
202 && selections.is_empty()
203 {
204 return;
205 }
206 let operation = language::proto::serialize_operation(operation);
207 self.client
208 .send(proto::UpdateChannelBuffer {
209 channel_id: self.channel_id.0,
210 operations: vec![operation],
211 })
212 .log_err();
213 }
214 language::BufferEvent::Edited => {
215 cx.emit(ChannelBufferEvent::BufferEdited);
216 }
217 _ => {}
218 }
219 }
220
221 pub fn acknowledge_buffer_version(&mut self, cx: &mut Context<ChannelBuffer>) {
222 let buffer = self.buffer.read(cx);
223 let version = buffer.version();
224 let buffer_id = buffer.remote_id().into();
225 let client = self.client.clone();
226 let epoch = self.epoch();
227
228 self.acknowledge_task = Some(cx.spawn(async move |_, cx| {
229 cx.background_executor()
230 .timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL)
231 .await;
232 client
233 .send(proto::AckBufferOperation {
234 buffer_id,
235 epoch,
236 version: serialize_version(&version),
237 })
238 .ok();
239 Ok(())
240 }));
241 }
242
243 pub fn epoch(&self) -> u64 {
244 self.buffer_epoch
245 }
246
247 pub fn buffer(&self) -> Entity<language::Buffer> {
248 self.buffer.clone()
249 }
250
251 pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
252 &self.collaborators
253 }
254
255 pub fn channel(&self, cx: &App) -> Option<Arc<Channel>> {
256 self.channel_store
257 .read(cx)
258 .channel_for_id(self.channel_id)
259 .cloned()
260 }
261
262 pub(crate) fn disconnect(&mut self, cx: &mut Context<Self>) {
263 log::info!("channel buffer {} disconnected", self.channel_id);
264 if self.connected {
265 self.connected = false;
266 self.subscription.take();
267 cx.emit(ChannelBufferEvent::Disconnected);
268 cx.notify()
269 }
270 }
271
272 pub(crate) fn channel_changed(&mut self, cx: &mut Context<Self>) {
273 cx.emit(ChannelBufferEvent::ChannelChanged);
274 cx.notify()
275 }
276
277 pub fn is_connected(&self) -> bool {
278 self.connected
279 }
280
281 pub fn replica_id(&self, cx: &App) -> ReplicaId {
282 self.buffer.read(cx).replica_id()
283 }
284}