1use crate::{
2 worktree_store::{WorktreeStore, WorktreeStoreEvent},
3 ProjectPath,
4};
5use anyhow::{anyhow, Result};
6use collections::{hash_map, HashMap};
7use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt as _};
8use gpui::{
9 AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
10};
11use language::{
12 proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
13 Buffer, Capability, Event as BufferEvent, Language, Operation,
14};
15use rpc::{
16 proto::{self, AnyProtoClient, PeerId},
17 ErrorExt as _, TypedEnvelope,
18};
19use std::{io, path::Path, sync::Arc};
20use text::BufferId;
21use util::{debug_panic, maybe, ResultExt as _};
22use worktree::{
23 File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
24};
25
/// A set of open buffers.
pub struct BufferStore {
    /// When `true`, buffers are held strongly and stay alive until released;
    /// when `false`, only weak handles are kept.
    retain_buffers: bool,
    #[allow(unused)]
    worktree_store: Model<WorktreeStore>,
    /// All buffers known to the store, keyed by remote id. Entries may hold a
    /// strong handle, a weak handle, or queued operations (see `OpenBuffer`).
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Reverse index from a local file's project path to its buffer id.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    /// Reverse index from a local worktree entry id to its buffer id.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    /// In-flight loads, so concurrent `open_buffer` calls for the same path
    /// share a single load and receive the same buffer.
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    /// Remote buffers whose initial state has arrived but whose operation
    /// chunks are still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    /// Channels fulfilled when a remote buffer with the given id is added.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
}
43
/// The store's handle on a single buffer.
enum OpenBuffer {
    /// A retained buffer, kept alive by the store.
    Strong(Model<Buffer>),
    /// A non-retained buffer that other owners may drop at any time.
    Weak(WeakModel<Buffer>),
    /// Operations received for a buffer before the buffer itself arrived.
    Operations(Vec<Operation>),
}
49
/// Events emitted by a `BufferStore`.
pub enum BufferStoreEvent {
    /// A buffer was registered with the store.
    BufferAdded(Model<Buffer>),
    /// A buffer's underlying file changed paths.
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        /// The buffer's file before the change, if it had one.
        old_file: Option<Arc<File>>,
    },
    /// A buffer was saved locally or on the remote host.
    BufferSaved {
        buffer: Model<Buffer>,
        /// Whether the buffer's file itself was replaced as part of the save.
        has_changed_file: bool,
        saved_version: clock::Global,
    },
    /// A local buffer's file changed on disk (path, mtime, or deletion).
    LocalBufferUpdated {
        buffer: Model<Buffer>,
    },
    /// A buffer's git diff base was reloaded.
    DiffBaseUpdated {
        buffer: Model<Buffer>,
    },
}
68
69impl EventEmitter<BufferStoreEvent> for BufferStore {}
70
71impl BufferStore {
72 /// Creates a buffer store, optionally retaining its buffers.
73 ///
74 /// If `retain_buffers` is `true`, then buffers are owned by the buffer store
75 /// and won't be released unless they are explicitly removed, or `retain_buffers`
76 /// is set to `false` via `set_retain_buffers`. Otherwise, buffers are stored as
77 /// weak handles.
78 pub fn new(
79 worktree_store: Model<WorktreeStore>,
80 retain_buffers: bool,
81 cx: &mut ModelContext<Self>,
82 ) -> Self {
83 cx.subscribe(&worktree_store, |this, _, event, cx| match event {
84 WorktreeStoreEvent::WorktreeAdded(worktree) => {
85 this.subscribe_to_worktree(worktree, cx);
86 }
87 _ => {}
88 })
89 .detach();
90
91 Self {
92 retain_buffers,
93 worktree_store,
94 opened_buffers: Default::default(),
95 remote_buffer_listeners: Default::default(),
96 loading_remote_buffers_by_id: Default::default(),
97 local_buffer_ids_by_path: Default::default(),
98 local_buffer_ids_by_entry_id: Default::default(),
99 loading_buffers_by_path: Default::default(),
100 }
101 }
102
    /// Opens the buffer at `project_path`, reusing an already-open buffer or
    /// joining an in-flight load for the same path when possible.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        // Fast path: the buffer is already open.
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                // Local worktrees load from disk; remote ones go through RPC.
                let load_buffer = match worktree.read(cx) {
                    Worktree::Local(_) => {
                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
                    }
                    Worktree::Remote(tree) => {
                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
                    }
                };

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    // Publish the result to all waiters via the watch channel.
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        // Wait for the shared load to finish, converting the shared `Arc`
        // error back into a plain error for this caller.
        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }
