1use crate::{Channel, ChannelId, ChannelStore};
2use anyhow::Result;
3use client::{Client, Collaborator, UserStore, ZED_ALWAYS_ACTIVE};
4use collections::HashMap;
5use gpui::{AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task};
6use language::proto::serialize_version;
7use rpc::{
8 proto::{self, PeerId},
9 TypedEnvelope,
10};
11use std::{sync::Arc, time::Duration};
12use util::ResultExt;
13
14pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
15
16pub(crate) fn init(client: &Arc<Client>) {
17 client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
18 client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
19}
20
21pub struct ChannelBuffer {
22 pub channel_id: ChannelId,
23 connected: bool,
24 collaborators: HashMap<PeerId, Collaborator>,
25 user_store: Model<UserStore>,
26 channel_store: Model<ChannelStore>,
27 buffer: Model<language::Buffer>,
28 buffer_epoch: u64,
29 client: Arc<Client>,
30 subscription: Option<client::Subscription>,
31 acknowledge_task: Option<Task<Result<()>>>,
32}
33
34pub enum ChannelBufferEvent {
35 CollaboratorsChanged,
36 Disconnected,
37 BufferEdited,
38 ChannelChanged,
39}
40
41impl EventEmitter<ChannelBufferEvent> for ChannelBuffer {}
42
43impl ChannelBuffer {
44 pub(crate) async fn new(
45 channel: Arc<Channel>,
46 client: Arc<Client>,
47 user_store: Model<UserStore>,
48 channel_store: Model<ChannelStore>,
49 mut cx: AsyncAppContext,
50 ) -> Result<Model<Self>> {
51 let response = client
52 .request(proto::JoinChannelBuffer {
53 channel_id: channel.id,
54 })
55 .await?;
56
57 let base_text = response.base_text;
58 let operations = response
59 .operations
60 .into_iter()
61 .map(language::proto::deserialize_operation)
62 .collect::<Result<Vec<_>, _>>()?;
63
64 let buffer = cx.new_model(|cx| {
65 let capability = channel_store.read(cx).channel_capability(channel.id);
66 language::Buffer::remote(
67 response.buffer_id,
68 response.replica_id as u16,
69 capability,
70 base_text,
71 )
72 })?;
73 buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))??;
74
75 let subscription = client.subscribe_to_entity(channel.id)?;
76
77 anyhow::Ok(cx.new_model(|cx| {
78 cx.subscribe(&buffer, Self::on_buffer_update).detach();
79 cx.on_release(Self::release).detach();
80 let mut this = Self {
81 buffer,
82 buffer_epoch: response.epoch,
83 client,
84 connected: true,
85 collaborators: Default::default(),
86 acknowledge_task: None,
87 channel_id: channel.id,
88 subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
89 user_store,
90 channel_store,
91 };
92 this.replace_collaborators(response.collaborators, cx);
93 this
94 })?)
95 }
96
97 fn release(&mut self, _: &mut AppContext) {
98 if self.connected {
99 if let Some(task) = self.acknowledge_task.take() {
100 task.detach();
101 }
102 self.client
103 .send(proto::LeaveChannelBuffer {
104 channel_id: self.channel_id,
105 })
106 .log_err();
107 }
108 }
109
110 pub fn remote_id(&self, cx: &AppContext) -> u64 {
111 self.buffer.read(cx).remote_id()
112 }
113
114 pub fn user_store(&self) -> &Model<UserStore> {
115 &self.user_store
116 }
117
118 pub(crate) fn replace_collaborators(
119 &mut self,
120 collaborators: Vec<proto::Collaborator>,
121 cx: &mut ModelContext<Self>,
122 ) {
123 let mut new_collaborators = HashMap::default();
124 for collaborator in collaborators {
125 if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
126 new_collaborators.insert(collaborator.peer_id, collaborator);
127 }
128 }
129
130 for (_, old_collaborator) in &self.collaborators {
131 if !new_collaborators.contains_key(&old_collaborator.peer_id) {
132 self.buffer.update(cx, |buffer, cx| {
133 buffer.remove_peer(old_collaborator.replica_id as u16, cx)
134 });
135 }
136 }
137 self.collaborators = new_collaborators;
138 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
139 cx.notify();
140 }
141
142 async fn handle_update_channel_buffer(
143 this: Model<Self>,
144 update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
145 _: Arc<Client>,
146 mut cx: AsyncAppContext,
147 ) -> Result<()> {
148 let ops = update_channel_buffer
149 .payload
150 .operations
151 .into_iter()
152 .map(language::proto::deserialize_operation)
153 .collect::<Result<Vec<_>, _>>()?;
154
155 this.update(&mut cx, |this, cx| {
156 cx.notify();
157 this.buffer
158 .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
159 })??;
160
161 Ok(())
162 }
163
164 async fn handle_update_channel_buffer_collaborators(
165 this: Model<Self>,
166 message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
167 _: Arc<Client>,
168 mut cx: AsyncAppContext,
169 ) -> Result<()> {
170 this.update(&mut cx, |this, cx| {
171 this.replace_collaborators(message.payload.collaborators, cx);
172 cx.emit(ChannelBufferEvent::CollaboratorsChanged);
173 cx.notify();
174 })
175 }
176
177 fn on_buffer_update(
178 &mut self,
179 _: Model<language::Buffer>,
180 event: &language::Event,
181 cx: &mut ModelContext<Self>,
182 ) {
183 match event {
184 language::Event::Operation(operation) => {
185 if *ZED_ALWAYS_ACTIVE {
186 match operation {
187 language::Operation::UpdateSelections { selections, .. } => {
188 if selections.is_empty() {
189 return;
190 }
191 }
192 _ => {}
193 }
194 }
195 let operation = language::proto::serialize_operation(operation);
196 self.client
197 .send(proto::UpdateChannelBuffer {
198 channel_id: self.channel_id,
199 operations: vec![operation],
200 })
201 .log_err();
202 }
203 language::Event::Edited => {
204 cx.emit(ChannelBufferEvent::BufferEdited);
205 }
206 _ => {}
207 }
208 }
209
210 pub fn acknowledge_buffer_version(&mut self, cx: &mut ModelContext<'_, ChannelBuffer>) {
211 let buffer = self.buffer.read(cx);
212 let version = buffer.version();
213 let buffer_id = buffer.remote_id();
214 let client = self.client.clone();
215 let epoch = self.epoch();
216
217 self.acknowledge_task = Some(cx.spawn(move |_, cx| async move {
218 cx.background_executor()
219 .timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL)
220 .await;
221 client
222 .send(proto::AckBufferOperation {
223 buffer_id,
224 epoch,
225 version: serialize_version(&version),
226 })
227 .ok();
228 Ok(())
229 }));
230 }
231
232 pub fn epoch(&self) -> u64 {
233 self.buffer_epoch
234 }
235
236 pub fn buffer(&self) -> Model<language::Buffer> {
237 self.buffer.clone()
238 }
239
240 pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
241 &self.collaborators
242 }
243
244 pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
245 self.channel_store
246 .read(cx)
247 .channel_for_id(self.channel_id)
248 .cloned()
249 }
250
251 pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
252 log::info!("channel buffer {} disconnected", self.channel_id);
253 if self.connected {
254 self.connected = false;
255 self.subscription.take();
256 cx.emit(ChannelBufferEvent::Disconnected);
257 cx.notify()
258 }
259 }
260
261 pub(crate) fn channel_changed(&mut self, cx: &mut ModelContext<Self>) {
262 cx.emit(ChannelBufferEvent::ChannelChanged);
263 cx.notify()
264 }
265
266 pub fn is_connected(&self) -> bool {
267 self.connected
268 }
269
270 pub fn replica_id(&self, cx: &AppContext) -> u16 {
271 self.buffer.read(cx).replica_id()
272 }
273}