use crate::{
    worktree_store::{WorktreeStore, WorktreeStoreEvent},
    NoRepositoryError, ProjectPath,
};
use anyhow::{anyhow, Context as _, Result};
use collections::{hash_map, HashMap};
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt as _};
use git::blame::Blame;
use gpui::{
    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
};
use http_client::Url;
use language::{
    proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
    Buffer, Capability, Event as BufferEvent, File as _, Language, Operation,
};
use rpc::{
    proto::{self, AnyProtoClient, EnvelopedMessage, PeerId},
    ErrorExt as _, TypedEnvelope,
};
use std::{io, path::Path, str::FromStr as _, sync::Arc};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _};
use worktree::{
    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
    WorktreeId,
};

/// A set of open buffers.
pub struct BufferStore {
    remote_id: Option<u64>,
    #[allow(unused)]
    worktree_store: Model<WorktreeStore>,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
}

enum OpenBuffer {
    Strong(Model<Buffer>),
    Weak(WeakModel<Buffer>),
    Operations(Vec<Operation>),
}

pub enum BufferStoreEvent {
    BufferAdded(Model<Buffer>),
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
    MessageToReplicas(Box<proto::Envelope>),
}

impl EventEmitter<BufferStoreEvent> for BufferStore {}

impl BufferStore {
    /// Creates a buffer store, optionally retaining its buffers.
    ///
    /// If `remote_id` is `Some`, then buffers are owned by the buffer store
    /// and won't be released unless they are explicitly removed, or the
    /// remote id is cleared via `set_remote_id`. Otherwise, buffers are
    /// stored as weak handles.
    pub fn new(
        worktree_store: Model<WorktreeStore>,
        remote_id: Option<u64>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        cx.subscribe(&worktree_store, |this, _, event, cx| match event {
            WorktreeStoreEvent::WorktreeAdded(worktree) => {
                this.subscribe_to_worktree(worktree, cx);
            }
            _ => {}
        })
        .detach();

        Self {
            remote_id,
            worktree_store,
            opened_buffers: Default::default(),
            remote_buffer_listeners: Default::default(),
            loading_remote_buffers_by_id: Default::default(),
            local_buffer_ids_by_path: Default::default(),
            local_buffer_ids_by_entry_id: Default::default(),
            loading_buffers_by_path: Default::default(),
        }
    }

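    /// Opens the buffer at the given project path, reusing a buffer that is
    /// already open at that path, and deduplicating concurrent loads so that
    /// all callers for the same path receive the same buffer.
    ///
    /// A minimal usage sketch (the `buffer_store` handle and `project_path`
    /// here are assumed, not part of this API):
    ///
    /// ```ignore
    /// let open = buffer_store.update(cx, |store, cx| {
    ///     store.open_buffer(project_path, cx)
    /// });
    /// let buffer = open.await?;
    /// ```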
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                let load_buffer = match worktree.read(cx) {
                    Worktree::Local(_) => {
                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
                    }
                    Worktree::Remote(tree) => {
                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
                    }
                };

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }

    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        this.local_worktree_entries_changed(&worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    fn local_worktree_entries_changed(
        &mut self,
        worktree_handle: &Model<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut ModelContext<Self>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
        }
    }

    fn local_worktree_git_repos_changed(
        &mut self,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<Self>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Identify the loading buffers whose containing repository has changed.
        let future_buffers = self
            .loading_buffers()
            .filter_map(|(project_path, receiver)| {
                if project_path.worktree_id != worktree_handle.read(cx).id() {
                    return None;
                }
                let path = &project_path.path;
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
                let path = path.clone();
                Some(async move {
                    Self::wait_for_loading_buffer(receiver)
                        .await
                        .ok()
                        .map(|buffer| (buffer, path))
                })
            })
            .collect::<FuturesUnordered<_>>();

        // Identify the current buffers whose containing repository has changed.
        let current_buffers = self
            .buffers()
            .filter_map(|buffer| {
                let file = File::from_dyn(buffer.read(cx).file())?;
                if file.worktree != worktree_handle {
                    return None;
                }
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                Some((buffer, file.path.clone()))
            })
            .collect::<Vec<_>>();

        if future_buffers.len() + current_buffers.len() == 0 {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Wait for all of the buffers to load.
            let future_buffers = future_buffers.collect::<Vec<_>>().await;

            // Reload the diff base for every buffer whose containing git repository has changed.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    let mut diff_base_tasks = future_buffers
                        .into_iter()
                        .flatten()
                        .chain(current_buffers)
                        .filter_map(|(buffer, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            Some(async move {
                                let base_text =
                                    local_repo_entry.repo().load_index_text(&relative_path);
                                Some((buffer, base_text))
                            })
                        })
                        .collect::<FuturesUnordered<_>>();

                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
                    while let Some(diff_base) = diff_base_tasks.next().await {
                        if let Some(diff_base) = diff_base {
                            diff_bases.push(diff_base);
                        }
                    }
                    diff_bases
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Assign the new diff bases on all of the buffers.
                for (buffer, diff_base) in diff_bases_by_buffer {
                    let buffer_id = buffer.update(cx, |buffer, cx| {
                        buffer.set_diff_base(diff_base.clone(), cx);
                        buffer.remote_id().to_proto()
                    });
                    if let Some(project_id) = this.remote_id {
                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
                            proto::UpdateDiffBase {
                                project_id,
                                buffer_id,
                                diff_base,
                            }
                            .into_envelope(0, None, None),
                        )))
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }

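    /// Loads a buffer from a local worktree, reserving the buffer's model
    /// handle up front so that its entity id can serve as the buffer id. If
    /// the file doesn't exist on disk, an empty buffer is created for the
    /// path rather than failing.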
    fn open_local_buffer_internal(
        &mut self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(
                        text_buffer,
                        loaded.diff_base,
                        Some(loaded.file),
                        Capability::ReadWrite,
                    )
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        None,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            mtime: None,
                            entry_id: None,
                            is_local: true,
                            is_deleted: false,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }

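    /// Asks the host to open the buffer at the given path, then waits for
    /// the host to stream the buffer's state back to this replica.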
    fn open_remote_buffer_internal(
        &self,
        path: &Arc<Path>,
        worktree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let worktree_id = worktree.id().to_proto();
        let project_id = worktree.project_id();
        let client = worktree.client();
        let path_string = path.clone().to_string_lossy().to_string();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path_string,
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;
            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

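    /// Creates a new, empty buffer. When a remote client and project id are
    /// provided, the buffer is created on the host and awaited here;
    /// otherwise it is created locally.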
    pub fn create_buffer(
        &mut self,
        remote_client: Option<(AnyProtoClient, u64)>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        if let Some((remote_client, project_id)) = remote_client {
            let create = remote_client.request(proto::OpenNewBuffer { project_id });
            cx.spawn(|this, mut cx| async move {
                let response = create.await?;
                let buffer_id = BufferId::new(response.buffer_id)?;

                this.update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await
            })
        } else {
            Task::ready(Ok(self.create_local_buffer("", None, cx)))
        }
    }

    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });
        self.add_buffer(buffer.clone(), cx).log_err();
        buffer
    }

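    /// Saves the buffer to its existing file, dispatching to the local or
    /// remote implementation based on the worktree that owns the file.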
    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
        };
        match file.worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
            }
            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
        }
    }

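    /// Saves the buffer under a new path, then emits
    /// `BufferChangedFilePath` so observers can react to the rename.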
    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let old_file = buffer.read(cx).file().cloned();

        let task = match worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
            }
            Worktree::Remote(tree) => {
                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }

    fn save_local_buffer(
        &self,
        worktree: Model<Worktree>,
        buffer_handle: Model<Buffer>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer.file().is_some_and(|file| !file.is_created()) {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.mtime;
            this.update(&mut cx, |this, cx| {
                if let Some(project_id) = this.remote_id {
                    if has_changed_file {
                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
                            proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            }
                            .into_envelope(0, None, None),
                        )));
                    }
                    cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
                        proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        }
                        .into_envelope(0, None, None),
                    )));
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        tree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = tree.client();
        let project_id = tree.project_id();
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

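    /// Computes the git blame for a buffer, optionally at a specific
    /// version. For local worktrees the blame runs against the containing
    /// repository on a background thread; for remote worktrees it is
    /// requested from the host over RPC.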
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Blame>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => anyhow::bail!(NoRepositoryError {}),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok((repo, relative_path, content))
                });

                cx.background_executor().spawn(async move {
                    let (repo, relative_path, content) = blame_params?;
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }

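    /// Registers a buffer with the store: retains it strongly when the
    /// project is shared and weakly otherwise, applies any operations that
    /// arrived before the buffer did, indexes local buffers by path and
    /// entry id, and notifies listeners waiting on this buffer id.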
    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = if self.remote_id.is_some() {
            OpenBuffer::Strong(buffer.clone())
        } else {
            OpenBuffer::Weak(buffer.downgrade())
        };

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
            for sender in senders {
                sender.send(Ok(buffer.clone())).ok();
            }
        }

        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            if file.is_local {
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path.clone(),
                    },
                    remote_id,
                );

                if let Some(entry_id) = file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }

    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
        self.opened_buffers
            .values()
            .filter_map(|buffer| buffer.upgrade())
    }

    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }

    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
        self.buffers().find_map(|buffer| {
            let file = File::from_dyn(buffer.read(cx).file())?;
            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
                Some(buffer)
            } else {
                None
            }
        })
    }

    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.opened_buffers
            .get(&buffer_id)
            .and_then(|buffer| buffer.upgrade())
    }

    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
        self.get(buffer_id)
            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
    }

    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.get(buffer_id)
            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
    }

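    /// Returns a task that resolves once the buffer with the given id
    /// arrives from the host, registering a listener if the buffer isn't
    /// open yet.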
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut AppContext,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer = self.get(id);
        if let Some(buffer) = buffer {
            return Task::ready(Ok(buffer));
        }
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);
        cx.background_executor().spawn(async move { rx.await? })
    }

    pub fn buffer_version_info(
        &self,
        cx: &AppContext,
    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
        let buffers = self
            .buffers()
            .map(|buffer| {
                let buffer = buffer.read(cx);
                proto::BufferVersion {
                    id: buffer.remote_id().into(),
                    version: language::proto::serialize_version(&buffer.version),
                }
            })
            .collect();
        let incomplete_buffer_ids = self
            .loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>();
        (buffers, incomplete_buffer_ids)
    }

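    /// Handles losing the connection to the host: downgrades buffer handles
    /// to weak, marks all buffers read-only, and drops pending remote-buffer
    /// listeners so their futures can fail.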
    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        self.set_remote_id(None, cx);

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        // Wake up all futures currently waiting on a buffer to get opened,
        // to give them a chance to fail now that we've disconnected.
        self.remote_buffer_listeners.clear();
    }

    pub fn set_remote_id(&mut self, remote_id: Option<u64>, cx: &mut AppContext) {
        self.remote_id = remote_id;
        for open_buffer in self.opened_buffers.values_mut() {
            if remote_id.is_some() {
                if let OpenBuffer::Weak(buffer) = open_buffer {
                    if let Some(buffer) = buffer.upgrade() {
                        *open_buffer = OpenBuffer::Strong(buffer);
                    }
                }
            } else {
                if let Some(buffer) = open_buffer.upgrade() {
                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
                }
                if let OpenBuffer::Strong(buffer) = open_buffer {
                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
                }
            }
        }
    }

    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }

    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                self.buffer_changed_file(buffer, cx);
            }
            _ => {}
        }
    }

    fn local_worktree_entry_changed(
        &mut self,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
        };
        let buffer = if let Some(buffer) = self.get(buffer_id) {
            buffer
        } else {
            self.opened_buffers.remove(&buffer_id);
            self.local_buffer_ids_by_path.remove(&project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            let new_file = if let Some(entry) = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
            {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else {
                File {
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    mtime: old_file.mtime,
                    worktree: worktree.clone(),
                    is_deleted: true,
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                self.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some(project_id) = self.remote_id {
                events.push(BufferStoreEvent::MessageToReplicas(Box::new(
                    proto::UpdateBufferFile {
                        project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    }
                    .into_envelope(0, None, None),
                )))
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }

    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }

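    /// Streams a buffer's state to the given peer: an initial `State`
    /// message followed by the buffer's serialized operations in chunks,
    /// with the final chunk marked `is_last`.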
    pub async fn create_buffer_for_peer(
        this: Model<Self>,
        peer_id: PeerId,
        buffer_id: BufferId,
        project_id: u64,
        client: AnyProtoClient,
        cx: &mut AsyncAppContext,
    ) -> Result<()> {
        let Some(buffer) = this.update(cx, |this, _| this.get(buffer_id))? else {
            return Ok(());
        };

        let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
        let operations = operations.await;
        let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx))?;

        let initial_state = proto::CreateBufferForPeer {
            project_id,
            peer_id: Some(peer_id),
            variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
        };

        if client.send(initial_state).log_err().is_some() {
            let client = client.clone();
            cx.background_executor()
                .spawn(async move {
                    let mut chunks = split_operations(operations).peekable();
                    while let Some(chunk) = chunks.next() {
                        let is_last = chunks.peek().is_none();
                        client.send(proto::CreateBufferForPeer {
                            project_id,
                            peer_id: Some(peer_id),
                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                proto::BufferChunk {
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                    is_last,
                                },
                            )),
                        })?;
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
        }
        Ok(())
    }

    pub async fn handle_update_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    OpenBuffer::Strong(buffer) => {
                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                    }
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Weak(_) => {}
                },
                hash_map::Entry::Vacant(e) => {
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }

    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
                });

                if let Err(error) = result {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    self.add_buffer(buffer, cx)?;
                }
            }
        }

        Ok(())
    }

    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            Ok(())
        })?
    }

    pub async fn handle_update_diff_base(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let buffer_id = envelope.payload.buffer_id;
            let buffer_id = BufferId::new(buffer_id)?;
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.set_diff_base(envelope.payload.diff_base, cx)
                });
            }
            Ok(())
        })?
    }

    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.remote_id.context("project is not shared")?,
            ))
        })??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }

    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }
        })
    }

    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }
        })
    }

    pub async fn handle_blame_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }

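    /// Polls the given watch channel until the corresponding load task
    /// publishes either a buffer or an error.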
    pub async fn wait_for_loading_buffer(
        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
        loop {
            if let Some(result) = receiver.borrow().as_ref() {
                match result {
                    Ok(buffer) => return Ok(buffer.to_owned()),
                    Err(e) => return Err(e.to_owned()),
                }
            }
            receiver.next().await;
        }
    }
}

impl OpenBuffer {
    fn upgrade(&self) -> Option<Model<Buffer>> {
        match self {
            OpenBuffer::Strong(handle) => Some(handle.clone()),
            OpenBuffer::Weak(handle) => handle.upgrade(),
            OpenBuffer::Operations(_) => None,
        }
    }
}

fn is_not_found_error(error: &anyhow::Error) -> bool {
    error
        .root_cause()
        .downcast_ref::<io::Error>()
        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
}

fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
    let entries = blame
        .entries
        .into_iter()
        .map(|entry| proto::BlameEntry {
            sha: entry.sha.as_bytes().into(),
            start_line: entry.range.start,
            end_line: entry.range.end,
            original_line_number: entry.original_line_number,
            author: entry.author.clone(),
            author_mail: entry.author_mail.clone(),
            author_time: entry.author_time,
            author_tz: entry.author_tz.clone(),
            committer: entry.committer.clone(),
            committer_mail: entry.committer_mail.clone(),
            committer_time: entry.committer_time,
            committer_tz: entry.committer_tz.clone(),
            summary: entry.summary.clone(),
            previous: entry.previous.clone(),
            filename: entry.filename.clone(),
        })
        .collect::<Vec<_>>();

    let messages = blame
        .messages
        .into_iter()
        .map(|(oid, message)| proto::CommitMessage {
            oid: oid.as_bytes().into(),
            message,
        })
        .collect::<Vec<_>>();

    let permalinks = blame
        .permalinks
        .into_iter()
        .map(|(oid, url)| proto::CommitPermalink {
            oid: oid.as_bytes().into(),
            permalink: url.to_string(),
        })
        .collect::<Vec<_>>();

    proto::BlameBufferResponse {
        entries,
        messages,
        permalinks,
        remote_url: blame.remote_url,
    }
}

fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_mail: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    let permalinks = response
        .permalinks
        .into_iter()
        .filter_map(|permalink| {
            Some((
                git::Oid::from_bytes(&permalink.oid).ok()?,
                Url::from_str(&permalink.permalink).ok()?,
            ))
        })
        .collect::<HashMap<_, _>>();

    Blame {
        entries,
        permalinks,
        messages,
        remote_url: response.remote_url,
    }
}