162
163 fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
164 cx.subscribe(worktree, |this, worktree, event, cx| {
165 if worktree.read(cx).is_local() {
166 match event {
167 worktree::Event::UpdatedEntries(changes) => {
168 this.local_worktree_entries_changed(&worktree, changes, cx);
169 }
170 worktree::Event::UpdatedGitRepositories(updated_repos) => {
171 this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
172 }
173 _ => {}
174 }
175 }
176 })
177 .detach();
178 }
179
180 fn local_worktree_entries_changed(
181 &mut self,
182 worktree_handle: &Model<Worktree>,
183 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
184 cx: &mut ModelContext<Self>,
185 ) {
186 let snapshot = worktree_handle.read(cx).snapshot();
187 for (path, entry_id, _) in changes {
188 self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
189 }
190 }
191
192 fn local_worktree_git_repos_changed(
193 &mut self,
194 worktree_handle: Model<Worktree>,
195 changed_repos: &UpdatedGitRepositoriesSet,
196 cx: &mut ModelContext<Self>,
197 ) {
198 debug_assert!(worktree_handle.read(cx).is_local());
199
200 // Identify the loading buffers whose containing repository that has changed.
201 let future_buffers = self
202 .loading_buffers()
203 .filter_map(|(project_path, receiver)| {
204 if project_path.worktree_id != worktree_handle.read(cx).id() {
205 return None;
206 }
207 let path = &project_path.path;
208 changed_repos
209 .iter()
210 .find(|(work_dir, _)| path.starts_with(work_dir))?;
211 let path = path.clone();
212 Some(async move {
213 Self::wait_for_loading_buffer(receiver)
214 .await
215 .ok()
216 .map(|buffer| (buffer, path))
217 })
218 })
219 .collect::<FuturesUnordered<_>>();
220
221 // Identify the current buffers whose containing repository has changed.
222 let current_buffers = self
223 .buffers()
224 .filter_map(|buffer| {
225 let file = File::from_dyn(buffer.read(cx).file())?;
226 if file.worktree != worktree_handle {
227 return None;
228 }
229 changed_repos
230 .iter()
231 .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
232 Some((buffer, file.path.clone()))
233 })
234 .collect::<Vec<_>>();
235
236 if future_buffers.len() + current_buffers.len() == 0 {
237 return;
238 }
239
240 cx.spawn(move |this, mut cx| async move {
241 // Wait for all of the buffers to load.
242 let future_buffers = future_buffers.collect::<Vec<_>>().await;
243
244 // Reload the diff base for every buffer whose containing git repository has changed.
245 let snapshot =
246 worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
247 let diff_bases_by_buffer = cx
248 .background_executor()
249 .spawn(async move {
250 let mut diff_base_tasks = future_buffers
251 .into_iter()
252 .flatten()
253 .chain(current_buffers)
254 .filter_map(|(buffer, path)| {
255 let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
256 let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
257 Some(async move {
258 let base_text =
259 local_repo_entry.repo().load_index_text(&relative_path);
260 Some((buffer, base_text))
261 })
262 })
263 .collect::<FuturesUnordered<_>>();
264
265 let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
266 while let Some(diff_base) = diff_base_tasks.next().await {
267 if let Some(diff_base) = diff_base {
268 diff_bases.push(diff_base);
269 }
270 }
271 diff_bases
272 })
273 .await;
274
275 this.update(&mut cx, |_, cx| {
276 // Assign the new diff bases on all of the buffers.
277 for (buffer, diff_base) in diff_bases_by_buffer {
278 buffer.update(cx, |buffer, cx| {
279 buffer.set_diff_base(diff_base.clone(), cx);
280 });
281 cx.emit(BufferStoreEvent::DiffBaseUpdated { buffer })
282 }
283 })
284 })
285 .detach_and_log_err(cx);
286 }
287
    /// Loads a buffer for `path` from the local `worktree`.
    ///
    /// A file that does not exist on disk becomes an empty in-memory buffer
    /// at that path (with no mtime or entry id) instead of an error, so new
    /// files can be created by saving.
    fn open_local_buffer_internal(
        &mut self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the model slot up front so the buffer id (derived from
            // the entity id) is fixed before the file finishes loading.
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Construct the text buffer off the main thread.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(
                        text_buffer,
                        loaded.diff_base,
                        Some(loaded.file),
                        Capability::ReadWrite,
                    )
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // A missing file becomes an empty, not-yet-created buffer.
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        None,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            mtime: None,
                            entry_id: None,
                            is_local: true,
                            is_deleted: false,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }
344
345 fn open_remote_buffer_internal(
346 &self,
347 path: &Arc<Path>,
348 worktree: &RemoteWorktree,
349 cx: &ModelContext<Self>,
350 ) -> Task<Result<Model<Buffer>>> {
351 let worktree_id = worktree.id().to_proto();
352 let project_id = worktree.project_id();
353 let client = worktree.client();
354 let path_string = path.clone().to_string_lossy().to_string();
355 cx.spawn(move |this, mut cx| async move {
356 let response = client
357 .request(proto::OpenBufferByPath {
358 project_id,
359 worktree_id,
360 path: path_string,
361 })
362 .await?;
363 let buffer_id = BufferId::new(response.buffer_id)?;
364 this.update(&mut cx, |this, cx| {
365 this.wait_for_remote_buffer(buffer_id, cx)
366 })?
367 .await
368 })
369 }
370
371 pub fn create_buffer(
372 &mut self,
373 remote_client: Option<(AnyProtoClient, u64)>,
374 cx: &mut ModelContext<Self>,
375 ) -> Task<Result<Model<Buffer>>> {
376 if let Some((remote_client, project_id)) = remote_client {
377 let create = remote_client.request(proto::OpenNewBuffer { project_id });
378 cx.spawn(|this, mut cx| async move {
379 let response = create.await?;
380 let buffer_id = BufferId::new(response.buffer_id)?;
381
382 this.update(&mut cx, |this, cx| {
383 this.wait_for_remote_buffer(buffer_id, cx)
384 })?
385 .await
386 })
387 } else {
388 Task::ready(Ok(self.create_local_buffer("", None, cx)))
389 }
390 }
391
392 pub fn create_local_buffer(
393 &mut self,
394 text: &str,
395 language: Option<Arc<Language>>,
396 cx: &mut ModelContext<Self>,
397 ) -> Model<Buffer> {
398 let buffer = cx.new_model(|cx| {
399 Buffer::local(text, cx)
400 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
401 });
402 self.add_buffer(buffer.clone(), cx).log_err();
403 buffer
404 }
405
406 pub fn save_buffer(
407 &mut self,
408 buffer: Model<Buffer>,
409 cx: &mut ModelContext<Self>,
410 ) -> Task<Result<()>> {
411 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
412 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
413 };
414 match file.worktree.read(cx) {
415 Worktree::Local(_) => {
416 self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
417 }
418 Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
419 }
420 }
421
    /// Saves `buffer` under a new project path, then emits
    /// `BufferChangedFilePath` carrying the buffer's previous file.
    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        // Capture the old file before the save replaces it.
        let old_file = File::from_dyn(buffer.read(cx).file())
            .cloned()
            .map(Arc::new);

        let task = match worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
            }
            Worktree::Remote(tree) => {
                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }
