use crate::ProjectPath;
use anyhow::{anyhow, Context as _, Result};
use collections::{hash_map, HashMap};
use futures::{channel::oneshot, StreamExt as _};
use gpui::{
    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
};
use language::{
    proto::{deserialize_version, serialize_version, split_operations},
    Buffer, Capability, Language, Operation,
};
use rpc::{
    proto::{self, AnyProtoClient, PeerId},
    ErrorExt as _, TypedEnvelope,
};
use std::{io, path::Path, sync::Arc};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _};
use worktree::{File, ProjectEntryId, RemoteWorktree, Worktree};

/// A set of open buffers.
pub struct BufferStore {
    retain_buffers: bool,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
}

enum OpenBuffer {
    Strong(Model<Buffer>),
    Weak(WeakModel<Buffer>),
    Operations(Vec<Operation>),
}

pub enum BufferStoreEvent {
    BufferAdded(Model<Buffer>),
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<File>>,
    },
    BufferSaved {
        buffer: Model<Buffer>,
        has_changed_file: bool,
        saved_version: clock::Global,
    },
}

impl EventEmitter<BufferStoreEvent> for BufferStore {}

impl BufferStore {
    /// Creates a buffer store, optionally retaining its buffers.
    ///
    /// If `retain_buffers` is `true`, then buffers are owned by the buffer store
    /// and won't be released unless they are explicitly removed, or `retain_buffers`
    /// is set to `false` via `set_retain_buffers`. Otherwise, buffers are stored as
    /// weak handles.
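    ///
    /// A rough construction sketch (illustrative only; wrapping the store in a
    /// GPUI model and the `cx` handle are assumptions, not code from this file):
    ///
    /// ```ignore
    /// // Retain buffers, e.g. on the host side of a shared project.
    /// let buffer_store = cx.new_model(|_| BufferStore::new(true));
    /// ```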
    pub fn new(retain_buffers: bool) -> Self {
        Self {
            retain_buffers,
            opened_buffers: Default::default(),
            remote_buffer_listeners: Default::default(),
            loading_remote_buffers_by_id: Default::default(),
            local_buffer_ids_by_path: Default::default(),
            local_buffer_ids_by_entry_id: Default::default(),
            loading_buffers_by_path: Default::default(),
        }
    }

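    /// Opens the buffer for `project_path` in the given worktree, returning an
    /// already-open buffer immediately and deduplicating concurrent requests for
    /// the same path through a watch channel.
    ///
    /// A rough usage sketch (illustrative only; the `project_path` and `worktree`
    /// values are assumed to come from the surrounding project):
    ///
    /// ```ignore
    /// let open = buffer_store.update(cx, |store, cx| {
    ///     store.open_buffer(project_path, worktree, cx)
    /// });
    /// cx.spawn(|_, _| async move {
    ///     let buffer = open.await?;
    ///     anyhow::Ok(buffer)
    /// })
    /// .detach();
    /// ```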
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                let load_buffer = match worktree.read(cx) {
                    Worktree::Local(_) => {
                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
                    }
                    Worktree::Remote(tree) => {
                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
                    }
                };

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }

    fn open_local_buffer_internal(
        &mut self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(
                        text_buffer,
                        loaded.diff_base,
                        Some(loaded.file),
                        Capability::ReadWrite,
                    )
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        None,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            mtime: None,
                            entry_id: None,
                            is_local: true,
                            is_deleted: false,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }

    fn open_remote_buffer_internal(
        &self,
        path: &Arc<Path>,
        worktree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let worktree_id = worktree.id().to_proto();
        let project_id = worktree.project_id();
        let client = worktree.client();
        let path_string = path.clone().to_string_lossy().to_string();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path_string,
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;
            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

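    /// Creates a new, empty buffer. With a remote client this round-trips through
    /// `proto::OpenNewBuffer` and then waits for the replicated buffer to arrive;
    /// locally it builds the buffer in place.
    ///
    /// A minimal sketch (illustrative only; `client` and `project_id` are assumed
    /// to describe an existing remote project connection):
    ///
    /// ```ignore
    /// // Remote project: ask the host to create the buffer.
    /// let task = store.create_buffer(Some((client, project_id)), cx);
    ///
    /// // Local project: no client needed.
    /// let task = store.create_buffer(None, cx);
    /// ```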
    pub fn create_buffer(
        &mut self,
        remote_client: Option<(AnyProtoClient, u64)>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        if let Some((remote_client, project_id)) = remote_client {
            let create = remote_client.request(proto::OpenNewBuffer { project_id });
            cx.spawn(|this, mut cx| async move {
                let response = create.await?;
                let buffer_id = BufferId::new(response.buffer_id)?;

                this.update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await
            })
        } else {
            Task::ready(Ok(self.create_local_buffer("", None, cx)))
        }
    }

    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });
        self.add_buffer(buffer.clone(), cx).log_err();
        buffer
    }

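    /// Saves `buffer` back to the file it was loaded from, dispatching to the
    /// local or remote worktree implementation.
    ///
    /// A rough sketch (illustrative only; obtaining the buffer handle is assumed):
    ///
    /// ```ignore
    /// let save = buffer_store.update(cx, |store, cx| store.save_buffer(buffer, cx));
    /// save.detach_and_log_err(cx);
    /// ```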
    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
        };
        match file.worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
            }
            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
        }
    }

    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let old_file = File::from_dyn(buffer.read(cx).file())
            .cloned()
            .map(Arc::new);

        let task = match worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
            }
            Worktree::Remote(tree) => {
                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }

    fn save_local_buffer(
        &self,
        worktree: Model<Worktree>,
        buffer_handle: Model<Buffer>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        if buffer.file().is_some_and(|file| !file.is_created()) {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.mtime;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferSaved {
                    buffer: buffer_handle,
                    has_changed_file,
                    saved_version: version,
                })
            })?;
            Ok(())
        })
    }

    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        tree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = tree.client();
        let project_id = tree.project_id();
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = if self.retain_buffers {
            OpenBuffer::Strong(buffer.clone())
        } else {
            OpenBuffer::Weak(buffer.downgrade())
        };

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
            for sender in senders {
                sender.send(Ok(buffer.clone())).ok();
            }
        }

        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            if file.is_local {
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path.clone(),
                    },
                    remote_id,
                );

                if let Some(entry_id) = file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        }

        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }

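    /// Iterates over all currently open buffers, skipping weak handles that have
    /// already been dropped.
    ///
    /// A small sketch (illustrative only):
    ///
    /// ```ignore
    /// let dirty_count = buffer_store
    ///     .read(cx)
    ///     .buffers()
    ///     .filter(|buffer| buffer.read(cx).is_dirty())
    ///     .count();
    /// ```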
    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
        self.opened_buffers
            .values()
            .filter_map(|buffer| buffer.upgrade())
    }

    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }

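    /// Open buffers can be looked up either by `ProjectPath` or by `BufferId`;
    /// a rough sketch of both (illustrative only; the path and id values are
    /// assumed to come from the caller):
    ///
    /// ```ignore
    /// let by_path = buffer_store.read(cx).get_by_path(&project_path, cx);
    /// let by_id = buffer_store.read(cx).get_existing(buffer_id)?;
    /// ```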
    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
        self.buffers().find_map(|buffer| {
            let file = File::from_dyn(buffer.read(cx).file())?;
            if file.worktree_id(cx) == path.worktree_id && &file.path == &path.path {
                Some(buffer)
            } else {
                None
            }
        })
    }

    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.opened_buffers
            .get(&buffer_id)
            .and_then(|buffer| buffer.upgrade())
    }

    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
        self.get(buffer_id)
            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
    }

    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.get(buffer_id)
            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
    }

    fn get_or_remove_by_path(
        &mut self,
        entry_id: ProjectEntryId,
        project_path: &ProjectPath,
    ) -> Option<(BufferId, Model<Buffer>)> {
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => match self.local_buffer_ids_by_path.get(project_path) {
                Some(&buffer_id) => buffer_id,
                None => {
                    return None;
                }
            },
        };
        let buffer = if let Some(buffer) = self.get(buffer_id) {
            buffer
        } else {
            self.opened_buffers.remove(&buffer_id);
            self.local_buffer_ids_by_path.remove(project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };
        Some((buffer_id, buffer))
    }

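    /// Resolves as soon as the buffer with the given id has been added to the
    /// store, either immediately (if it is already open) or once the host has
    /// finished replicating it.
    ///
    /// A rough sketch (illustrative only; `buffer_id` typically comes from an
    /// RPC response such as `proto::OpenBufferByPath`):
    ///
    /// ```ignore
    /// let buffer = buffer_store
    ///     .update(cx, |store, cx| store.wait_for_remote_buffer(buffer_id, cx))
    ///     .await?;
    /// ```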
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut AppContext,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer = self.get(id);
        if let Some(buffer) = buffer {
            return Task::ready(Ok(buffer));
        }
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);
        cx.background_executor().spawn(async move { rx.await? })
    }

    pub fn buffer_version_info(
        &self,
        cx: &AppContext,
    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
        let buffers = self
            .buffers()
            .map(|buffer| {
                let buffer = buffer.read(cx);
                proto::BufferVersion {
                    id: buffer.remote_id().into(),
                    version: language::proto::serialize_version(&buffer.version),
                }
            })
            .collect();
        let incomplete_buffer_ids = self
            .loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>();
        (buffers, incomplete_buffer_ids)
    }

    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        self.set_retain_buffers(false, cx);

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        // Wake up all futures currently waiting on a buffer to get opened,
        // to give them a chance to fail now that we've disconnected.
        self.remote_buffer_listeners.clear();
    }

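    /// Switches every open buffer between strong and weak handles. A rough
    /// sketch (illustrative only) of dropping retention when a project stops
    /// being shared:
    ///
    /// ```ignore
    /// buffer_store.update(cx, |store, cx| {
    ///     store.set_retain_buffers(false, cx);
    /// });
    /// ```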
    pub fn set_retain_buffers(&mut self, retain_buffers: bool, cx: &mut AppContext) {
        self.retain_buffers = retain_buffers;
        for open_buffer in self.opened_buffers.values_mut() {
            if retain_buffers {
                if let OpenBuffer::Weak(buffer) = open_buffer {
                    if let Some(buffer) = buffer.upgrade() {
                        *open_buffer = OpenBuffer::Strong(buffer);
                    }
                }
            } else {
                if let Some(buffer) = open_buffer.upgrade() {
                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
                }
                if let OpenBuffer::Strong(buffer) = open_buffer {
                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
                }
            }
        }
    }

    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }

    pub fn file_changed(
        &mut self,
        path: Arc<Path>,
        entry_id: ProjectEntryId,
        worktree_handle: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<(Model<Buffer>, Arc<File>, Arc<File>)> {
        let (buffer_id, buffer) = self.get_or_remove_by_path(
            entry_id,
            &ProjectPath {
                worktree_id: snapshot.id(),
                path,
            },
        )?;

        let result = buffer.update(cx, |buffer, cx| {
            let old_file = File::from_dyn(buffer.file())?;
            if old_file.worktree != *worktree_handle {
                return None;
            }

            let new_file = if let Some(entry) = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
            {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree_handle.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree_handle.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else {
                File {
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    mtime: old_file.mtime,
                    worktree: worktree_handle.clone(),
                    is_deleted: true,
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let old_file = Arc::new(old_file.clone());
            let new_file = Arc::new(new_file);
            buffer.file_updated(new_file.clone(), cx);
            Some((cx.handle(), old_file, new_file))
        });

        if let Some((buffer, old_file, new_file)) = &result {
            if new_file.path != old_file.path {
                self.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                cx.emit(BufferStoreEvent::BufferChangedFilePath {
                    buffer: buffer.clone(),
                    old_file: Some(old_file.clone()),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }
        }

        result
    }

    pub fn buffer_changed_file(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut AppContext,
    ) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }

    pub async fn create_buffer_for_peer(
        this: Model<Self>,
        peer_id: PeerId,
        buffer_id: BufferId,
        project_id: u64,
        client: AnyProtoClient,
        cx: &mut AsyncAppContext,
    ) -> Result<()> {
        let Some(buffer) = this.update(cx, |this, _| this.get(buffer_id))? else {
            return Ok(());
        };

        let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
        let operations = operations.await;
        let state = buffer.update(cx, |buffer, _| buffer.to_proto())?;

        let initial_state = proto::CreateBufferForPeer {
            project_id,
            peer_id: Some(peer_id),
            variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
        };

        if client.send(initial_state).log_err().is_some() {
            let client = client.clone();
            cx.background_executor()
                .spawn(async move {
                    let mut chunks = split_operations(operations).peekable();
                    while let Some(chunk) = chunks.next() {
                        let is_last = chunks.peek().is_none();
                        client.send(proto::CreateBufferForPeer {
                            project_id,
                            peer_id: Some(peer_id),
                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                proto::BufferChunk {
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                    is_last,
                                },
                            )),
                        })?;
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
        }
        Ok(())
    }

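    /// Applies an incoming `proto::UpdateBuffer` to the matching open buffer, or
    /// queues the operations if the buffer hasn't finished arriving yet.
    ///
    /// A rough sketch of wiring this into a message handler (illustrative only;
    /// the surrounding handler and its `envelope` are assumed):
    ///
    /// ```ignore
    /// let ack = buffer_store.update(cx, |store, cx| {
    ///     store.handle_update_buffer(envelope, /* is_remote */ true, cx)
    /// })?;
    /// ```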
    pub fn handle_update_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        is_remote: bool,
        cx: &mut AppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        match self.opened_buffers.entry(buffer_id) {
            hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                OpenBuffer::Strong(buffer) => {
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                }
                OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                OpenBuffer::Weak(_) => {}
            },
            hash_map::Entry::Vacant(e) => {
                if !is_remote {
                    debug_panic!(
                        "received buffer update from {:?}",
                        envelope.original_sender_id
                    );
                    return Err(anyhow!("received buffer update for non-remote project"));
                }
                e.insert(OpenBuffer::Operations(ops));
            }
        }
        Ok(proto::Ack {})
    }

    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        mut worktrees: impl Iterator<Item = Model<Worktree>>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = worktrees
                            .find(|worktree| worktree.read(cx).id() == worktree_id)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
                });

                if let Err(error) = result {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    self.add_buffer(buffer, cx)?;
                }
            }
        }

        Ok(())
    }

    pub async fn handle_save_buffer(
        this: Model<Self>,
        project_id: u64,
        worktree: Option<Model<Worktree>>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let buffer = this.update(&mut cx, |this, _| this.get_existing(buffer_id))??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let worktree = worktree.context("no such worktree")?;
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, worktree, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }

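    /// Waits on the watch channel used by `open_buffer` until the in-flight load
    /// either produces a buffer or fails.
    ///
    /// A rough sketch (illustrative only; the receiver is obtained from
    /// `loading_buffers`):
    ///
    /// ```ignore
    /// if let Some((_, receiver)) = buffer_store.read(cx).loading_buffers().next() {
    ///     let buffer = BufferStore::wait_for_loading_buffer(receiver).await;
    /// }
    /// ```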
    pub async fn wait_for_loading_buffer(
        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
        loop {
            if let Some(result) = receiver.borrow().as_ref() {
                match result {
                    Ok(buffer) => return Ok(buffer.to_owned()),
                    Err(e) => return Err(e.to_owned()),
                }
            }
            receiver.next().await;
        }
    }
}

impl OpenBuffer {
    fn upgrade(&self) -> Option<Model<Buffer>> {
        match self {
            OpenBuffer::Strong(handle) => Some(handle.clone()),
            OpenBuffer::Weak(handle) => handle.upgrade(),
            OpenBuffer::Operations(_) => None,
        }
    }
}

fn is_not_found_error(error: &anyhow::Error) -> bool {
    error
        .root_cause()
        .downcast_ref::<io::Error>()
        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
}