use crate::{
    search::SearchQuery,
    worktree_store::{WorktreeStore, WorktreeStoreEvent},
    Item, ProjectPath,
};
use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
use anyhow::{anyhow, Context as _, Result};
use client::Client;
use collections::{hash_map, HashMap, HashSet};
use fs::Fs;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use git::blame::Blame;
use gpui::{
    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
    Task, WeakModel,
};
use http_client::Url;
use language::{
    proto::{
        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
        split_operations,
    },
    Buffer, BufferEvent, Capability, File as _, Language, Operation,
};
use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
use smol::channel::Receiver;
use std::{io, ops::Range, path::Path, str::FromStr as _, sync::Arc, time::Instant};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};

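/// Abstracts over the local and remote implementations backing a [`BufferStore`].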
trait BufferStoreImpl {
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>>;

    fn save_buffer(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>>;

    fn save_buffer_as(
        &self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>>;

    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>>;

    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>>;

    fn as_remote(&self) -> Option<Model<RemoteBufferStore>>;
    fn as_local(&self) -> Option<Model<LocalBufferStore>>;
}

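/// State for a buffer store whose buffers are fetched from a remote host over RPC.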
struct RemoteBufferStore {
    shared_with_me: HashSet<Model<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    worktree_store: Model<WorktreeStore>,
    buffer_store: WeakModel<BufferStore>,
}

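/// State for a buffer store whose buffers are loaded from the local filesystem.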
struct LocalBufferStore {
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    buffer_store: WeakModel<BufferStore>,
    worktree_store: Model<WorktreeStore>,
    _subscription: Subscription,
}

/// A set of open buffers.
pub struct BufferStore {
    state: Box<dyn BufferStoreImpl>,
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    worktree_store: Model<WorktreeStore>,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    downstream_client: Option<(AnyProtoClient, u64)>,
    shared_buffers: HashMap<proto::PeerId, HashSet<Model<Buffer>>>,
}

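/// An open buffer, or the operations received for a buffer before its initial
/// state has arrived.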
enum OpenBuffer {
    Buffer(WeakModel<Buffer>),
    Operations(Vec<Operation>),
}

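/// Events emitted by a [`BufferStore`].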
pub enum BufferStoreEvent {
    BufferAdded(Model<Buffer>),
    BufferDropped(BufferId),
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}

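/// An edit transaction that spans multiple buffers, keyed by buffer handle.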
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}

impl RemoteBufferStore {
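    /// Returns a task that resolves once the buffer with the given id has been
    /// received from the remote host, or immediately if it is already open.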
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut AppContext,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer_store = self.buffer_store.clone();
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|cx| async move {
            if let Some(buffer) = buffer_store
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }

    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

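    /// Handles one message of the chunked `CreateBufferForPeer` protocol: a
    /// `State` variant registers a loading buffer, and each subsequent `Chunk`
    /// variant applies serialized operations to it, resolving any waiting
    /// listeners once the final chunk arrives.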
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<Option<Model<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // Retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        Ok(None)
    }

    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
        self.loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>()
    }

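    /// Deserializes a project-wide transaction received from the host, waiting
    /// for every referenced buffer and its edits to arrive before returning.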
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }
}

impl BufferStoreImpl for Model<RemoteBufferStore> {
    fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
        Some(self.clone())
    }

    fn as_local(&self) -> Option<Model<LocalBufferStore>> {
        None
    }

    fn save_buffer(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            this.save_remote_buffer(buffer.clone(), None, cx)
        })
    }

    fn save_buffer_as(
        &self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            this.save_remote_buffer(buffer, Some(path.to_proto()), cx)
        })
    }

    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        self.update(cx, |this, cx| {
            let worktree_id = worktree.read(cx).id().to_proto();
            let project_id = this.project_id;
            let client = this.upstream_client.clone();
            let path_string = path.clone().to_string_lossy().to_string();
            cx.spawn(move |this, mut cx| async move {
                let response = client
                    .request(proto::OpenBufferByPath {
                        project_id,
                        worktree_id,
                        path: path_string,
                    })
                    .await?;
                let buffer_id = BufferId::new(response.buffer_id)?;

                let buffer = this
                    .update(&mut cx, {
                        |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;

                Ok(buffer)
            })
        })
    }

    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
        self.update(cx, |this, cx| {
            let create = this.upstream_client.request(proto::OpenNewBuffer {
                project_id: this.project_id,
            });
            cx.spawn(|this, mut cx| async move {
                let response = create.await?;
                let buffer_id = BufferId::new(response.buffer_id)?;

                this.update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await
            })
        })
    }

    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        self.update(cx, |this, cx| {
            let request = this.upstream_client.request(proto::ReloadBuffers {
                project_id: this.project_id,
                buffer_ids: buffers
                    .iter()
                    .map(|buffer| buffer.read(cx).remote_id().to_proto())
                    .collect(),
            });

            cx.spawn(|this, mut cx| async move {
                let response = request
                    .await?
                    .transaction
                    .ok_or_else(|| anyhow!("missing transaction"))?;
                this.update(&mut cx, |this, cx| {
                    this.deserialize_project_transaction(response, push_to_history, cx)
                })?
                .await
            })
        })
    }
}

impl LocalBufferStore {
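    /// Writes the buffer's contents to the given path within the worktree,
    /// notifying any downstream client of the file change and the save.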
    fn save_local_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        worktree: Model<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer.file().is_some_and(|file| !file.is_created()) {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.mtime;
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client(cx) {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        this.local_worktree_entries_changed(&worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    fn local_worktree_entries_changed(
        &mut self,
        worktree_handle: &Model<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut ModelContext<Self>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
        }
    }

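    /// Recomputes the diff base for every open or loading buffer whose
    /// containing git repository was among those that changed.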
    fn local_worktree_git_repos_changed(
        &mut self,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<Self>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());
        let Some(buffer_store) = self.buffer_store.upgrade() else {
            return;
        };

        // Identify the loading buffers whose containing repository has changed.
        let (future_buffers, current_buffers) = buffer_store.update(cx, |buffer_store, cx| {
            let future_buffers = buffer_store
                .loading_buffers()
                .filter_map(|(project_path, receiver)| {
                    if project_path.worktree_id != worktree_handle.read(cx).id() {
                        return None;
                    }
                    let path = &project_path.path;
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| path.starts_with(work_dir))?;
                    let path = path.clone();
                    Some(async move {
                        BufferStore::wait_for_loading_buffer(receiver)
                            .await
                            .ok()
                            .map(|buffer| (buffer, path))
                    })
                })
                .collect::<FuturesUnordered<_>>();

            // Identify the current buffers whose containing repository has changed.
            let current_buffers = buffer_store
                .buffers()
                .filter_map(|buffer| {
                    let file = File::from_dyn(buffer.read(cx).file())?;
                    if file.worktree != worktree_handle {
                        return None;
                    }
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                    Some((buffer, file.path.clone()))
                })
                .collect::<Vec<_>>();
            (future_buffers, current_buffers)
        });

        if future_buffers.len() + current_buffers.len() == 0 {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Wait for all of the buffers to load.
            let future_buffers = future_buffers.collect::<Vec<_>>().await;

            // Reload the diff base for every buffer whose containing git repository has changed.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    let mut diff_base_tasks = future_buffers
                        .into_iter()
                        .flatten()
                        .chain(current_buffers)
                        .filter_map(|(buffer, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            Some(async move {
                                let base_text =
                                    local_repo_entry.repo().load_index_text(&relative_path);
                                Some((buffer, base_text))
                            })
                        })
                        .collect::<FuturesUnordered<_>>();

                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
                    while let Some(diff_base) = diff_base_tasks.next().await {
                        if let Some(diff_base) = diff_base {
                            diff_bases.push(diff_base);
                        }
                    }
                    diff_bases
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Assign the new diff bases on all of the buffers.
                for (buffer, diff_base) in diff_bases_by_buffer {
                    let buffer_id = buffer.update(cx, |buffer, cx| {
                        buffer.set_diff_base(diff_base.clone(), cx);
                        buffer.remote_id().to_proto()
                    });
                    if let Some((client, project_id)) = &this.downstream_client(cx) {
                        client
                            .send(proto::UpdateDiffBase {
                                project_id: *project_id,
                                buffer_id,
                                diff_base,
                            })
                            .log_err();
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }

    fn local_worktree_entry_changed(
        &mut self,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
        };
        let buffer = self
            .buffer_store
            .update(cx, |buffer_store, _| {
                if let Some(buffer) = buffer_store.get(buffer_id) {
                    Some(buffer)
                } else {
                    buffer_store.opened_buffers.remove(&buffer_id);
                    None
                }
            })
            .ok()
            .flatten();
        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            self.local_buffer_ids_by_path.remove(&project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            let new_file = if let Some(entry) = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
            {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else {
                File {
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    mtime: old_file.mtime,
                    worktree: worktree.clone(),
                    is_deleted: true,
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                self.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some((client, project_id)) = &self.downstream_client(cx) {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;
        self.buffer_store
            .update(cx, |_buffer_store, cx| {
                for event in events {
                    cx.emit(event);
                }
            })
            .log_err()?;

        None
    }

    fn downstream_client(&self, cx: &AppContext) -> Option<(AnyProtoClient, u64)> {
        self.buffer_store
            .upgrade()?
            .read(cx)
            .downstream_client
            .clone()
    }

    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }
}

impl BufferStoreImpl for Model<LocalBufferStore> {
    fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
        None
    }

    fn as_local(&self) -> Option<Model<LocalBufferStore>> {
        Some(self.clone())
    }

    fn save_buffer(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
                return Task::ready(Err(anyhow!("buffer doesn't have a file")));
            };
            let worktree = file.worktree.clone();
            this.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
        })
    }

    fn save_buffer_as(
        &self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            let Some(worktree) = this
                .worktree_store
                .read(cx)
                .worktree_for_id(path.worktree_id, cx)
            else {
                return Task::ready(Err(anyhow!("no such worktree")));
            };
            this.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
        })
    }

    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer_store = cx.weak_model();
        self.update(cx, |_, cx| {
            let load_buffer = worktree.update(cx, |worktree, cx| {
                let load_file = worktree.load_file(path.as_ref(), cx);
                let reservation = cx.reserve_model();
                let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
                cx.spawn(move |_, mut cx| async move {
                    let loaded = load_file.await?;
                    let text_buffer = cx
                        .background_executor()
                        .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                        .await;
                    cx.insert_model(reservation, |_| {
                        Buffer::build(
                            text_buffer,
                            loaded.diff_base,
                            Some(loaded.file),
                            Capability::ReadWrite,
                        )
                    })
                })
            });

            cx.spawn(move |this, mut cx| async move {
                let buffer = match load_buffer.await {
                    Ok(buffer) => Ok(buffer),
                    Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                        let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                        let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                        Buffer::build(
                            text_buffer,
                            None,
                            Some(Arc::new(File {
                                worktree,
                                path,
                                mtime: None,
                                entry_id: None,
                                is_local: true,
                                is_deleted: false,
                                is_private: false,
                            })),
                            Capability::ReadWrite,
                        )
                    }),
                    Err(e) => Err(e),
                }?;
                this.update(&mut cx, |this, cx| {
                    buffer_store.update(cx, |buffer_store, cx| {
                        buffer_store.add_buffer(buffer.clone(), cx)
                    })??;
                    let buffer_id = buffer.read(cx).remote_id();
                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                        this.local_buffer_ids_by_path.insert(
                            ProjectPath {
                                worktree_id: file.worktree_id(cx),
                                path: file.path.clone(),
                            },
                            buffer_id,
                        );

                        if let Some(entry_id) = file.entry_id {
                            this.local_buffer_ids_by_entry_id
                                .insert(entry_id, buffer_id);
                        }
                    }

                    anyhow::Ok(())
                })??;

                Ok(buffer)
            })
        })
    }

    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
        let handle = self.clone();
        cx.spawn(|buffer_store, mut cx| async move {
            let buffer = cx.new_model(|cx| {
                Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
            })?;
            buffer_store.update(&mut cx, |buffer_store, cx| {
                buffer_store.add_buffer(buffer.clone(), cx).log_err();
                let buffer_id = buffer.read(cx).remote_id();
                handle.update(cx, |this, cx| {
                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                        this.local_buffer_ids_by_path.insert(
                            ProjectPath {
                                worktree_id: file.worktree_id(cx),
                                path: file.path.clone(),
                            },
                            buffer_id,
                        );

                        if let Some(entry_id) = file.entry_id {
                            this.local_buffer_ids_by_entry_id
                                .insert(entry_id, buffer_id);
                        }
                    }
                });
            })?;
            Ok(buffer)
        })
    }

    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.handle(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
}

impl BufferStore {
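    /// Registers this type's RPC message and request handlers on the given client.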
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_message_handler(Self::handle_update_diff_base);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
    }

    /// Creates a buffer store that loads and saves buffers via the local filesystem.
    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
        let this = cx.weak_model();
        Self {
            state: Box::new(cx.new_model(|cx| {
                let subscription = cx.subscribe(
                    &worktree_store,
                    |this: &mut LocalBufferStore, _, event, cx| {
                        if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                            this.subscribe_to_worktree(worktree, cx);
                        }
                    },
                );

                LocalBufferStore {
                    local_buffer_ids_by_path: Default::default(),
                    local_buffer_ids_by_entry_id: Default::default(),
                    buffer_store: this,
                    worktree_store: worktree_store.clone(),
                    _subscription: subscription,
                }
            })),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers_by_path: Default::default(),
            worktree_store,
        }
    }

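    /// Creates a buffer store that opens buffers by requesting them from the
    /// given upstream client.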
    pub fn remote(
        worktree_store: Model<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let this = cx.weak_model();
        Self {
            state: Box::new(cx.new_model(|_| RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
                buffer_store: this,
            })),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers_by_path: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }

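    /// Opens the buffer at the given project path, returning the existing
    /// buffer if it is already open and deduplicating concurrent loads of the
    /// same path.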
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                let load_buffer = self
                    .state
                    .open_buffer(project_path.path.clone(), worktree, cx);

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _cx| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }

    pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
        self.state.create_buffer(cx)
    }

    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        self.state.save_buffer(buffer, cx)
    }

    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let old_file = buffer.read(cx).file().cloned();
        let task = self.state.save_buffer_as(buffer.clone(), path, cx);
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }

    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }

    pub fn get_permalink_to_line(
        &self,
        buffer: &Model<Buffer>,
        selection: Range<u32>,
        cx: &AppContext,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let Some(repo) = worktree.local_git_repo(file.path()) else {
                    return Task::ready(Err(anyhow!("no repository for buffer found")));
                };

                let path = file.path().clone();

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .context("failed to convert buffer path to string")?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }

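    /// Registers a newly opened buffer, applying any operations that were
    /// buffered for it while it was loading and guarding against duplicate
    /// registration.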
    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = OpenBuffer::Buffer(buffer.downgrade());

        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }

    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
        self.opened_buffers
            .values()
            .filter_map(|buffer| buffer.upgrade())
    }

    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }

    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
        self.buffers().find_map(|buffer| {
            let file = File::from_dyn(buffer.read(cx).file())?;
            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
                Some(buffer)
            } else {
                None
            }
        })
    }

    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.opened_buffers
            .get(&buffer_id)
            .and_then(|buffer| buffer.upgrade())
    }

    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
        self.get(buffer_id)
            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
    }

    pub fn get_possibly_incomplete(
        &self,
        buffer_id: BufferId,
        cx: &AppContext,
    ) -> Option<Model<Buffer>> {
        self.get(buffer_id).or_else(|| {
            self.state.as_remote().and_then(|remote| {
                remote
                    .read(cx)
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
            })
        })
    }

    pub fn buffer_version_info(
        &self,
        cx: &AppContext,
    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
        let buffers = self
            .buffers()
            .map(|buffer| {
                let buffer = buffer.read(cx);
                proto::BufferVersion {
                    id: buffer.remote_id().into(),
                    version: language::proto::serialize_version(&buffer.version),
                }
            })
            .collect();
        let incomplete_buffer_ids = self
            .state
            .as_remote()
            .map(|remote| remote.read(cx).incomplete_buffer_ids())
            .unwrap_or_default();
        (buffers, incomplete_buffer_ids)
    }

    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        if let Some(remote) = self.state.as_remote() {
            remote.update(cx, |remote, _| {
                // Wake up all futures currently waiting on a buffer to get opened,
                // to give them a chance to fail now that we've disconnected.
                remote.remote_buffer_listeners.clear()
            })
        }
    }

    pub fn shared(
        &mut self,
        remote_id: u64,
        downstream_client: AnyProtoClient,
        _cx: &mut AppContext,
    ) {
        self.downstream_client = Some((downstream_client, remote_id));
    }

    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }

    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }

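    /// Streams buffers that may contain matches for the given search query,
    /// yielding unnamed open buffers first and then opening candidate paths
    /// reported by the worktree store, a chunk at a time.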
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }

    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.state.as_local() {
                    local.update(cx, |local, cx| {
                        local.buffer_changed_file(buffer, cx);
                    })
                }
            }
            BufferEvent::Reloaded => {
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }

    pub async fn handle_update_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Buffer(buffer) => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                        }
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }

    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .insert(buffer.clone());

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                client
                    .send(proto::UpdateDiffBase {
                        project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: buffer.diff_base().map(ToString::to_string),
                    })
                    .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }

    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let Some(remote) = self.state.as_remote() else {
            return Err(anyhow!("buffer store is not a remote"));
        };

        if let Some(buffer) = remote.update(cx, |remote, cx| {
            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)
        })? {
            self.add_buffer(buffer, cx)?;
        }

        Ok(())
    }

    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }

    pub async fn handle_update_diff_base(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let buffer_id = envelope.payload.buffer_id;
            let buffer_id = BufferId::new(buffer_id)?;
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                buffer.update(cx, |buffer, cx| {
                    buffer.set_diff_base(envelope.payload.diff_base.clone(), cx)
                });
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateDiffBase {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: envelope.payload.diff_base,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }

    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }

    pub async fn handle_close_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let peer_id = envelope.sender_id;
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        this.update(&mut cx, |this, _| {
            if let Some(buffer) = this.get(buffer_id) {
                if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
                    if shared.remove(&buffer) {
                        if shared.is_empty() {
                            this.shared_buffers.remove(&peer_id);
                        }
                        return;
                    }
                }
            };
            debug_panic!(
                "peer_id {} closed buffer_id {} which was either not open or already closed",
                peer_id,
                buffer_id
            )
        })
    }

    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        this.update(&mut cx, move |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferSaved {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                    })
                    .log_err();
            }
        })
    }

    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }

    pub async fn handle_blame_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }

    pub async fn handle_get_permalink_to_line(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::GetPermalinkToLineResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        // let version = deserialize_version(&envelope.payload.version);
        let selection = {
            let proto_selection = envelope
                .payload
                .selection
                .context("no selection defined to get a permalink for")?;
            proto_selection.start as u32..proto_selection.end as u32
        };
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        let permalink = this
            .update(&mut cx, |this, cx| {
                this.get_permalink_to_line(&buffer, selection, cx)
            })?
            .await?;
        Ok(proto::GetPermalinkToLineResponse {
            permalink: permalink.to_string(),
        })
    }

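    /// Polls the given watch channel until a load result is available, cloning
    /// the buffer handle or error out of it.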
    pub async fn wait_for_loading_buffer(
        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
        loop {
            if let Some(result) = receiver.borrow().as_ref() {
                match result {
                    Ok(buffer) => return Ok(buffer.to_owned()),
                    Err(e) => return Err(e.to_owned()),
                }
            }
            receiver.next().await;
        }
    }

    pub fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        if buffers.is_empty() {
            return Task::ready(Ok(ProjectTransaction::default()));
        }

        self.state.reload_buffers(buffers, push_to_history, cx)
    }

    async fn handle_reload_buffers(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }

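    /// Sends the given buffer's full state to a peer via the chunked
    /// `CreateBufferForPeer` protocol, doing nothing if the buffer has already
    /// been shared with that peer or the project is not shared.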
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        if !self
            .shared_buffers
            .entry(peer_id)
            .or_default()
            .insert(buffer.clone())
        {
            return Task::ready(Ok(()));
        }

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }

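    /// Forgets every buffer shared with any peer.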
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }

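    /// Forgets the buffers shared with a single peer.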
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }

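    /// Moves the shared-buffer bookkeeping from an old peer id to a new one,
    /// e.g. when a collaborator reconnects under a different id.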
    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }

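    /// Returns the set of buffers currently shared with each peer.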
    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<Model<Buffer>>> {
        &self.shared_buffers
    }

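    /// Creates a new buffer from `text` with the given language (falling back
    /// to plain text), registers it with the store, and records its path and
    /// entry-id mappings when it is backed by a file.
    ///
    /// Panics if called on a non-local buffer store.
    ///
    /// # Example (illustrative sketch; `rust_language` is hypothetical)
    /// ```ignore
    /// let buffer = buffer_store.update(cx, |store, cx| {
    ///     store.create_local_buffer("fn main() {}", Some(rust_language), cx)
    /// });
    /// ```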
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let local = self
            .state
            .as_local()
            .expect("local-only method called in a non-local context");
        local.update(cx, |this, cx| {
            if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                this.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path.clone(),
                    },
                    buffer_id,
                );

                if let Some(entry_id) = file.entry_id {
                    this.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }
        });
        buffer
    }

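    /// Reconstructs a `ProjectTransaction` from its protobuf form by
    /// delegating to the remote implementation. Only valid on a remote buffer
    /// store; otherwise it debug-panics and returns an error.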
    pub fn deserialize_project_transaction(
        &mut self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        if let Some(remote) = self.state.as_remote() {
            remote.update(cx, |remote, cx| {
                remote.deserialize_project_transaction(message, push_to_history, cx)
            })
        } else {
            debug_panic!("not a remote buffer store");
            Task::ready(Err(anyhow!("not a remote buffer store")))
        }
    }

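    /// Returns a task that resolves once the remote buffer with the given id
    /// has been received from the host. Only valid on a remote buffer store;
    /// otherwise it debug-panics and returns an error.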
    pub fn wait_for_remote_buffer(
        &self,
        id: BufferId,
        cx: &mut AppContext,
    ) -> Task<Result<Model<Buffer>>> {
        if let Some(remote) = self.state.as_remote() {
            remote.update(cx, |remote, cx| remote.wait_for_remote_buffer(id, cx))
        } else {
            debug_panic!("not a remote buffer store");
            Task::ready(Err(anyhow!("not a remote buffer store")))
        }
    }

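    /// Serializes a project transaction for transmission to a peer, first
    /// ensuring that every buffer it touches has been shared with that peer.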
    pub fn serialize_project_transaction_for_peer(
        &mut self,
        project_transaction: ProjectTransaction,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> proto::ProjectTransaction {
        let mut serialized_transaction = proto::ProjectTransaction {
            buffer_ids: Default::default(),
            transactions: Default::default(),
        };
        for (buffer, transaction) in project_transaction.0 {
            self.create_buffer_for_peer(&buffer, peer_id, cx)
                .detach_and_log_err(cx);
            serialized_transaction
                .buffer_ids
                .push(buffer.read(cx).remote_id().into());
            serialized_transaction
                .transactions
                .push(language::proto::serialize_transaction(&transaction));
        }
        serialized_transaction
    }
}

impl OpenBuffer {
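    /// Returns a strong handle to the buffer if it is still open; buffers
    /// represented only by queued operations have no model to upgrade.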
    fn upgrade(&self) -> Option<Model<Buffer>> {
        match self {
            OpenBuffer::Buffer(handle) => handle.upgrade(),
            OpenBuffer::Operations(_) => None,
        }
    }
}

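/// Returns true if the error's root cause is an `io::Error` of kind `NotFound`.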
fn is_not_found_error(error: &anyhow::Error) -> bool {
    error
        .root_cause()
        .downcast_ref::<io::Error>()
        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
}

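/// Converts a blame result into its protobuf representation; `None` maps to a
/// response with an empty `blame_response`.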
fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
    let Some(blame) = blame else {
        return proto::BlameBufferResponse {
            blame_response: None,
        };
    };

    let entries = blame
        .entries
        .into_iter()
        .map(|entry| proto::BlameEntry {
            sha: entry.sha.as_bytes().into(),
            start_line: entry.range.start,
            end_line: entry.range.end,
            original_line_number: entry.original_line_number,
            author: entry.author.clone(),
            author_mail: entry.author_mail.clone(),
            author_time: entry.author_time,
            author_tz: entry.author_tz.clone(),
            committer: entry.committer.clone(),
            committer_mail: entry.committer_mail.clone(),
            committer_time: entry.committer_time,
            committer_tz: entry.committer_tz.clone(),
            summary: entry.summary.clone(),
            previous: entry.previous.clone(),
            filename: entry.filename.clone(),
        })
        .collect::<Vec<_>>();

    let messages = blame
        .messages
        .into_iter()
        .map(|(oid, message)| proto::CommitMessage {
            oid: oid.as_bytes().into(),
            message,
        })
        .collect::<Vec<_>>();

    let permalinks = blame
        .permalinks
        .into_iter()
        .map(|(oid, url)| proto::CommitPermalink {
            oid: oid.as_bytes().into(),
            permalink: url.to_string(),
        })
        .collect::<Vec<_>>();

    proto::BlameBufferResponse {
        blame_response: Some(proto::blame_buffer_response::BlameResponse {
            entries,
            messages,
            permalinks,
            remote_url: blame.remote_url,
        }),
    }
}

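/// Inverse of `serialize_blame_buffer_response`. Entries, messages, or
/// permalinks whose OIDs (or URLs) fail to parse are silently dropped.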
fn deserialize_blame_buffer_response(
    response: proto::BlameBufferResponse,
) -> Option<git::blame::Blame> {
    let response = response.blame_response?;
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_mail: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    let permalinks = response
        .permalinks
        .into_iter()
        .filter_map(|permalink| {
            Some((
                git::Oid::from_bytes(&permalink.oid).ok()?,
                Url::from_str(&permalink.permalink).ok()?,
            ))
        })
        .collect::<HashMap<_, _>>();

    Some(Blame {
        entries,
        permalinks,
        messages,
        remote_url: response.remote_url,
    })
}