455
    /// Writes `buffer_handle`'s contents to `path` in a local worktree, then
    /// notifies the buffer of the save and emits `BufferSaved`.
    ///
    /// `has_changed_file` marks that the buffer's file is being replaced
    /// (e.g. "save as"); it is forced to `true` when the buffer's file has
    /// never been created on disk.
    fn save_local_buffer(
        &self,
        worktree: Model<Worktree>,
        buffer_handle: Model<Buffer>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        // Snapshot the text, line ending, and version now, so the save
        // reflects the buffer state at the time of this call.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        if buffer.file().is_some_and(|file| !file.is_created()) {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.mtime;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferSaved {
                    buffer: buffer_handle,
                    has_changed_file,
                    saved_version: version,
                })
            })?;
            Ok(())
        })
    }
495
    /// Asks the remote host to save `buffer_handle`, optionally under a new
    /// path, then applies the version and mtime the host reports back.
    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        tree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = tree.client();
        let project_id = tree.project_id();
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            // Use the version/mtime the host actually saved, which may be
            // newer than what we requested.
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }
527
528 fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
529 let remote_id = buffer.read(cx).remote_id();
530 let is_remote = buffer.read(cx).replica_id() != 0;
531 let open_buffer = if self.retain_buffers {
532 OpenBuffer::Strong(buffer.clone())
533 } else {
534 OpenBuffer::Weak(buffer.downgrade())
535 };
536
537 match self.opened_buffers.entry(remote_id) {
538 hash_map::Entry::Vacant(entry) => {
539 entry.insert(open_buffer);
540 }
541 hash_map::Entry::Occupied(mut entry) => {
542 if let OpenBuffer::Operations(operations) = entry.get_mut() {
543 buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
544 } else if entry.get().upgrade().is_some() {
545 if is_remote {
546 return Ok(());
547 } else {
548 debug_panic!("buffer {} was already registered", remote_id);
549 Err(anyhow!("buffer {} was already registered", remote_id))?;
550 }
551 }
552 entry.insert(open_buffer);
553 }
554 }
555
556 if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
557 for sender in senders {
558 sender.send(Ok(buffer.clone())).ok();
559 }
560 }
561
562 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
563 if file.is_local {
564 self.local_buffer_ids_by_path.insert(
565 ProjectPath {
566 worktree_id: file.worktree_id(cx),
567 path: file.path.clone(),
568 },
569 remote_id,
570 );
571
572 if let Some(entry_id) = file.entry_id {
573 self.local_buffer_ids_by_entry_id
574 .insert(entry_id, remote_id);
575 }
576 }
577 }
578
579 cx.subscribe(&buffer, Self::on_buffer_event).detach();
580 cx.emit(BufferStoreEvent::BufferAdded(buffer));
581 Ok(())
582 }
583
584 pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
585 self.opened_buffers
586 .values()
587 .filter_map(|buffer| buffer.upgrade())
588 }
589
    /// Iterates over every in-flight buffer load, yielding each path together
    /// with a clone of its watch receiver so callers can await completion.
    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }
