1use crate::{Channel, ChannelStore};
2use anyhow::Result;
3use client::{ChannelId, Client, Collaborator, UserStore, ZED_ALWAYS_ACTIVE};
4use collections::HashMap;
5use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task};
6use language::proto::serialize_version;
7use rpc::{
8 AnyProtoClient, TypedEnvelope,
9 proto::{self, PeerId},
10};
11use std::{sync::Arc, time::Duration};
12use text::BufferId;
13use util::ResultExt;
14
15pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
16
17pub(crate) fn init(client: &AnyProtoClient) {
18 client.add_entity_message_handler(ChannelBuffer::handle_update_channel_buffer);
19 client.add_entity_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
20}
21
22pub struct ChannelBuffer {
23 pub channel_id: ChannelId,
24 connected: bool,
25 collaborators: HashMap<PeerId, Collaborator>,
26 user_store: Entity<UserStore>,
27 channel_store: Entity<ChannelStore>,
28 buffer: Entity<language::Buffer>,
29 buffer_epoch: u64,
30 client: Arc<Client>,
31 subscription: Option<client::Subscription>,
32 acknowledge_task: Option<Task<Result<()>>>,
33}
34
35pub enum ChannelBufferEvent {
36 CollaboratorsChanged,
37 Disconnected,
38 BufferEdited,
39 ChannelChanged,
40}
41
42impl EventEmitter<ChannelBufferEvent> for ChannelBuffer {}
43
44impl ChannelBuffer {
45 pub(crate) async fn new(
46 channel: Arc<Channel>,
47 client: Arc<Client>,
48 user_store: Entity<UserStore>,
49 channel_store: Entity<ChannelStore>,
50 cx: &mut AsyncApp,
51 ) -> Result<Entity<Self>> {
52 let response = client
53 .request(proto::JoinChannelBuffer {
54 channel_id: channel.id.0,
55 })
56 .await?;
57 let buffer_id = BufferId::new(response.buffer_id)?;
58 let base_text = response.base_text;
59 let operations = response
60 .operations
61 .into_iter()
62 .map(language::proto::deserialize_operation)
63 .collect::<Result<Vec<_>, _>>()?;
64
65 let buffer = cx.new(|cx| {
66 let capability = channel_store.read(cx).channel_capability(channel.id);
67 language::Buffer::remote(buffer_id, response.replica_id as u16, capability, base_text)
68 })?;
69 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
70
71 let subscription = client.subscribe_to_entity(channel.id.0)?;
72
73 anyhow::Ok(cx.new(|cx| {
74 cx.subscribe(&buffer, Self::on_buffer_update).detach();
75 cx.on_release(Self::release).detach();
76 let mut this = Self {
77 buffer,
78 buffer_epoch: response.epoch,
79 client,
80 connected: true,
81 collaborators: Default::default(),
82 acknowledge_task: None,
83 channel_id: channel.id,
84 subscription: Some(subscription.set_entity(&cx.entity(), &mut cx.to_async())),
85 user_store,
86 channel_store,
87 };
88 this.replace_collaborators(response.collaborators, cx);
89 this
90 })?)
91 }
92
93 fn release(&mut self, _: &mut App) {
94 if self.connected {
95 if let Some(task) = self.acknowledge_task.take() {
96 task.detach();
97 }
98 self.client
99 .send(proto::LeaveChannelBuffer {
100 channel_id: self.channel_id.0,
101 })
102 .log_err();
103 }
104 }
105
106 pub fn remote_id(&self, cx: &App) -> BufferId {
107 self.buffer.read(cx).remote_id()
108 }
109
110 pub fn user_store(&self) -> &Entity<UserStore> {
111 &self.user_store
112 }
113
114 pub(crate) fn replace_collaborators(
115 &mut self,
116 collaborators: Vec<proto::Collaborator>,
117 cx: &mut Context<Self>,
118 ) {
119 let mut new_collaborators = HashMap::default();
120 for collaborator in collaborators {
121 if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
122 new_collaborators.insert(collaborator.peer_id, collaborator);
123 }
124 }
125
126 for (_, old_collaborator) in &self.collaborators {
127 if !new_collaborators.contains_key(&old_collaborator.peer_id) {
128 self.buffer.update(cx, |buffer, cx| {
129 buffer.remove_peer(old_collaborator.replica_id, cx)
130 });
131 }
132 }
133 self.collaborators = new_collaborators;
134 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
135 cx.notify();
136 }
137
138 async fn handle_update_channel_buffer(
139 this: Entity<Self>,
140 update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
141 mut cx: AsyncApp,
142 ) -> Result<()> {
143 let ops = update_channel_buffer
144 .payload
145 .operations
146 .into_iter()
147 .map(language::proto::deserialize_operation)
148 .collect::<Result<Vec<_>, _>>()?;
149
150 this.update(&mut cx, |this, cx| {
151 cx.notify();
152 this.buffer
153 .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
154 })?;
155
156 Ok(())
157 }
158
159 async fn handle_update_channel_buffer_collaborators(
160 this: Entity<Self>,
161 message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
162 mut cx: AsyncApp,
163 ) -> Result<()> {
164 this.update(&mut cx, |this, cx| {
165 this.replace_collaborators(message.payload.collaborators, cx);
166 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
167 cx.notify();
168 })
169 }
170
171 fn on_buffer_update(
172 &mut self,
173 _: Entity<language::Buffer>,
174 event: &language::BufferEvent,
175 cx: &mut Context<Self>,
176 ) {
177 match event {
178 language::BufferEvent::Operation {
179 operation,
180 is_local: true,
181 } => {
182 if *ZED_ALWAYS_ACTIVE {
183 if let language::Operation::UpdateSelections { selections, .. } = operation {
184 if selections.is_empty() {
185 return;
186 }
187 }
188 }
189 let operation = language::proto::serialize_operation(operation);
190 self.client
191 .send(proto::UpdateChannelBuffer {
192 channel_id: self.channel_id.0,
193 operations: vec![operation],
194 })
195 .log_err();
196 }
197 language::BufferEvent::Edited => {
198 cx.emit(ChannelBufferEvent::BufferEdited);
199 }
200 _ => {}
201 }
202 }
203
204 pub fn acknowledge_buffer_version(&mut self, cx: &mut Context<ChannelBuffer>) {
205 let buffer = self.buffer.read(cx);
206 let version = buffer.version();
207 let buffer_id = buffer.remote_id().into();
208 let client = self.client.clone();
209 let epoch = self.epoch();
210
211 self.acknowledge_task = Some(cx.spawn(async move |_, cx| {
212 cx.background_executor()
213 .timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL)
214 .await;
215 client
216 .send(proto::AckBufferOperation {
217 buffer_id,
218 epoch,
219 version: serialize_version(&version),
220 })
221 .ok();
222 Ok(())
223 }));
224 }
225
226 pub fn epoch(&self) -> u64 {
227 self.buffer_epoch
228 }
229
230 pub fn buffer(&self) -> Entity<language::Buffer> {
231 self.buffer.clone()
232 }
233
234 pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
235 &self.collaborators
236 }
237
238 pub fn channel(&self, cx: &App) -> Option<Arc<Channel>> {
239 self.channel_store
240 .read(cx)
241 .channel_for_id(self.channel_id)
242 .cloned()
243 }
244
245 pub(crate) fn disconnect(&mut self, cx: &mut Context<Self>) {
246 log::info!("channel buffer {} disconnected", self.channel_id);
247 if self.connected {
248 self.connected = false;
249 self.subscription.take();
250 cx.emit(ChannelBufferEvent::Disconnected);
251 cx.notify()
252 }
253 }
254
255 pub(crate) fn channel_changed(&mut self, cx: &mut Context<Self>) {
256 cx.emit(ChannelBufferEvent::ChannelChanged);
257 cx.notify()
258 }
259
260 pub fn is_connected(&self) -> bool {
261 self.connected
262 }
263
264 pub fn replica_id(&self, cx: &App) -> u16 {
265 self.buffer.read(cx).replica_id()
266 }
267}