602
603 pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
604 self.buffers().find_map(|buffer| {
605 let file = File::from_dyn(buffer.read(cx).file())?;
606 if file.worktree_id(cx) == path.worktree_id && &file.path == &path.path {
607 Some(buffer)
608 } else {
609 None
610 }
611 })
612 }
613
614 pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
615 self.opened_buffers
616 .get(&buffer_id)
617 .and_then(|buffer| buffer.upgrade())
618 }
619
620 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
621 self.get(buffer_id)
622 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
623 }
624
625 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
626 self.get(buffer_id)
627 .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
628 }
629
630 pub fn wait_for_remote_buffer(
631 &mut self,
632 id: BufferId,
633 cx: &mut AppContext,
634 ) -> Task<Result<Model<Buffer>>> {
635 let buffer = self.get(id);
636 if let Some(buffer) = buffer {
637 return Task::ready(Ok(buffer));
638 }
639 let (tx, rx) = oneshot::channel();
640 self.remote_buffer_listeners.entry(id).or_default().push(tx);
641 cx.background_executor().spawn(async move { rx.await? })
642 }
643
644 pub fn buffer_version_info(
645 &self,
646 cx: &AppContext,
647 ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
648 let buffers = self
649 .buffers()
650 .map(|buffer| {
651 let buffer = buffer.read(cx);
652 proto::BufferVersion {
653 id: buffer.remote_id().into(),
654 version: language::proto::serialize_version(&buffer.version),
655 }
656 })
657 .collect();
658 let incomplete_buffer_ids = self
659 .loading_remote_buffers_by_id
660 .keys()
661 .copied()
662 .collect::<Vec<_>>();
663 (buffers, incomplete_buffer_ids)
664 }
665
666 pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
667 self.set_retain_buffers(false, cx);
668
669 for buffer in self.buffers() {
670 buffer.update(cx, |buffer, cx| {
671 buffer.set_capability(Capability::ReadOnly, cx)
672 });
673 }
674
675 // Wake up all futures currently waiting on a buffer to get opened,
676 // to give them a chance to fail now that we've disconnected.
677 self.remote_buffer_listeners.clear();
678 }
679
    /// Switches between retaining buffers strongly and holding them weakly.
    ///
    /// When enabling retention, live weak handles are upgraded to strong
    /// ones. When disabling it, each live buffer is first told to stop
    /// waiting on pending operations, then strong handles are downgraded.
    pub fn set_retain_buffers(&mut self, retain_buffers: bool, cx: &mut AppContext) {
        self.retain_buffers = retain_buffers;
        for open_buffer in self.opened_buffers.values_mut() {
            if retain_buffers {
                if let OpenBuffer::Weak(buffer) = open_buffer {
                    if let Some(buffer) = buffer.upgrade() {
                        *open_buffer = OpenBuffer::Strong(buffer);
                    }
                }
            } else {
                if let Some(buffer) = open_buffer.upgrade() {
                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
                }
                if let OpenBuffer::Strong(buffer) = open_buffer {
                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
                }
            }
        }
    }
699
700 pub fn discard_incomplete(&mut self) {
701 self.opened_buffers
702 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
703 }
704
705 fn on_buffer_event(
706 &mut self,
707 buffer: Model<Buffer>,
708 event: &BufferEvent,
709 cx: &mut ModelContext<Self>,
710 ) {
711 match event {
712 BufferEvent::FileHandleChanged => {
713 self.buffer_changed_file(buffer, cx);
714 }
715 _ => {}
716 }
717 }
718
    /// Reconciles one changed worktree entry with its open buffer, if any:
    /// rebuilds the buffer's `File` (path, mtime, deletion state), keeps the
    /// path/entry indices in sync, and emits the appropriate events.
    ///
    /// The `Option<()>` return exists only to enable `?` early exits; the
    /// value itself carries no meaning.
    fn local_worktree_entry_changed(
        &mut self,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };
        // Look the buffer up by entry id first, falling back to the path.
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
        };
        let buffer = if let Some(buffer) = self.get(buffer_id) {
            buffer
        } else {
            // The buffer was dropped; clean up its stale index entries.
            self.opened_buffers.remove(&buffer_id);
            self.local_buffer_ids_by_path.remove(&project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let (old_file, new_file) = buffer.update(cx, |buffer, cx| {
            let old_file = File::from_dyn(buffer.file())?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Rebuild the file: resolve by entry id first, then by the old
            // path; if neither resolves, the file was deleted.
            let new_file = if let Some(entry) = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
            {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else {
                File {
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    mtime: old_file.mtime,
                    worktree: worktree.clone(),
                    is_deleted: true,
                    is_private: old_file.is_private,
                }
            };

            // No observable change; nothing to update or announce.
            if new_file == *old_file {
                return None;
            }

            let old_file = Arc::new(old_file.clone());
            let new_file = Arc::new(new_file);
            buffer.file_updated(new_file.clone(), cx);
            Some((old_file, new_file))
        })?;

        // Re-index and announce a path change.
        if new_file.path != old_file.path {
            self.local_buffer_ids_by_path.remove(&ProjectPath {
                path: old_file.path.clone(),
                worktree_id: old_file.worktree_id(cx),
            });
            self.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: new_file.worktree_id(cx),
                    path: new_file.path.clone(),
                },
                buffer_id,
            );
            cx.emit(BufferStoreEvent::BufferChangedFilePath {
                buffer: buffer.clone(),
                old_file: Some(old_file.clone()),
            });
        }

        // Re-index an entry-id change.
        if new_file.entry_id != old_file.entry_id {
            if let Some(entry_id) = old_file.entry_id {
                self.local_buffer_ids_by_entry_id.remove(&entry_id);
            }
            if let Some(entry_id) = new_file.entry_id {
                self.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }

        cx.emit(BufferStoreEvent::LocalBufferUpdated { buffer });
        None
    }
826
827 fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
828 let file = File::from_dyn(buffer.read(cx).file())?;
829
830 let remote_id = buffer.read(cx).remote_id();
831 if let Some(entry_id) = file.entry_id {
832 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
833 Some(_) => {
834 return None;
835 }
836 None => {
837 self.local_buffer_ids_by_entry_id
838 .insert(entry_id, remote_id);
839 }
840 }
841 };
842 self.local_buffer_ids_by_path.insert(
843 ProjectPath {
844 worktree_id: file.worktree_id(cx),
845 path: file.path.clone(),
846 },
847 remote_id,
848 );
849
850 Some(())
851 }
852
    /// Streams the buffer's full contents to `peer_id`: an initial `State`
    /// message followed by chunked operation messages, the final one flagged
    /// `is_last`. Does nothing if the buffer is no longer open.
    pub async fn create_buffer_for_peer(
        this: Model<Self>,
        peer_id: PeerId,
        buffer_id: BufferId,
        project_id: u64,
        client: AnyProtoClient,
        cx: &mut AsyncAppContext,
    ) -> Result<()> {
        let Some(buffer) = this.update(cx, |this, _| this.get(buffer_id))? else {
            return Ok(());
        };

        let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
        let operations = operations.await;
        let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx))?;

        let initial_state = proto::CreateBufferForPeer {
            project_id,
            peer_id: Some(peer_id),
            variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
        };

        // Only stream operation chunks if the initial state was sent.
        if client.send(initial_state).log_err().is_some() {
            let client = client.clone();
            cx.background_executor()
                .spawn(async move {
                    // Peek ahead so the final chunk can be marked `is_last`.
                    let mut chunks = split_operations(operations).peekable();
                    while let Some(chunk) = chunks.next() {
                        let is_last = chunks.peek().is_none();
                        client.send(proto::CreateBufferForPeer {
                            project_id,
                            peer_id: Some(peer_id),
                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                proto::BufferChunk {
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                    is_last,
                                },
                            )),
                        })?;
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
        }
        Ok(())
    }
901
    /// Handles an incoming `UpdateBuffer` message by applying its operations
    /// to the target buffer, or queueing them if the buffer is not open yet.
    ///
    /// Queueing only happens when acting as a remote (guest) project; a host
    /// receiving updates for an unknown buffer is a protocol error.
    pub fn handle_update_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        is_remote: bool,
        cx: &mut AppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        match self.opened_buffers.entry(buffer_id) {
            hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                OpenBuffer::Strong(buffer) => {
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                }
                // The buffer hasn't arrived yet; queue the operations for it.
                OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                // The buffer was dropped; its operations no longer matter.
                OpenBuffer::Weak(_) => {}
            },
            hash_map::Entry::Vacant(e) => {
                if !is_remote {
                    debug_panic!(
                        "received buffer update from {:?}",
                        envelope.original_sender_id
                    );
                    return Err(anyhow!("received buffer update for non-remote project"));
                }
                e.insert(OpenBuffer::Operations(ops));
            }
        }
        Ok(proto::Ack {})
    }
936
    /// Handles one message of a host's buffer stream: either the initial
    /// `State`, which creates a loading buffer, or a `Chunk` of operations,
    /// which is applied to it. The final chunk (`is_last`) promotes the
    /// loading buffer into the store.
    ///
    /// On any failure, listeners waiting for this buffer are sent the error.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        mut worktrees: impl Iterator<Item = Model<Worktree>>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    // Resolve the buffer's file against our local worktrees.
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = worktrees
                            .find(|worktree| worktree.read(cx).id() == worktree_id)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Propagate the failure to anyone waiting on this id.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
                });

                if let Err(error) = result {
                    // Abandon the partially-received buffer and notify waiters.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    // All chunks received: promote the buffer into the store.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    self.add_buffer(buffer, cx)?;
                }
            }
        }

        Ok(())
    }
1020
    /// Handles a guest's `SaveBuffer` request on the host: waits until this
    /// replica has caught up to the requested version, performs the save
    /// (optionally at a new path), and replies with the saved version/mtime.
    pub async fn handle_save_buffer(
        this: Model<Self>,
        project_id: u64,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let buffer = this.update(&mut cx, |this, _| this.get_existing(buffer_id))??;
        // Don't save until the sender's edits have been received here.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            // "Save as": write the buffer to the requested new path.
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
1054
1055 pub async fn handle_buffer_saved(
1056 this: Model<Self>,
1057 envelope: TypedEnvelope<proto::BufferSaved>,
1058 mut cx: AsyncAppContext,
1059 ) -> Result<()> {
1060 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1061 let version = deserialize_version(&envelope.payload.version);
1062 let mtime = envelope.payload.mtime.map(|time| time.into());
1063 this.update(&mut cx, |this, cx| {
1064 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1065 buffer.update(cx, |buffer, cx| {
1066 buffer.did_save(version, mtime, cx);
1067 });
1068 }
1069 })
1070 }
1071
    /// Handles a host's notification that a buffer was reloaded from disk,
    /// applying the new version, line ending, and mtime locally.
    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        // Reject messages whose line-ending discriminant is unknown.
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            // The buffer may still be streaming in; update it either way.
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }
        })
    }
1092
    /// Waits on a load-in-progress watch channel until it yields a result.
    ///
    /// The error side is shared behind an `Arc` because multiple callers may
    /// be waiting on the same load.
    pub async fn wait_for_loading_buffer(
        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
        loop {
            // Check the current value first, in case the load already finished.
            if let Some(result) = receiver.borrow().as_ref() {
                match result {
                    Ok(buffer) => return Ok(buffer.to_owned()),
                    Err(e) => return Err(e.to_owned()),
                }
            }
            // Otherwise wait for the sender to publish a new value.
            receiver.next().await;
        }
    }
1106}
1107
1108impl OpenBuffer {
1109 fn upgrade(&self) -> Option<Model<Buffer>> {
1110 match self {
1111 OpenBuffer::Strong(handle) => Some(handle.clone()),
1112 OpenBuffer::Weak(handle) => handle.upgrade(),
1113 OpenBuffer::Operations(_) => None,
1114 }
1115 }
1116}
1117
1118fn is_not_found_error(error: &anyhow::Error) -> bool {
1119 error
1120 .root_cause()
1121 .downcast_ref::<io::Error>()
1122 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1123}