1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use client::Client;
10use collections::{hash_map, HashMap, HashSet};
11use diff::{BufferDiff, BufferDiffEvent, BufferDiffSnapshot};
12use fs::Fs;
13use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
14use git::{blame::Blame, repository::RepoPath};
15use gpui::{
16 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
17};
18use http_client::Url;
19use language::{
20 proto::{
21 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
22 split_operations,
23 },
24 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
25};
26use rpc::{
27 proto::{self, ToProto},
28 AnyProtoClient, ErrorExt as _, TypedEnvelope,
29};
30use serde::Deserialize;
31use smol::channel::Receiver;
32use std::{
33 io,
34 ops::Range,
35 path::{Path, PathBuf},
36 pin::pin,
37 str::FromStr as _,
38 sync::Arc,
39 time::Instant,
40};
41use text::BufferId;
42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
44
45#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
46enum DiffKind {
47 Unstaged,
48 Uncommitted,
49}
50
51/// A set of open buffers.
52pub struct BufferStore {
53 state: BufferStoreState,
54 #[allow(clippy::type_complexity)]
55 loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
56 #[allow(clippy::type_complexity)]
57 loading_diffs:
58 HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
59 worktree_store: Entity<WorktreeStore>,
60 opened_buffers: HashMap<BufferId, OpenBuffer>,
61 downstream_client: Option<(AnyProtoClient, u64)>,
62 shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
63}
64
65#[derive(Hash, Eq, PartialEq, Clone)]
66struct SharedBuffer {
67 buffer: Entity<Buffer>,
68 diff: Option<Entity<BufferDiff>>,
69 lsp_handle: Option<OpenLspBufferHandle>,
70}
71
72#[derive(Default)]
73struct BufferDiffState {
74 unstaged_diff: Option<WeakEntity<BufferDiff>>,
75 uncommitted_diff: Option<WeakEntity<BufferDiff>>,
76 recalculate_diff_task: Option<Task<Result<()>>>,
77 language: Option<Arc<Language>>,
78 language_registry: Option<Arc<LanguageRegistry>>,
79 diff_updated_futures: Vec<oneshot::Sender<()>>,
80
81 head_text: Option<Arc<String>>,
82 index_text: Option<Arc<String>>,
83 head_changed: bool,
84 index_changed: bool,
85 language_changed: bool,
86}
87
88#[derive(Clone, Debug)]
89enum DiffBasesChange {
90 SetIndex(Option<String>),
91 SetHead(Option<String>),
92 SetEach {
93 index: Option<String>,
94 head: Option<String>,
95 },
96 SetBoth(Option<String>),
97}
98
99impl BufferDiffState {
100 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
101 self.language = buffer.read(cx).language().cloned();
102 self.language_changed = true;
103 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
104 }
105
106 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
107 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
108 }
109
110 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
111 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
112 }
113
114 fn handle_base_texts_updated(
115 &mut self,
116 buffer: text::BufferSnapshot,
117 message: proto::UpdateDiffBases,
118 cx: &mut Context<Self>,
119 ) {
120 use proto::update_diff_bases::Mode;
121
122 let Some(mode) = Mode::from_i32(message.mode) else {
123 return;
124 };
125
126 let diff_bases_change = match mode {
127 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
128 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
129 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
130 Mode::IndexAndHead => DiffBasesChange::SetEach {
131 index: message.staged_text,
132 head: message.committed_text,
133 },
134 };
135
136 let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
137 }
138
139 fn diff_bases_changed(
140 &mut self,
141 buffer: text::BufferSnapshot,
142 diff_bases_change: DiffBasesChange,
143 cx: &mut Context<Self>,
144 ) -> oneshot::Receiver<()> {
145 match diff_bases_change {
146 DiffBasesChange::SetIndex(index) => {
147 self.index_text = index.map(|mut index| {
148 text::LineEnding::normalize(&mut index);
149 Arc::new(index)
150 });
151 self.index_changed = true;
152 }
153 DiffBasesChange::SetHead(head) => {
154 self.head_text = head.map(|mut head| {
155 text::LineEnding::normalize(&mut head);
156 Arc::new(head)
157 });
158 self.head_changed = true;
159 }
160 DiffBasesChange::SetBoth(text) => {
161 let text = text.map(|mut text| {
162 text::LineEnding::normalize(&mut text);
163 Arc::new(text)
164 });
165 self.head_text = text.clone();
166 self.index_text = text;
167 self.head_changed = true;
168 self.index_changed = true;
169 }
170 DiffBasesChange::SetEach { index, head } => {
171 self.index_text = index.map(|mut index| {
172 text::LineEnding::normalize(&mut index);
173 Arc::new(index)
174 });
175 self.index_changed = true;
176 self.head_text = head.map(|mut head| {
177 text::LineEnding::normalize(&mut head);
178 Arc::new(head)
179 });
180 self.head_changed = true;
181 }
182 }
183
184 self.recalculate_diffs(buffer, cx)
185 }
186
187 fn recalculate_diffs(
188 &mut self,
189 buffer: text::BufferSnapshot,
190 cx: &mut Context<Self>,
191 ) -> oneshot::Receiver<()> {
192 let (tx, rx) = oneshot::channel();
193 self.diff_updated_futures.push(tx);
194
195 let language = self.language.clone();
196 let language_registry = self.language_registry.clone();
197 let unstaged_diff = self.unstaged_diff();
198 let uncommitted_diff = self.uncommitted_diff();
199 let head = self.head_text.clone();
200 let index = self.index_text.clone();
201 let index_changed = self.index_changed;
202 let head_changed = self.head_changed;
203 let language_changed = self.language_changed;
204 let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
205 (Some(index), Some(head)) => Arc::ptr_eq(index, head),
206 (None, None) => true,
207 _ => false,
208 };
209 self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
210 if let Some(unstaged_diff) = &unstaged_diff {
211 let snapshot = if index_changed || language_changed {
212 cx.update(|cx| {
213 BufferDiffSnapshot::build(
214 buffer.clone(),
215 index,
216 language.clone(),
217 language_registry.clone(),
218 cx,
219 )
220 })?
221 .await
222 } else {
223 unstaged_diff
224 .read_with(&cx, |changes, cx| {
225 BufferDiffSnapshot::build_with_base_buffer(
226 buffer.clone(),
227 index,
228 changes.snapshot.base_text.clone(),
229 cx,
230 )
231 })?
232 .await
233 };
234
235 unstaged_diff.update(&mut cx, |unstaged_diff, cx| {
236 unstaged_diff.set_state(snapshot, &buffer, cx);
237 if language_changed {
238 cx.emit(BufferDiffEvent::LanguageChanged);
239 }
240 })?;
241 }
242
243 if let Some(uncommitted_diff) = &uncommitted_diff {
244 let snapshot =
245 if let (Some(unstaged_diff), true) = (&unstaged_diff, index_matches_head) {
246 unstaged_diff.read_with(&cx, |diff, _| diff.snapshot.clone())?
247 } else if head_changed || language_changed {
248 cx.update(|cx| {
249 BufferDiffSnapshot::build(
250 buffer.clone(),
251 head,
252 language.clone(),
253 language_registry.clone(),
254 cx,
255 )
256 })?
257 .await
258 } else {
259 uncommitted_diff
260 .read_with(&cx, |changes, cx| {
261 BufferDiffSnapshot::build_with_base_buffer(
262 buffer.clone(),
263 head,
264 changes.snapshot.base_text.clone(),
265 cx,
266 )
267 })?
268 .await
269 };
270
271 uncommitted_diff.update(&mut cx, |diff, cx| {
272 diff.set_state(snapshot, &buffer, cx);
273 if language_changed {
274 cx.emit(BufferDiffEvent::LanguageChanged);
275 }
276 })?;
277 }
278
279 if let Some(this) = this.upgrade() {
280 this.update(&mut cx, |this, _| {
281 this.index_changed = false;
282 this.head_changed = false;
283 for tx in this.diff_updated_futures.drain(..) {
284 tx.send(()).ok();
285 }
286 })?;
287 }
288
289 Ok(())
290 }));
291
292 rx
293 }
294}
295
296enum BufferStoreState {
297 Local(LocalBufferStore),
298 Remote(RemoteBufferStore),
299}
300
301struct RemoteBufferStore {
302 shared_with_me: HashSet<Entity<Buffer>>,
303 upstream_client: AnyProtoClient,
304 project_id: u64,
305 loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
306 remote_buffer_listeners:
307 HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
308 worktree_store: Entity<WorktreeStore>,
309}
310
311struct LocalBufferStore {
312 local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
313 local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
314 worktree_store: Entity<WorktreeStore>,
315 _subscription: Subscription,
316}
317
318enum OpenBuffer {
319 Complete {
320 buffer: WeakEntity<Buffer>,
321 diff_state: Entity<BufferDiffState>,
322 },
323 Operations(Vec<Operation>),
324}
325
326pub enum BufferStoreEvent {
327 BufferAdded(Entity<Buffer>),
328 BufferDropped(BufferId),
329 BufferChangedFilePath {
330 buffer: Entity<Buffer>,
331 old_file: Option<Arc<dyn language::File>>,
332 },
333}
334
335#[derive(Default, Debug)]
336pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
337
338impl EventEmitter<BufferStoreEvent> for BufferStore {}
339
340impl RemoteBufferStore {
341 fn open_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
342 let project_id = self.project_id;
343 let client = self.upstream_client.clone();
344 cx.background_executor().spawn(async move {
345 let response = client
346 .request(proto::OpenUnstagedDiff {
347 project_id,
348 buffer_id: buffer_id.to_proto(),
349 })
350 .await?;
351 Ok(response.staged_text)
352 })
353 }
354
355 fn open_uncommitted_diff(
356 &self,
357 buffer_id: BufferId,
358 cx: &App,
359 ) -> Task<Result<DiffBasesChange>> {
360 use proto::open_uncommitted_diff_response::Mode;
361
362 let project_id = self.project_id;
363 let client = self.upstream_client.clone();
364 cx.background_executor().spawn(async move {
365 let response = client
366 .request(proto::OpenUncommittedDiff {
367 project_id,
368 buffer_id: buffer_id.to_proto(),
369 })
370 .await?;
371 let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
372 let bases = match mode {
373 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
374 Mode::IndexAndHead => DiffBasesChange::SetEach {
375 head: response.committed_text,
376 index: response.staged_text,
377 },
378 };
379 Ok(bases)
380 })
381 }
382
383 pub fn wait_for_remote_buffer(
384 &mut self,
385 id: BufferId,
386 cx: &mut Context<BufferStore>,
387 ) -> Task<Result<Entity<Buffer>>> {
388 let (tx, rx) = oneshot::channel();
389 self.remote_buffer_listeners.entry(id).or_default().push(tx);
390
391 cx.spawn(|this, cx| async move {
392 if let Some(buffer) = this
393 .read_with(&cx, |buffer_store, _| buffer_store.get(id))
394 .ok()
395 .flatten()
396 {
397 return Ok(buffer);
398 }
399
400 cx.background_executor()
401 .spawn(async move { rx.await? })
402 .await
403 })
404 }
405
406 fn save_remote_buffer(
407 &self,
408 buffer_handle: Entity<Buffer>,
409 new_path: Option<proto::ProjectPath>,
410 cx: &Context<BufferStore>,
411 ) -> Task<Result<()>> {
412 let buffer = buffer_handle.read(cx);
413 let buffer_id = buffer.remote_id().into();
414 let version = buffer.version();
415 let rpc = self.upstream_client.clone();
416 let project_id = self.project_id;
417 cx.spawn(move |_, mut cx| async move {
418 let response = rpc
419 .request(proto::SaveBuffer {
420 project_id,
421 buffer_id,
422 new_path,
423 version: serialize_version(&version),
424 })
425 .await?;
426 let version = deserialize_version(&response.version);
427 let mtime = response.mtime.map(|mtime| mtime.into());
428
429 buffer_handle.update(&mut cx, |buffer, cx| {
430 buffer.did_save(version.clone(), mtime, cx);
431 })?;
432
433 Ok(())
434 })
435 }
436
437 pub fn handle_create_buffer_for_peer(
438 &mut self,
439 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
440 replica_id: u16,
441 capability: Capability,
442 cx: &mut Context<BufferStore>,
443 ) -> Result<Option<Entity<Buffer>>> {
444 match envelope
445 .payload
446 .variant
447 .ok_or_else(|| anyhow!("missing variant"))?
448 {
449 proto::create_buffer_for_peer::Variant::State(mut state) => {
450 let buffer_id = BufferId::new(state.id)?;
451
452 let buffer_result = maybe!({
453 let mut buffer_file = None;
454 if let Some(file) = state.file.take() {
455 let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
456 let worktree = self
457 .worktree_store
458 .read(cx)
459 .worktree_for_id(worktree_id, cx)
460 .ok_or_else(|| {
461 anyhow!("no worktree found for id {}", file.worktree_id)
462 })?;
463 buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
464 as Arc<dyn language::File>);
465 }
466 Buffer::from_proto(replica_id, capability, state, buffer_file)
467 });
468
469 match buffer_result {
470 Ok(buffer) => {
471 let buffer = cx.new(|_| buffer);
472 self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
473 }
474 Err(error) => {
475 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
476 for listener in listeners {
477 listener.send(Err(anyhow!(error.cloned()))).ok();
478 }
479 }
480 }
481 }
482 }
483 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
484 let buffer_id = BufferId::new(chunk.buffer_id)?;
485 let buffer = self
486 .loading_remote_buffers_by_id
487 .get(&buffer_id)
488 .cloned()
489 .ok_or_else(|| {
490 anyhow!(
491 "received chunk for buffer {} without initial state",
492 chunk.buffer_id
493 )
494 })?;
495
496 let result = maybe!({
497 let operations = chunk
498 .operations
499 .into_iter()
500 .map(language::proto::deserialize_operation)
501 .collect::<Result<Vec<_>>>()?;
502 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
503 anyhow::Ok(())
504 });
505
506 if let Err(error) = result {
507 self.loading_remote_buffers_by_id.remove(&buffer_id);
508 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
509 for listener in listeners {
510 listener.send(Err(error.cloned())).ok();
511 }
512 }
513 } else if chunk.is_last {
514 self.loading_remote_buffers_by_id.remove(&buffer_id);
515 if self.upstream_client.is_via_collab() {
516 // retain buffers sent by peers to avoid races.
517 self.shared_with_me.insert(buffer.clone());
518 }
519
520 if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
521 for sender in senders {
522 sender.send(Ok(buffer.clone())).ok();
523 }
524 }
525 return Ok(Some(buffer));
526 }
527 }
528 }
529 return Ok(None);
530 }
531
532 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
533 self.loading_remote_buffers_by_id
534 .keys()
535 .copied()
536 .collect::<Vec<_>>()
537 }
538
539 pub fn deserialize_project_transaction(
540 &self,
541 message: proto::ProjectTransaction,
542 push_to_history: bool,
543 cx: &mut Context<BufferStore>,
544 ) -> Task<Result<ProjectTransaction>> {
545 cx.spawn(|this, mut cx| async move {
546 let mut project_transaction = ProjectTransaction::default();
547 for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
548 {
549 let buffer_id = BufferId::new(buffer_id)?;
550 let buffer = this
551 .update(&mut cx, |this, cx| {
552 this.wait_for_remote_buffer(buffer_id, cx)
553 })?
554 .await?;
555 let transaction = language::proto::deserialize_transaction(transaction)?;
556 project_transaction.0.insert(buffer, transaction);
557 }
558
559 for (buffer, transaction) in &project_transaction.0 {
560 buffer
561 .update(&mut cx, |buffer, _| {
562 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
563 })?
564 .await?;
565
566 if push_to_history {
567 buffer.update(&mut cx, |buffer, _| {
568 buffer.push_transaction(transaction.clone(), Instant::now());
569 })?;
570 }
571 }
572
573 Ok(project_transaction)
574 })
575 }
576
577 fn open_buffer(
578 &self,
579 path: Arc<Path>,
580 worktree: Entity<Worktree>,
581 cx: &mut Context<BufferStore>,
582 ) -> Task<Result<Entity<Buffer>>> {
583 let worktree_id = worktree.read(cx).id().to_proto();
584 let project_id = self.project_id;
585 let client = self.upstream_client.clone();
586 cx.spawn(move |this, mut cx| async move {
587 let response = client
588 .request(proto::OpenBufferByPath {
589 project_id,
590 worktree_id,
591 path: path.to_proto(),
592 })
593 .await?;
594 let buffer_id = BufferId::new(response.buffer_id)?;
595
596 let buffer = this
597 .update(&mut cx, {
598 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
599 })?
600 .await?;
601
602 Ok(buffer)
603 })
604 }
605
606 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
607 let create = self.upstream_client.request(proto::OpenNewBuffer {
608 project_id: self.project_id,
609 });
610 cx.spawn(|this, mut cx| async move {
611 let response = create.await?;
612 let buffer_id = BufferId::new(response.buffer_id)?;
613
614 this.update(&mut cx, |this, cx| {
615 this.wait_for_remote_buffer(buffer_id, cx)
616 })?
617 .await
618 })
619 }
620
621 fn reload_buffers(
622 &self,
623 buffers: HashSet<Entity<Buffer>>,
624 push_to_history: bool,
625 cx: &mut Context<BufferStore>,
626 ) -> Task<Result<ProjectTransaction>> {
627 let request = self.upstream_client.request(proto::ReloadBuffers {
628 project_id: self.project_id,
629 buffer_ids: buffers
630 .iter()
631 .map(|buffer| buffer.read(cx).remote_id().to_proto())
632 .collect(),
633 });
634
635 cx.spawn(|this, mut cx| async move {
636 let response = request
637 .await?
638 .transaction
639 .ok_or_else(|| anyhow!("missing transaction"))?;
640 this.update(&mut cx, |this, cx| {
641 this.deserialize_project_transaction(response, push_to_history, cx)
642 })?
643 .await
644 })
645 }
646}
647
648impl LocalBufferStore {
649 fn worktree_for_buffer(
650 &self,
651 buffer: &Entity<Buffer>,
652 cx: &App,
653 ) -> Option<(Entity<Worktree>, Arc<Path>)> {
654 let file = buffer.read(cx).file()?;
655 let worktree_id = file.worktree_id(cx);
656 let path = file.path().clone();
657 let worktree = self
658 .worktree_store
659 .read(cx)
660 .worktree_for_id(worktree_id, cx)?;
661 Some((worktree, path))
662 }
663
664 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
665 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
666 worktree.read(cx).load_staged_file(path.as_ref(), cx)
667 } else {
668 return Task::ready(Err(anyhow!("no such worktree")));
669 }
670 }
671
672 fn load_committed_text(
673 &self,
674 buffer: &Entity<Buffer>,
675 cx: &App,
676 ) -> Task<Result<Option<String>>> {
677 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
678 worktree.read(cx).load_committed_file(path.as_ref(), cx)
679 } else {
680 Task::ready(Err(anyhow!("no such worktree")))
681 }
682 }
683
684 fn save_local_buffer(
685 &self,
686 buffer_handle: Entity<Buffer>,
687 worktree: Entity<Worktree>,
688 path: Arc<Path>,
689 mut has_changed_file: bool,
690 cx: &mut Context<BufferStore>,
691 ) -> Task<Result<()>> {
692 let buffer = buffer_handle.read(cx);
693
694 let text = buffer.as_rope().clone();
695 let line_ending = buffer.line_ending();
696 let version = buffer.version();
697 let buffer_id = buffer.remote_id();
698 if buffer
699 .file()
700 .is_some_and(|file| file.disk_state() == DiskState::New)
701 {
702 has_changed_file = true;
703 }
704
705 let save = worktree.update(cx, |worktree, cx| {
706 worktree.write_file(path.as_ref(), text, line_ending, cx)
707 });
708
709 cx.spawn(move |this, mut cx| async move {
710 let new_file = save.await?;
711 let mtime = new_file.disk_state().mtime();
712 this.update(&mut cx, |this, cx| {
713 if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
714 if has_changed_file {
715 downstream_client
716 .send(proto::UpdateBufferFile {
717 project_id,
718 buffer_id: buffer_id.to_proto(),
719 file: Some(language::File::to_proto(&*new_file, cx)),
720 })
721 .log_err();
722 }
723 downstream_client
724 .send(proto::BufferSaved {
725 project_id,
726 buffer_id: buffer_id.to_proto(),
727 version: serialize_version(&version),
728 mtime: mtime.map(|time| time.into()),
729 })
730 .log_err();
731 }
732 })?;
733 buffer_handle.update(&mut cx, |buffer, cx| {
734 if has_changed_file {
735 buffer.file_updated(new_file, cx);
736 }
737 buffer.did_save(version.clone(), mtime, cx);
738 })
739 })
740 }
741
742 fn subscribe_to_worktree(
743 &mut self,
744 worktree: &Entity<Worktree>,
745 cx: &mut Context<BufferStore>,
746 ) {
747 cx.subscribe(worktree, |this, worktree, event, cx| {
748 if worktree.read(cx).is_local() {
749 match event {
750 worktree::Event::UpdatedEntries(changes) => {
751 Self::local_worktree_entries_changed(this, &worktree, changes, cx);
752 }
753 worktree::Event::UpdatedGitRepositories(updated_repos) => {
754 Self::local_worktree_git_repos_changed(
755 this,
756 worktree.clone(),
757 updated_repos,
758 cx,
759 )
760 }
761 _ => {}
762 }
763 }
764 })
765 .detach();
766 }
767
768 fn local_worktree_entries_changed(
769 this: &mut BufferStore,
770 worktree_handle: &Entity<Worktree>,
771 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
772 cx: &mut Context<BufferStore>,
773 ) {
774 let snapshot = worktree_handle.read(cx).snapshot();
775 for (path, entry_id, _) in changes {
776 Self::local_worktree_entry_changed(
777 this,
778 *entry_id,
779 path,
780 worktree_handle,
781 &snapshot,
782 cx,
783 );
784 }
785 }
786
787 fn local_worktree_git_repos_changed(
788 this: &mut BufferStore,
789 worktree_handle: Entity<Worktree>,
790 changed_repos: &UpdatedGitRepositoriesSet,
791 cx: &mut Context<BufferStore>,
792 ) {
793 debug_assert!(worktree_handle.read(cx).is_local());
794
795 let mut diff_state_updates = Vec::new();
796 for buffer in this.opened_buffers.values() {
797 let OpenBuffer::Complete { buffer, diff_state } = buffer else {
798 continue;
799 };
800 let Some(buffer) = buffer.upgrade() else {
801 continue;
802 };
803 let buffer = buffer.read(cx);
804 let Some(file) = File::from_dyn(buffer.file()) else {
805 continue;
806 };
807 if file.worktree != worktree_handle {
808 continue;
809 }
810 let diff_state = diff_state.read(cx);
811 if changed_repos
812 .iter()
813 .any(|(work_dir, _)| file.path.starts_with(work_dir))
814 {
815 let snapshot = buffer.text_snapshot();
816 diff_state_updates.push((
817 snapshot.clone(),
818 file.path.clone(),
819 diff_state
820 .unstaged_diff
821 .as_ref()
822 .and_then(|set| set.upgrade())
823 .is_some(),
824 diff_state
825 .uncommitted_diff
826 .as_ref()
827 .and_then(|set| set.upgrade())
828 .is_some(),
829 ))
830 }
831 }
832
833 if diff_state_updates.is_empty() {
834 return;
835 }
836
837 cx.spawn(move |this, mut cx| async move {
838 let snapshot =
839 worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
840 let diff_bases_changes_by_buffer = cx
841 .background_executor()
842 .spawn(async move {
843 diff_state_updates
844 .into_iter()
845 .filter_map(
846 |(buffer_snapshot, path, needs_staged_text, needs_committed_text)| {
847 let local_repo = snapshot.local_repo_for_path(&path)?;
848 let relative_path = local_repo.relativize(&path).ok()?;
849 let staged_text = if needs_staged_text {
850 local_repo.repo().load_index_text(&relative_path)
851 } else {
852 None
853 };
854 let committed_text = if needs_committed_text {
855 local_repo.repo().load_committed_text(&relative_path)
856 } else {
857 None
858 };
859 let diff_bases_change =
860 match (needs_staged_text, needs_committed_text) {
861 (true, true) => Some(if staged_text == committed_text {
862 DiffBasesChange::SetBoth(committed_text)
863 } else {
864 DiffBasesChange::SetEach {
865 index: staged_text,
866 head: committed_text,
867 }
868 }),
869 (true, false) => {
870 Some(DiffBasesChange::SetIndex(staged_text))
871 }
872 (false, true) => {
873 Some(DiffBasesChange::SetHead(committed_text))
874 }
875 (false, false) => None,
876 };
877 Some((buffer_snapshot, diff_bases_change))
878 },
879 )
880 .collect::<Vec<_>>()
881 })
882 .await;
883
884 this.update(&mut cx, |this, cx| {
885 for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
886 let Some(OpenBuffer::Complete { diff_state, .. }) =
887 this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
888 else {
889 continue;
890 };
891 let Some(diff_bases_change) = diff_bases_change else {
892 continue;
893 };
894
895 diff_state.update(cx, |diff_state, cx| {
896 use proto::update_diff_bases::Mode;
897
898 if let Some((client, project_id)) = this.downstream_client.as_ref() {
899 let buffer_id = buffer_snapshot.remote_id().to_proto();
900 let (staged_text, committed_text, mode) = match diff_bases_change
901 .clone()
902 {
903 DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
904 DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
905 DiffBasesChange::SetEach { index, head } => {
906 (index, head, Mode::IndexAndHead)
907 }
908 DiffBasesChange::SetBoth(text) => {
909 (None, text, Mode::IndexMatchesHead)
910 }
911 };
912 let message = proto::UpdateDiffBases {
913 project_id: *project_id,
914 buffer_id,
915 staged_text,
916 committed_text,
917 mode: mode as i32,
918 };
919
920 client.send(message).log_err();
921 }
922
923 let _ =
924 diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
925 });
926 }
927 })
928 })
929 .detach_and_log_err(cx);
930 }
931
932 fn local_worktree_entry_changed(
933 this: &mut BufferStore,
934 entry_id: ProjectEntryId,
935 path: &Arc<Path>,
936 worktree: &Entity<worktree::Worktree>,
937 snapshot: &worktree::Snapshot,
938 cx: &mut Context<BufferStore>,
939 ) -> Option<()> {
940 let project_path = ProjectPath {
941 worktree_id: snapshot.id(),
942 path: path.clone(),
943 };
944
945 let buffer_id = {
946 let local = this.as_local_mut()?;
947 match local.local_buffer_ids_by_entry_id.get(&entry_id) {
948 Some(&buffer_id) => buffer_id,
949 None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
950 }
951 };
952
953 let buffer = if let Some(buffer) = this.get(buffer_id) {
954 Some(buffer)
955 } else {
956 this.opened_buffers.remove(&buffer_id);
957 None
958 };
959
960 let buffer = if let Some(buffer) = buffer {
961 buffer
962 } else {
963 let this = this.as_local_mut()?;
964 this.local_buffer_ids_by_path.remove(&project_path);
965 this.local_buffer_ids_by_entry_id.remove(&entry_id);
966 return None;
967 };
968
969 let events = buffer.update(cx, |buffer, cx| {
970 let local = this.as_local_mut()?;
971 let file = buffer.file()?;
972 let old_file = File::from_dyn(Some(file))?;
973 if old_file.worktree != *worktree {
974 return None;
975 }
976
977 let snapshot_entry = old_file
978 .entry_id
979 .and_then(|entry_id| snapshot.entry_for_id(entry_id))
980 .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));
981
982 let new_file = if let Some(entry) = snapshot_entry {
983 File {
984 disk_state: match entry.mtime {
985 Some(mtime) => DiskState::Present { mtime },
986 None => old_file.disk_state,
987 },
988 is_local: true,
989 entry_id: Some(entry.id),
990 path: entry.path.clone(),
991 worktree: worktree.clone(),
992 is_private: entry.is_private,
993 }
994 } else {
995 File {
996 disk_state: DiskState::Deleted,
997 is_local: true,
998 entry_id: old_file.entry_id,
999 path: old_file.path.clone(),
1000 worktree: worktree.clone(),
1001 is_private: old_file.is_private,
1002 }
1003 };
1004
1005 if new_file == *old_file {
1006 return None;
1007 }
1008
1009 let mut events = Vec::new();
1010 if new_file.path != old_file.path {
1011 local.local_buffer_ids_by_path.remove(&ProjectPath {
1012 path: old_file.path.clone(),
1013 worktree_id: old_file.worktree_id(cx),
1014 });
1015 local.local_buffer_ids_by_path.insert(
1016 ProjectPath {
1017 worktree_id: new_file.worktree_id(cx),
1018 path: new_file.path.clone(),
1019 },
1020 buffer_id,
1021 );
1022 events.push(BufferStoreEvent::BufferChangedFilePath {
1023 buffer: cx.entity(),
1024 old_file: buffer.file().cloned(),
1025 });
1026 }
1027
1028 if new_file.entry_id != old_file.entry_id {
1029 if let Some(entry_id) = old_file.entry_id {
1030 local.local_buffer_ids_by_entry_id.remove(&entry_id);
1031 }
1032 if let Some(entry_id) = new_file.entry_id {
1033 local
1034 .local_buffer_ids_by_entry_id
1035 .insert(entry_id, buffer_id);
1036 }
1037 }
1038
1039 if let Some((client, project_id)) = &this.downstream_client {
1040 client
1041 .send(proto::UpdateBufferFile {
1042 project_id: *project_id,
1043 buffer_id: buffer_id.to_proto(),
1044 file: Some(new_file.to_proto(cx)),
1045 })
1046 .ok();
1047 }
1048
1049 buffer.file_updated(Arc::new(new_file), cx);
1050 Some(events)
1051 })?;
1052
1053 for event in events {
1054 cx.emit(event);
1055 }
1056
1057 None
1058 }
1059
1060 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1061 let file = File::from_dyn(buffer.read(cx).file())?;
1062
1063 let remote_id = buffer.read(cx).remote_id();
1064 if let Some(entry_id) = file.entry_id {
1065 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1066 Some(_) => {
1067 return None;
1068 }
1069 None => {
1070 self.local_buffer_ids_by_entry_id
1071 .insert(entry_id, remote_id);
1072 }
1073 }
1074 };
1075 self.local_buffer_ids_by_path.insert(
1076 ProjectPath {
1077 worktree_id: file.worktree_id(cx),
1078 path: file.path.clone(),
1079 },
1080 remote_id,
1081 );
1082
1083 Some(())
1084 }
1085
1086 fn save_buffer(
1087 &self,
1088 buffer: Entity<Buffer>,
1089 cx: &mut Context<BufferStore>,
1090 ) -> Task<Result<()>> {
1091 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1092 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1093 };
1094 let worktree = file.worktree.clone();
1095 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1096 }
1097
1098 fn save_buffer_as(
1099 &self,
1100 buffer: Entity<Buffer>,
1101 path: ProjectPath,
1102 cx: &mut Context<BufferStore>,
1103 ) -> Task<Result<()>> {
1104 let Some(worktree) = self
1105 .worktree_store
1106 .read(cx)
1107 .worktree_for_id(path.worktree_id, cx)
1108 else {
1109 return Task::ready(Err(anyhow!("no such worktree")));
1110 };
1111 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1112 }
1113
1114 fn open_buffer(
1115 &self,
1116 path: Arc<Path>,
1117 worktree: Entity<Worktree>,
1118 cx: &mut Context<BufferStore>,
1119 ) -> Task<Result<Entity<Buffer>>> {
1120 let load_buffer = worktree.update(cx, |worktree, cx| {
1121 let load_file = worktree.load_file(path.as_ref(), cx);
1122 let reservation = cx.reserve_entity();
1123 let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
1124 cx.spawn(move |_, mut cx| async move {
1125 let loaded = load_file.await?;
1126 let text_buffer = cx
1127 .background_executor()
1128 .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
1129 .await;
1130 cx.insert_entity(reservation, |_| {
1131 Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
1132 })
1133 })
1134 });
1135
1136 cx.spawn(move |this, mut cx| async move {
1137 let buffer = match load_buffer.await {
1138 Ok(buffer) => Ok(buffer),
1139 Err(error) if is_not_found_error(&error) => cx.new(|cx| {
1140 let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
1141 let text_buffer = text::Buffer::new(0, buffer_id, "".into());
1142 Buffer::build(
1143 text_buffer,
1144 Some(Arc::new(File {
1145 worktree,
1146 path,
1147 disk_state: DiskState::New,
1148 entry_id: None,
1149 is_local: true,
1150 is_private: false,
1151 })),
1152 Capability::ReadWrite,
1153 )
1154 }),
1155 Err(e) => Err(e),
1156 }?;
1157 this.update(&mut cx, |this, cx| {
1158 this.add_buffer(buffer.clone(), cx)?;
1159 let buffer_id = buffer.read(cx).remote_id();
1160 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
1161 let this = this.as_local_mut().unwrap();
1162 this.local_buffer_ids_by_path.insert(
1163 ProjectPath {
1164 worktree_id: file.worktree_id(cx),
1165 path: file.path.clone(),
1166 },
1167 buffer_id,
1168 );
1169
1170 if let Some(entry_id) = file.entry_id {
1171 this.local_buffer_ids_by_entry_id
1172 .insert(entry_id, buffer_id);
1173 }
1174 }
1175
1176 anyhow::Ok(())
1177 })??;
1178
1179 Ok(buffer)
1180 })
1181 }
1182
1183 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1184 cx.spawn(|buffer_store, mut cx| async move {
1185 let buffer =
1186 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1187 buffer_store.update(&mut cx, |buffer_store, cx| {
1188 buffer_store.add_buffer(buffer.clone(), cx).log_err();
1189 })?;
1190 Ok(buffer)
1191 })
1192 }
1193
1194 fn reload_buffers(
1195 &self,
1196 buffers: HashSet<Entity<Buffer>>,
1197 push_to_history: bool,
1198 cx: &mut Context<BufferStore>,
1199 ) -> Task<Result<ProjectTransaction>> {
1200 cx.spawn(move |_, mut cx| async move {
1201 let mut project_transaction = ProjectTransaction::default();
1202 for buffer in buffers {
1203 let transaction = buffer
1204 .update(&mut cx, |buffer, cx| buffer.reload(cx))?
1205 .await?;
1206 buffer.update(&mut cx, |buffer, cx| {
1207 if let Some(transaction) = transaction {
1208 if !push_to_history {
1209 buffer.forget_transaction(transaction.id);
1210 }
1211 project_transaction.0.insert(cx.entity(), transaction);
1212 }
1213 })?;
1214 }
1215
1216 Ok(project_transaction)
1217 })
1218 }
1219}
1220
1221impl BufferStore {
1222 pub fn init(client: &AnyProtoClient) {
1223 client.add_entity_message_handler(Self::handle_buffer_reloaded);
1224 client.add_entity_message_handler(Self::handle_buffer_saved);
1225 client.add_entity_message_handler(Self::handle_update_buffer_file);
1226 client.add_entity_request_handler(Self::handle_save_buffer);
1227 client.add_entity_request_handler(Self::handle_blame_buffer);
1228 client.add_entity_request_handler(Self::handle_reload_buffers);
1229 client.add_entity_request_handler(Self::handle_get_permalink_to_line);
1230 client.add_entity_request_handler(Self::handle_open_unstaged_diff);
1231 client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
1232 client.add_entity_message_handler(Self::handle_update_diff_bases);
1233 }
1234
1235 /// Creates a buffer store, optionally retaining its buffers.
1236 pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
1237 Self {
1238 state: BufferStoreState::Local(LocalBufferStore {
1239 local_buffer_ids_by_path: Default::default(),
1240 local_buffer_ids_by_entry_id: Default::default(),
1241 worktree_store: worktree_store.clone(),
1242 _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
1243 if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
1244 let this = this.as_local_mut().unwrap();
1245 this.subscribe_to_worktree(worktree, cx);
1246 }
1247 }),
1248 }),
1249 downstream_client: None,
1250 opened_buffers: Default::default(),
1251 shared_buffers: Default::default(),
1252 loading_buffers: Default::default(),
1253 loading_diffs: Default::default(),
1254 worktree_store,
1255 }
1256 }
1257
1258 pub fn remote(
1259 worktree_store: Entity<WorktreeStore>,
1260 upstream_client: AnyProtoClient,
1261 remote_id: u64,
1262 _cx: &mut Context<Self>,
1263 ) -> Self {
1264 Self {
1265 state: BufferStoreState::Remote(RemoteBufferStore {
1266 shared_with_me: Default::default(),
1267 loading_remote_buffers_by_id: Default::default(),
1268 remote_buffer_listeners: Default::default(),
1269 project_id: remote_id,
1270 upstream_client,
1271 worktree_store: worktree_store.clone(),
1272 }),
1273 downstream_client: None,
1274 opened_buffers: Default::default(),
1275 loading_buffers: Default::default(),
1276 loading_diffs: Default::default(),
1277 shared_buffers: Default::default(),
1278 worktree_store,
1279 }
1280 }
1281
1282 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1283 match &mut self.state {
1284 BufferStoreState::Local(state) => Some(state),
1285 _ => None,
1286 }
1287 }
1288
1289 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1290 match &mut self.state {
1291 BufferStoreState::Remote(state) => Some(state),
1292 _ => None,
1293 }
1294 }
1295
1296 fn as_remote(&self) -> Option<&RemoteBufferStore> {
1297 match &self.state {
1298 BufferStoreState::Remote(state) => Some(state),
1299 _ => None,
1300 }
1301 }
1302
1303 pub fn open_buffer(
1304 &mut self,
1305 project_path: ProjectPath,
1306 cx: &mut Context<Self>,
1307 ) -> Task<Result<Entity<Buffer>>> {
1308 if let Some(buffer) = self.get_by_path(&project_path, cx) {
1309 return Task::ready(Ok(buffer));
1310 }
1311
1312 let task = match self.loading_buffers.entry(project_path.clone()) {
1313 hash_map::Entry::Occupied(e) => e.get().clone(),
1314 hash_map::Entry::Vacant(entry) => {
1315 let path = project_path.path.clone();
1316 let Some(worktree) = self
1317 .worktree_store
1318 .read(cx)
1319 .worktree_for_id(project_path.worktree_id, cx)
1320 else {
1321 return Task::ready(Err(anyhow!("no such worktree")));
1322 };
1323 let load_buffer = match &self.state {
1324 BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
1325 BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
1326 };
1327
1328 entry
1329 .insert(
1330 cx.spawn(move |this, mut cx| async move {
1331 let load_result = load_buffer.await;
1332 this.update(&mut cx, |this, _cx| {
1333 // Record the fact that the buffer is no longer loading.
1334 this.loading_buffers.remove(&project_path);
1335 })
1336 .ok();
1337 load_result.map_err(Arc::new)
1338 })
1339 .shared(),
1340 )
1341 .clone()
1342 }
1343 };
1344
1345 cx.background_executor()
1346 .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
1347 }
1348
1349 pub fn open_unstaged_diff(
1350 &mut self,
1351 buffer: Entity<Buffer>,
1352 cx: &mut Context<Self>,
1353 ) -> Task<Result<Entity<BufferDiff>>> {
1354 let buffer_id = buffer.read(cx).remote_id();
1355 if let Some(diff) = self.get_unstaged_diff(buffer_id, cx) {
1356 return Task::ready(Ok(diff));
1357 }
1358
1359 let task = match self.loading_diffs.entry((buffer_id, DiffKind::Unstaged)) {
1360 hash_map::Entry::Occupied(e) => e.get().clone(),
1361 hash_map::Entry::Vacant(entry) => {
1362 let staged_text = match &self.state {
1363 BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
1364 BufferStoreState::Remote(this) => this.open_unstaged_diff(buffer_id, cx),
1365 };
1366
1367 entry
1368 .insert(
1369 cx.spawn(move |this, cx| async move {
1370 Self::open_diff_internal(
1371 this,
1372 DiffKind::Unstaged,
1373 staged_text.await.map(DiffBasesChange::SetIndex),
1374 buffer,
1375 cx,
1376 )
1377 .await
1378 .map_err(Arc::new)
1379 })
1380 .shared(),
1381 )
1382 .clone()
1383 }
1384 };
1385
1386 cx.background_executor()
1387 .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
1388 }
1389
1390 pub fn open_uncommitted_diff(
1391 &mut self,
1392 buffer: Entity<Buffer>,
1393 cx: &mut Context<Self>,
1394 ) -> Task<Result<Entity<BufferDiff>>> {
1395 let buffer_id = buffer.read(cx).remote_id();
1396 if let Some(diff) = self.get_uncommitted_diff(buffer_id, cx) {
1397 return Task::ready(Ok(diff));
1398 }
1399
1400 let task = match self.loading_diffs.entry((buffer_id, DiffKind::Uncommitted)) {
1401 hash_map::Entry::Occupied(e) => e.get().clone(),
1402 hash_map::Entry::Vacant(entry) => {
1403 let changes = match &self.state {
1404 BufferStoreState::Local(this) => {
1405 let committed_text = this.load_committed_text(&buffer, cx);
1406 let staged_text = this.load_staged_text(&buffer, cx);
1407 cx.background_executor().spawn(async move {
1408 let committed_text = committed_text.await?;
1409 let staged_text = staged_text.await?;
1410 let diff_bases_change = if committed_text == staged_text {
1411 DiffBasesChange::SetBoth(committed_text)
1412 } else {
1413 DiffBasesChange::SetEach {
1414 index: staged_text,
1415 head: committed_text,
1416 }
1417 };
1418 Ok(diff_bases_change)
1419 })
1420 }
1421 BufferStoreState::Remote(this) => this.open_uncommitted_diff(buffer_id, cx),
1422 };
1423
1424 entry
1425 .insert(
1426 cx.spawn(move |this, cx| async move {
1427 Self::open_diff_internal(
1428 this,
1429 DiffKind::Uncommitted,
1430 changes.await,
1431 buffer,
1432 cx,
1433 )
1434 .await
1435 .map_err(Arc::new)
1436 })
1437 .shared(),
1438 )
1439 .clone()
1440 }
1441 };
1442
1443 cx.background_executor()
1444 .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
1445 }
1446
1447 async fn open_diff_internal(
1448 this: WeakEntity<Self>,
1449 kind: DiffKind,
1450 texts: Result<DiffBasesChange>,
1451 buffer_entity: Entity<Buffer>,
1452 mut cx: AsyncApp,
1453 ) -> Result<Entity<BufferDiff>> {
1454 let diff_bases_change = match texts {
1455 Err(e) => {
1456 this.update(&mut cx, |this, cx| {
1457 let buffer = buffer_entity.read(cx);
1458 let buffer_id = buffer.remote_id();
1459 this.loading_diffs.remove(&(buffer_id, kind));
1460 })?;
1461 return Err(e);
1462 }
1463 Ok(change) => change,
1464 };
1465
1466 this.update(&mut cx, |this, cx| {
1467 let buffer = buffer_entity.read(cx);
1468 let buffer_id = buffer.remote_id();
1469 let language = buffer.language().cloned();
1470 let language_registry = buffer.language_registry();
1471 let text_snapshot = buffer.text_snapshot();
1472 this.loading_diffs.remove(&(buffer_id, kind));
1473
1474 if let Some(OpenBuffer::Complete { diff_state, .. }) =
1475 this.opened_buffers.get_mut(&buffer_id)
1476 {
1477 diff_state.update(cx, |diff_state, cx| {
1478 diff_state.language = language;
1479 diff_state.language_registry = language_registry;
1480
1481 let diff = cx.new(|_| BufferDiff {
1482 buffer_id,
1483 snapshot: BufferDiffSnapshot::new(&text_snapshot),
1484 unstaged_diff: None,
1485 });
1486 match kind {
1487 DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
1488 DiffKind::Uncommitted => {
1489 let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
1490 diff
1491 } else {
1492 let unstaged_diff = cx.new(|_| BufferDiff {
1493 buffer_id,
1494 snapshot: BufferDiffSnapshot::new(&text_snapshot),
1495 unstaged_diff: None,
1496 });
1497 diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
1498 unstaged_diff
1499 };
1500
1501 diff.update(cx, |diff, _| {
1502 diff.unstaged_diff = Some(unstaged_diff);
1503 });
1504 diff_state.uncommitted_diff = Some(diff.downgrade())
1505 }
1506 };
1507
1508 let rx = diff_state.diff_bases_changed(text_snapshot, diff_bases_change, cx);
1509
1510 Ok(async move {
1511 rx.await.ok();
1512 Ok(diff)
1513 })
1514 })
1515 } else {
1516 Err(anyhow!("buffer was closed"))
1517 }
1518 })??
1519 .await
1520 }
1521
1522 pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
1523 match &self.state {
1524 BufferStoreState::Local(this) => this.create_buffer(cx),
1525 BufferStoreState::Remote(this) => this.create_buffer(cx),
1526 }
1527 }
1528
1529 pub fn save_buffer(
1530 &mut self,
1531 buffer: Entity<Buffer>,
1532 cx: &mut Context<Self>,
1533 ) -> Task<Result<()>> {
1534 match &mut self.state {
1535 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1536 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1537 }
1538 }
1539
1540 pub fn save_buffer_as(
1541 &mut self,
1542 buffer: Entity<Buffer>,
1543 path: ProjectPath,
1544 cx: &mut Context<Self>,
1545 ) -> Task<Result<()>> {
1546 let old_file = buffer.read(cx).file().cloned();
1547 let task = match &self.state {
1548 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1549 BufferStoreState::Remote(this) => {
1550 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1551 }
1552 };
1553 cx.spawn(|this, mut cx| async move {
1554 task.await?;
1555 this.update(&mut cx, |_, cx| {
1556 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1557 })
1558 })
1559 }
1560
1561 pub fn blame_buffer(
1562 &self,
1563 buffer: &Entity<Buffer>,
1564 version: Option<clock::Global>,
1565 cx: &App,
1566 ) -> Task<Result<Option<Blame>>> {
1567 let buffer = buffer.read(cx);
1568 let Some(file) = File::from_dyn(buffer.file()) else {
1569 return Task::ready(Err(anyhow!("buffer has no file")));
1570 };
1571
1572 match file.worktree.clone().read(cx) {
1573 Worktree::Local(worktree) => {
1574 let worktree = worktree.snapshot();
1575 let blame_params = maybe!({
1576 let local_repo = match worktree.local_repo_for_path(&file.path) {
1577 Some(repo_for_path) => repo_for_path,
1578 None => return Ok(None),
1579 };
1580
1581 let relative_path = local_repo
1582 .relativize(&file.path)
1583 .context("failed to relativize buffer path")?;
1584
1585 let repo = local_repo.repo().clone();
1586
1587 let content = match version {
1588 Some(version) => buffer.rope_for_version(&version).clone(),
1589 None => buffer.as_rope().clone(),
1590 };
1591
1592 anyhow::Ok(Some((repo, relative_path, content)))
1593 });
1594
1595 cx.background_executor().spawn(async move {
1596 let Some((repo, relative_path, content)) = blame_params? else {
1597 return Ok(None);
1598 };
1599 repo.blame(&relative_path, content)
1600 .with_context(|| format!("Failed to blame {:?}", relative_path.0))
1601 .map(Some)
1602 })
1603 }
1604 Worktree::Remote(worktree) => {
1605 let buffer_id = buffer.remote_id();
1606 let version = buffer.version();
1607 let project_id = worktree.project_id();
1608 let client = worktree.client();
1609 cx.spawn(|_| async move {
1610 let response = client
1611 .request(proto::BlameBuffer {
1612 project_id,
1613 buffer_id: buffer_id.into(),
1614 version: serialize_version(&version),
1615 })
1616 .await?;
1617 Ok(deserialize_blame_buffer_response(response))
1618 })
1619 }
1620 }
1621 }
1622
1623 pub fn get_permalink_to_line(
1624 &self,
1625 buffer: &Entity<Buffer>,
1626 selection: Range<u32>,
1627 cx: &App,
1628 ) -> Task<Result<url::Url>> {
1629 let buffer = buffer.read(cx);
1630 let Some(file) = File::from_dyn(buffer.file()) else {
1631 return Task::ready(Err(anyhow!("buffer has no file")));
1632 };
1633
1634 match file.worktree.read(cx) {
1635 Worktree::Local(worktree) => {
1636 let worktree_path = worktree.abs_path().clone();
1637 let Some((repo_entry, repo)) =
1638 worktree.repository_for_path(file.path()).and_then(|entry| {
1639 let repo = worktree.get_local_repo(&entry)?.repo().clone();
1640 Some((entry, repo))
1641 })
1642 else {
1643 // If we're not in a Git repo, check whether this is a Rust source
1644 // file in the Cargo registry (presumably opened with go-to-definition
1645 // from a normal Rust file). If so, we can put together a permalink
1646 // using crate metadata.
1647 if !buffer
1648 .language()
1649 .is_some_and(|lang| lang.name() == "Rust".into())
1650 {
1651 return Task::ready(Err(anyhow!("no permalink available")));
1652 }
1653 let file_path = worktree_path.join(file.path());
1654 return cx.spawn(|cx| async move {
1655 let provider_registry =
1656 cx.update(GitHostingProviderRegistry::default_global)?;
1657 get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
1658 .map_err(|_| anyhow!("no permalink available"))
1659 });
1660 };
1661
1662 let path = match repo_entry.relativize(file.path()) {
1663 Ok(RepoPath(path)) => path,
1664 Err(e) => return Task::ready(Err(e)),
1665 };
1666
1667 cx.spawn(|cx| async move {
1668 const REMOTE_NAME: &str = "origin";
1669 let origin_url = repo
1670 .remote_url(REMOTE_NAME)
1671 .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;
1672
1673 let sha = repo
1674 .head_sha()
1675 .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;
1676
1677 let provider_registry =
1678 cx.update(GitHostingProviderRegistry::default_global)?;
1679
1680 let (provider, remote) =
1681 parse_git_remote_url(provider_registry, &origin_url)
1682 .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;
1683
1684 let path = path
1685 .to_str()
1686 .ok_or_else(|| anyhow!("failed to convert path to string"))?;
1687
1688 Ok(provider.build_permalink(
1689 remote,
1690 BuildPermalinkParams {
1691 sha: &sha,
1692 path,
1693 selection: Some(selection),
1694 },
1695 ))
1696 })
1697 }
1698 Worktree::Remote(worktree) => {
1699 let buffer_id = buffer.remote_id();
1700 let project_id = worktree.project_id();
1701 let client = worktree.client();
1702 cx.spawn(|_| async move {
1703 let response = client
1704 .request(proto::GetPermalinkToLine {
1705 project_id,
1706 buffer_id: buffer_id.into(),
1707 selection: Some(proto::Range {
1708 start: selection.start as u64,
1709 end: selection.end as u64,
1710 }),
1711 })
1712 .await?;
1713
1714 url::Url::parse(&response.permalink).context("failed to parse permalink")
1715 })
1716 }
1717 }
1718 }
1719
1720 fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
1721 let buffer = buffer_entity.read(cx);
1722 let language = buffer.language().cloned();
1723 let language_registry = buffer.language_registry();
1724 let remote_id = buffer.remote_id();
1725 let is_remote = buffer.replica_id() != 0;
1726 let open_buffer = OpenBuffer::Complete {
1727 buffer: buffer_entity.downgrade(),
1728 diff_state: cx.new(|_| BufferDiffState {
1729 language,
1730 language_registry,
1731 ..Default::default()
1732 }),
1733 };
1734
1735 let handle = cx.entity().downgrade();
1736 buffer_entity.update(cx, move |_, cx| {
1737 cx.on_release(move |buffer, cx| {
1738 handle
1739 .update(cx, |_, cx| {
1740 cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1741 })
1742 .ok();
1743 })
1744 .detach()
1745 });
1746
1747 match self.opened_buffers.entry(remote_id) {
1748 hash_map::Entry::Vacant(entry) => {
1749 entry.insert(open_buffer);
1750 }
1751 hash_map::Entry::Occupied(mut entry) => {
1752 if let OpenBuffer::Operations(operations) = entry.get_mut() {
1753 buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1754 } else if entry.get().upgrade().is_some() {
1755 if is_remote {
1756 return Ok(());
1757 } else {
1758 debug_panic!("buffer {} was already registered", remote_id);
1759 Err(anyhow!("buffer {} was already registered", remote_id))?;
1760 }
1761 }
1762 entry.insert(open_buffer);
1763 }
1764 }
1765
1766 cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
1767 cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
1768 Ok(())
1769 }
1770
1771 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1772 self.opened_buffers
1773 .values()
1774 .filter_map(|buffer| buffer.upgrade())
1775 }
1776
1777 pub fn loading_buffers(
1778 &self,
1779 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1780 self.loading_buffers.iter().map(|(path, task)| {
1781 let task = task.clone();
1782 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1783 })
1784 }
1785
1786 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1787 self.buffers().find_map(|buffer| {
1788 let file = File::from_dyn(buffer.read(cx).file())?;
1789 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1790 Some(buffer)
1791 } else {
1792 None
1793 }
1794 })
1795 }
1796
1797 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1798 self.opened_buffers.get(&buffer_id)?.upgrade()
1799 }
1800
1801 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1802 self.get(buffer_id)
1803 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1804 }
1805
1806 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1807 self.get(buffer_id).or_else(|| {
1808 self.as_remote()
1809 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1810 })
1811 }
1812
1813 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
1814 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1815 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
1816 } else {
1817 None
1818 }
1819 }
1820
1821 pub fn get_uncommitted_diff(
1822 &self,
1823 buffer_id: BufferId,
1824 cx: &App,
1825 ) -> Option<Entity<BufferDiff>> {
1826 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1827 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
1828 } else {
1829 None
1830 }
1831 }
1832
1833 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1834 let buffers = self
1835 .buffers()
1836 .map(|buffer| {
1837 let buffer = buffer.read(cx);
1838 proto::BufferVersion {
1839 id: buffer.remote_id().into(),
1840 version: language::proto::serialize_version(&buffer.version),
1841 }
1842 })
1843 .collect();
1844 let incomplete_buffer_ids = self
1845 .as_remote()
1846 .map(|remote| remote.incomplete_buffer_ids())
1847 .unwrap_or_default();
1848 (buffers, incomplete_buffer_ids)
1849 }
1850
1851 pub fn disconnected_from_host(&mut self, cx: &mut App) {
1852 for open_buffer in self.opened_buffers.values_mut() {
1853 if let Some(buffer) = open_buffer.upgrade() {
1854 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1855 }
1856 }
1857
1858 for buffer in self.buffers() {
1859 buffer.update(cx, |buffer, cx| {
1860 buffer.set_capability(Capability::ReadOnly, cx)
1861 });
1862 }
1863
1864 if let Some(remote) = self.as_remote_mut() {
1865 // Wake up all futures currently waiting on a buffer to get opened,
1866 // to give them a chance to fail now that we've disconnected.
1867 remote.remote_buffer_listeners.clear()
1868 }
1869 }
1870
1871 pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
1872 self.downstream_client = Some((downstream_client, remote_id));
1873 }
1874
1875 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1876 self.downstream_client.take();
1877 self.forget_shared_buffers();
1878 }
1879
1880 pub fn discard_incomplete(&mut self) {
1881 self.opened_buffers
1882 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1883 }
1884
1885 pub fn find_search_candidates(
1886 &mut self,
1887 query: &SearchQuery,
1888 mut limit: usize,
1889 fs: Arc<dyn Fs>,
1890 cx: &mut Context<Self>,
1891 ) -> Receiver<Entity<Buffer>> {
1892 let (tx, rx) = smol::channel::unbounded();
1893 let mut open_buffers = HashSet::default();
1894 let mut unnamed_buffers = Vec::new();
1895 for handle in self.buffers() {
1896 let buffer = handle.read(cx);
1897 if let Some(entry_id) = buffer.entry_id(cx) {
1898 open_buffers.insert(entry_id);
1899 } else {
1900 limit = limit.saturating_sub(1);
1901 unnamed_buffers.push(handle)
1902 };
1903 }
1904
1905 const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
1906 let project_paths_rx = self
1907 .worktree_store
1908 .update(cx, |worktree_store, cx| {
1909 worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
1910 })
1911 .chunks(MAX_CONCURRENT_BUFFER_OPENS);
1912
1913 cx.spawn(|this, mut cx| async move {
1914 for buffer in unnamed_buffers {
1915 tx.send(buffer).await.ok();
1916 }
1917
1918 let mut project_paths_rx = pin!(project_paths_rx);
1919 while let Some(project_paths) = project_paths_rx.next().await {
1920 let buffers = this.update(&mut cx, |this, cx| {
1921 project_paths
1922 .into_iter()
1923 .map(|project_path| this.open_buffer(project_path, cx))
1924 .collect::<Vec<_>>()
1925 })?;
1926 for buffer_task in buffers {
1927 if let Some(buffer) = buffer_task.await.log_err() {
1928 if tx.send(buffer).await.is_err() {
1929 return anyhow::Ok(());
1930 }
1931 }
1932 }
1933 }
1934 anyhow::Ok(())
1935 })
1936 .detach();
1937 rx
1938 }
1939
1940 pub fn recalculate_buffer_diffs(
1941 &mut self,
1942 buffers: Vec<Entity<Buffer>>,
1943 cx: &mut Context<Self>,
1944 ) -> impl Future<Output = ()> {
1945 let mut futures = Vec::new();
1946 for buffer in buffers {
1947 if let Some(OpenBuffer::Complete { diff_state, .. }) =
1948 self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
1949 {
1950 let buffer = buffer.read(cx).text_snapshot();
1951 futures.push(diff_state.update(cx, |diff_state, cx| {
1952 diff_state.recalculate_diffs(buffer, cx)
1953 }));
1954 }
1955 }
1956 async move {
1957 futures::future::join_all(futures).await;
1958 }
1959 }
1960
1961 fn on_buffer_event(
1962 &mut self,
1963 buffer: Entity<Buffer>,
1964 event: &BufferEvent,
1965 cx: &mut Context<Self>,
1966 ) {
1967 match event {
1968 BufferEvent::FileHandleChanged => {
1969 if let Some(local) = self.as_local_mut() {
1970 local.buffer_changed_file(buffer, cx);
1971 }
1972 }
1973 BufferEvent::Reloaded => {
1974 let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
1975 return;
1976 };
1977 let buffer = buffer.read(cx);
1978 downstream_client
1979 .send(proto::BufferReloaded {
1980 project_id: *project_id,
1981 buffer_id: buffer.remote_id().to_proto(),
1982 version: serialize_version(&buffer.version()),
1983 mtime: buffer.saved_mtime().map(|t| t.into()),
1984 line_ending: serialize_line_ending(buffer.line_ending()) as i32,
1985 })
1986 .log_err();
1987 }
1988 BufferEvent::LanguageChanged => {
1989 let buffer_id = buffer.read(cx).remote_id();
1990 if let Some(OpenBuffer::Complete { diff_state, .. }) =
1991 self.opened_buffers.get(&buffer_id)
1992 {
1993 diff_state.update(cx, |diff_state, cx| {
1994 diff_state.buffer_language_changed(buffer, cx);
1995 });
1996 }
1997 }
1998 _ => {}
1999 }
2000 }
2001
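    /// Handles an incoming `UpdateBuffer` message by applying the operations to the open
    /// buffer, or queueing them if the buffer has not finished opening yet.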
2002 pub async fn handle_update_buffer(
2003 this: Entity<Self>,
2004 envelope: TypedEnvelope<proto::UpdateBuffer>,
2005 mut cx: AsyncApp,
2006 ) -> Result<proto::Ack> {
2007 let payload = envelope.payload.clone();
2008 let buffer_id = BufferId::new(payload.buffer_id)?;
2009 let ops = payload
2010 .operations
2011 .into_iter()
2012 .map(language::proto::deserialize_operation)
2013 .collect::<Result<Vec<_>, _>>()?;
2014 this.update(&mut cx, |this, cx| {
2015 match this.opened_buffers.entry(buffer_id) {
2016 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
2017 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
2018 OpenBuffer::Complete { buffer, .. } => {
2019 if let Some(buffer) = buffer.upgrade() {
2020 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
2021 }
2022 }
2023 },
2024 hash_map::Entry::Vacant(e) => {
2025 e.insert(OpenBuffer::Operations(ops));
2026 }
2027 }
2028 Ok(proto::Ack {})
2029 })?
2030 }
2031
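    /// Attaches an LSP buffer handle to a buffer that is already shared with the given peer.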
2032 pub fn register_shared_lsp_handle(
2033 &mut self,
2034 peer_id: proto::PeerId,
2035 buffer_id: BufferId,
2036 handle: OpenLspBufferHandle,
2037 ) {
2038 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2039 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2040 buffer.lsp_handle = Some(handle);
2041 return;
2042 }
2043 }
2044 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2045 }
2046
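    /// Handles a guest's `SynchronizeBuffers` request: re-registers each requested buffer as
    /// shared, reports its current version, and sends back its file, saved state, and any
    /// operations the guest is missing.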
2047 pub fn handle_synchronize_buffers(
2048 &mut self,
2049 envelope: TypedEnvelope<proto::SynchronizeBuffers>,
2050 cx: &mut Context<Self>,
2051 client: Arc<Client>,
2052 ) -> Result<proto::SynchronizeBuffersResponse> {
2053 let project_id = envelope.payload.project_id;
2054 let mut response = proto::SynchronizeBuffersResponse {
2055 buffers: Default::default(),
2056 };
2057 let Some(guest_id) = envelope.original_sender_id else {
2058 anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
2059 };
2060
2061 self.shared_buffers.entry(guest_id).or_default().clear();
2062 for buffer in envelope.payload.buffers {
2063 let buffer_id = BufferId::new(buffer.id)?;
2064 let remote_version = language::proto::deserialize_version(&buffer.version);
2065 if let Some(buffer) = self.get(buffer_id) {
2066 self.shared_buffers
2067 .entry(guest_id)
2068 .or_default()
2069 .entry(buffer_id)
2070 .or_insert_with(|| SharedBuffer {
2071 buffer: buffer.clone(),
2072 diff: None,
2073 lsp_handle: None,
2074 });
2075
2076 let buffer = buffer.read(cx);
2077 response.buffers.push(proto::BufferVersion {
2078 id: buffer_id.into(),
2079 version: language::proto::serialize_version(&buffer.version),
2080 });
2081
2082 let operations = buffer.serialize_ops(Some(remote_version), cx);
2083 let client = client.clone();
2084 if let Some(file) = buffer.file() {
2085 client
2086 .send(proto::UpdateBufferFile {
2087 project_id,
2088 buffer_id: buffer_id.into(),
2089 file: Some(file.to_proto(cx)),
2090 })
2091 .log_err();
2092 }
2093
2094 // TODO(max): do something
2095 // client
2096 // .send(proto::UpdateStagedText {
2097 // project_id,
2098 // buffer_id: buffer_id.into(),
2099 // diff_base: buffer.diff_base().map(ToString::to_string),
2100 // })
2101 // .log_err();
2102
2103 client
2104 .send(proto::BufferReloaded {
2105 project_id,
2106 buffer_id: buffer_id.into(),
2107 version: language::proto::serialize_version(buffer.saved_version()),
2108 mtime: buffer.saved_mtime().map(|time| time.into()),
2109 line_ending: language::proto::serialize_line_ending(buffer.line_ending())
2110 as i32,
2111 })
2112 .log_err();
2113
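                // Send the operations the guest is missing in the background, split into
                // chunks so that no single message grows too large.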
2114 cx.background_executor()
2115 .spawn(
2116 async move {
2117 let operations = operations.await;
2118 for chunk in split_operations(operations) {
2119 client
2120 .request(proto::UpdateBuffer {
2121 project_id,
2122 buffer_id: buffer_id.into(),
2123 operations: chunk,
2124 })
2125 .await?;
2126 }
2127 anyhow::Ok(())
2128 }
2129 .log_err(),
2130 )
2131 .detach();
2132 }
2133 }
2134 Ok(response)
2135 }
2136
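    /// Handles a `CreateBufferForPeer` message on a remote buffer store, adding the buffer
    /// once its state and all of its operation chunks have arrived.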
2137 pub fn handle_create_buffer_for_peer(
2138 &mut self,
2139 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2140 replica_id: u16,
2141 capability: Capability,
2142 cx: &mut Context<Self>,
2143 ) -> Result<()> {
2144 let Some(remote) = self.as_remote_mut() else {
2145 return Err(anyhow!("buffer store is not a remote"));
2146 };
2147
2148 if let Some(buffer) =
2149 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2150 {
2151 self.add_buffer(buffer, cx)?;
2152 }
2153
2154 Ok(())
2155 }
2156
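    /// Handles an `UpdateBufferFile` message by updating the buffer's file metadata and
    /// forwarding the message to any downstream client.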
2157 pub async fn handle_update_buffer_file(
2158 this: Entity<Self>,
2159 envelope: TypedEnvelope<proto::UpdateBufferFile>,
2160 mut cx: AsyncApp,
2161 ) -> Result<()> {
2162 let buffer_id = envelope.payload.buffer_id;
2163 let buffer_id = BufferId::new(buffer_id)?;
2164
2165 this.update(&mut cx, |this, cx| {
2166 let payload = envelope.payload.clone();
2167 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2168 let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
2169 let worktree = this
2170 .worktree_store
2171 .read(cx)
2172 .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
2173 .ok_or_else(|| anyhow!("no such worktree"))?;
2174 let file = File::from_proto(file, worktree, cx)?;
2175 let old_file = buffer.update(cx, |buffer, cx| {
2176 let old_file = buffer.file().cloned();
2177 let new_path = file.path.clone();
2178 buffer.file_updated(Arc::new(file), cx);
2179 if old_file
2180 .as_ref()
2181 .map_or(true, |old| *old.path() != new_path)
2182 {
2183 Some(old_file)
2184 } else {
2185 None
2186 }
2187 });
2188 if let Some(old_file) = old_file {
2189 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
2190 }
2191 }
2192 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2193 downstream_client
2194 .send(proto::UpdateBufferFile {
2195 project_id: *project_id,
2196 buffer_id: buffer_id.into(),
2197 file: envelope.payload.file,
2198 })
2199 .log_err();
2200 }
2201 Ok(())
2202 })?
2203 }
2204
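    /// Handles a `SaveBuffer` request: waits for the requested buffer version, saves the
    /// buffer (optionally to a new path), and responds with the saved version and mtime.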
2205 pub async fn handle_save_buffer(
2206 this: Entity<Self>,
2207 envelope: TypedEnvelope<proto::SaveBuffer>,
2208 mut cx: AsyncApp,
2209 ) -> Result<proto::BufferSaved> {
2210 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2211 let (buffer, project_id) = this.update(&mut cx, |this, _| {
2212 anyhow::Ok((
2213 this.get_existing(buffer_id)?,
2214 this.downstream_client
2215 .as_ref()
2216 .map(|(_, project_id)| *project_id)
2217 .context("project is not shared")?,
2218 ))
2219 })??;
2220 buffer
2221 .update(&mut cx, |buffer, _| {
2222 buffer.wait_for_version(deserialize_version(&envelope.payload.version))
2223 })?
2224 .await?;
2225 let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
2226
2227 if let Some(new_path) = envelope.payload.new_path {
2228 let new_path = ProjectPath::from_proto(new_path);
2229 this.update(&mut cx, |this, cx| {
2230 this.save_buffer_as(buffer.clone(), new_path, cx)
2231 })?
2232 .await?;
2233 } else {
2234 this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
2235 .await?;
2236 }
2237
2238 buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
2239 project_id,
2240 buffer_id: buffer_id.into(),
2241 version: serialize_version(buffer.saved_version()),
2242 mtime: buffer.saved_mtime().map(|time| time.into()),
2243 })
2244 }
2245
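    /// Handles a `CloseBuffer` message by removing the buffer from the sender's set of shared buffers.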
2246 pub async fn handle_close_buffer(
2247 this: Entity<Self>,
2248 envelope: TypedEnvelope<proto::CloseBuffer>,
2249 mut cx: AsyncApp,
2250 ) -> Result<()> {
2251 let peer_id = envelope.sender_id;
2252 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2253 this.update(&mut cx, |this, _| {
2254 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2255 if shared.remove(&buffer_id).is_some() {
2256 if shared.is_empty() {
2257 this.shared_buffers.remove(&peer_id);
2258 }
2259 return;
2260 }
2261 }
2262 debug_panic!(
2263 "peer_id {} closed buffer_id {} which was either not open or already closed",
2264 peer_id,
2265 buffer_id
2266 )
2267 })
2268 }
2269
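    /// Handles a `BufferSaved` message by recording the save on the local buffer and relaying
    /// the message to any downstream client.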
2270 pub async fn handle_buffer_saved(
2271 this: Entity<Self>,
2272 envelope: TypedEnvelope<proto::BufferSaved>,
2273 mut cx: AsyncApp,
2274 ) -> Result<()> {
2275 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2276 let version = deserialize_version(&envelope.payload.version);
2277 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2278 this.update(&mut cx, move |this, cx| {
2279 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2280 buffer.update(cx, |buffer, cx| {
2281 buffer.did_save(version, mtime, cx);
2282 });
2283 }
2284
2285 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2286 downstream_client
2287 .send(proto::BufferSaved {
2288 project_id: *project_id,
2289 buffer_id: buffer_id.into(),
2290 mtime: envelope.payload.mtime,
2291 version: envelope.payload.version,
2292 })
2293 .log_err();
2294 }
2295 })
2296 }
2297
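    /// Handles a `BufferReloaded` message by applying the reload to the local buffer and
    /// relaying the message to any downstream client.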
2298 pub async fn handle_buffer_reloaded(
2299 this: Entity<Self>,
2300 envelope: TypedEnvelope<proto::BufferReloaded>,
2301 mut cx: AsyncApp,
2302 ) -> Result<()> {
2303 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2304 let version = deserialize_version(&envelope.payload.version);
2305 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2306 let line_ending = deserialize_line_ending(
2307 proto::LineEnding::from_i32(envelope.payload.line_ending)
2308 .ok_or_else(|| anyhow!("missing line ending"))?,
2309 );
2310 this.update(&mut cx, |this, cx| {
2311 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2312 buffer.update(cx, |buffer, cx| {
2313 buffer.did_reload(version, line_ending, mtime, cx);
2314 });
2315 }
2316
2317 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2318 downstream_client
2319 .send(proto::BufferReloaded {
2320 project_id: *project_id,
2321 buffer_id: buffer_id.into(),
2322 mtime: envelope.payload.mtime,
2323 version: envelope.payload.version,
2324 line_ending: envelope.payload.line_ending,
2325 })
2326 .log_err();
2327 }
2328 })
2329 }
2330
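    /// Handles a `BlameBuffer` request: waits for the requested version, then computes and
    /// serializes the git blame for that buffer.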
2331 pub async fn handle_blame_buffer(
2332 this: Entity<Self>,
2333 envelope: TypedEnvelope<proto::BlameBuffer>,
2334 mut cx: AsyncApp,
2335 ) -> Result<proto::BlameBufferResponse> {
2336 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2337 let version = deserialize_version(&envelope.payload.version);
2338 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2339 buffer
2340 .update(&mut cx, |buffer, _| {
2341 buffer.wait_for_version(version.clone())
2342 })?
2343 .await?;
2344 let blame = this
2345 .update(&mut cx, |this, cx| {
2346 this.blame_buffer(&buffer, Some(version), cx)
2347 })?
2348 .await?;
2349 Ok(serialize_blame_buffer_response(blame))
2350 }
2351
2352 pub async fn handle_get_permalink_to_line(
2353 this: Entity<Self>,
2354 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2355 mut cx: AsyncApp,
2356 ) -> Result<proto::GetPermalinkToLineResponse> {
2357 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2358 // let version = deserialize_version(&envelope.payload.version);
2359 let selection = {
2360 let proto_selection = envelope
2361 .payload
2362 .selection
2363                .context("no selection defined to get permalink for")?;
2364 proto_selection.start as u32..proto_selection.end as u32
2365 };
2366 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2367 let permalink = this
2368 .update(&mut cx, |this, cx| {
2369 this.get_permalink_to_line(&buffer, selection, cx)
2370 })?
2371 .await?;
2372 Ok(proto::GetPermalinkToLineResponse {
2373 permalink: permalink.to_string(),
2374 })
2375 }
2376
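    /// Handles an `OpenUnstagedDiff` request: opens (or reuses) the unstaged diff for a shared
    /// buffer and responds with the staged (index) text.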
2377 pub async fn handle_open_unstaged_diff(
2378 this: Entity<Self>,
2379 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2380 mut cx: AsyncApp,
2381 ) -> Result<proto::OpenUnstagedDiffResponse> {
2382 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2383 let diff = this
2384 .update(&mut cx, |this, cx| {
2385 let buffer = this.get(buffer_id)?;
2386 Some(this.open_unstaged_diff(buffer, cx))
2387 })?
2388 .ok_or_else(|| anyhow!("no such buffer"))?
2389 .await?;
2390 this.update(&mut cx, |this, _| {
2391 let shared_buffers = this
2392 .shared_buffers
2393 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2394 .or_default();
2395 debug_assert!(shared_buffers.contains_key(&buffer_id));
2396 if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2397 shared.diff = Some(diff.clone());
2398 }
2399 })?;
2400 let staged_text = diff.read_with(&cx, |diff, _| {
2401 diff.snapshot.base_text.as_ref().map(|buffer| buffer.text())
2402 })?;
2403 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2404 }
2405
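    /// Handles an `OpenUncommittedDiff` request: opens (or reuses) the uncommitted diff for a
    /// shared buffer and responds with the committed text, plus the staged text when it
    /// differs from HEAD.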
2406 pub async fn handle_open_uncommitted_diff(
2407 this: Entity<Self>,
2408 request: TypedEnvelope<proto::OpenUncommittedDiff>,
2409 mut cx: AsyncApp,
2410 ) -> Result<proto::OpenUncommittedDiffResponse> {
2411 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2412 let diff = this
2413 .update(&mut cx, |this, cx| {
2414 let buffer = this.get(buffer_id)?;
2415 Some(this.open_uncommitted_diff(buffer, cx))
2416 })?
2417 .ok_or_else(|| anyhow!("no such buffer"))?
2418 .await?;
2419 this.update(&mut cx, |this, _| {
2420 let shared_buffers = this
2421 .shared_buffers
2422 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2423 .or_default();
2424 debug_assert!(shared_buffers.contains_key(&buffer_id));
2425 if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2426 shared.diff = Some(diff.clone());
2427 }
2428 })?;
2429 diff.read_with(&cx, |diff, cx| {
2430 use proto::open_uncommitted_diff_response::Mode;
2431
2432 let staged_buffer = diff
2433 .unstaged_diff
2434 .as_ref()
2435 .and_then(|diff| diff.read(cx).snapshot.base_text.as_ref());
2436
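            // Avoid sending the staged text when the index matches HEAD; in that case the
            // peer can reuse the committed text for both.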
2437 let mode;
2438 let staged_text;
2439 let committed_text;
2440 if let Some(committed_buffer) = &diff.snapshot.base_text {
2441 committed_text = Some(committed_buffer.text());
2442 if let Some(staged_buffer) = staged_buffer {
2443 if staged_buffer.remote_id() == committed_buffer.remote_id() {
2444 mode = Mode::IndexMatchesHead;
2445 staged_text = None;
2446 } else {
2447 mode = Mode::IndexAndHead;
2448 staged_text = Some(staged_buffer.text());
2449 }
2450 } else {
2451 mode = Mode::IndexAndHead;
2452 staged_text = None;
2453 }
2454 } else {
2455 mode = Mode::IndexAndHead;
2456 committed_text = None;
2457 staged_text = staged_buffer.as_ref().map(|buffer| buffer.text());
2458 }
2459
2460 proto::OpenUncommittedDiffResponse {
2461 committed_text,
2462 staged_text,
2463 mode: mode.into(),
2464 }
2465 })
2466 }
2467
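    /// Handles an `UpdateDiffBases` message by passing the new index and HEAD texts to the
    /// buffer's diff state.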
2468 pub async fn handle_update_diff_bases(
2469 this: Entity<Self>,
2470 request: TypedEnvelope<proto::UpdateDiffBases>,
2471 mut cx: AsyncApp,
2472 ) -> Result<()> {
2473 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2474 this.update(&mut cx, |this, cx| {
2475 if let Some(OpenBuffer::Complete { diff_state, buffer }) =
2476 this.opened_buffers.get_mut(&buffer_id)
2477 {
2478 if let Some(buffer) = buffer.upgrade() {
2479 let buffer = buffer.read(cx).text_snapshot();
2480 diff_state.update(cx, |diff_state, cx| {
2481 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2482 })
2483 }
2484 }
2485 })
2486 }
2487
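    /// Reloads the given buffers from disk, delegating to the local or remote implementation.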
2488 pub fn reload_buffers(
2489 &self,
2490 buffers: HashSet<Entity<Buffer>>,
2491 push_to_history: bool,
2492 cx: &mut Context<Self>,
2493 ) -> Task<Result<ProjectTransaction>> {
2494 if buffers.is_empty() {
2495 return Task::ready(Ok(ProjectTransaction::default()));
2496 }
2497 match &self.state {
2498 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2499 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2500 }
2501 }
2502
2503 async fn handle_reload_buffers(
2504 this: Entity<Self>,
2505 envelope: TypedEnvelope<proto::ReloadBuffers>,
2506 mut cx: AsyncApp,
2507 ) -> Result<proto::ReloadBuffersResponse> {
2508 let sender_id = envelope.original_sender_id().unwrap_or_default();
2509 let reload = this.update(&mut cx, |this, cx| {
2510 let mut buffers = HashSet::default();
2511 for buffer_id in &envelope.payload.buffer_ids {
2512 let buffer_id = BufferId::new(*buffer_id)?;
2513 buffers.insert(this.get_existing(buffer_id)?);
2514 }
2515 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2516 })??;
2517
2518 let project_transaction = reload.await?;
2519 let project_transaction = this.update(&mut cx, |this, cx| {
2520 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2521 })?;
2522 Ok(proto::ReloadBuffersResponse {
2523 transaction: Some(project_transaction),
2524 })
2525 }
2526
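    /// Shares a buffer with a peer, sending the buffer's state followed by its operation
    /// history in chunks. Does nothing if the buffer is already shared with that peer or the
    /// store has no downstream client.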
2527 pub fn create_buffer_for_peer(
2528 &mut self,
2529 buffer: &Entity<Buffer>,
2530 peer_id: proto::PeerId,
2531 cx: &mut Context<Self>,
2532 ) -> Task<Result<()>> {
2533 let buffer_id = buffer.read(cx).remote_id();
2534 let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
2535 if shared_buffers.contains_key(&buffer_id) {
2536 return Task::ready(Ok(()));
2537 }
2538 shared_buffers.insert(
2539 buffer_id,
2540 SharedBuffer {
2541 buffer: buffer.clone(),
2542 diff: None,
2543 lsp_handle: None,
2544 },
2545 );
2546
2547 let Some((client, project_id)) = self.downstream_client.clone() else {
2548 return Task::ready(Ok(()));
2549 };
2550
2551 cx.spawn(|this, mut cx| async move {
2552 let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
2553 return anyhow::Ok(());
2554 };
2555
2556 let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
2557 let operations = operations.await;
2558 let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;
2559
2560 let initial_state = proto::CreateBufferForPeer {
2561 project_id,
2562 peer_id: Some(peer_id),
2563 variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
2564 };
2565
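            // Only stream the operation history if the initial state message was sent
            // successfully; the final chunk is marked so the receiver knows the buffer is complete.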
2566 if client.send(initial_state).log_err().is_some() {
2567 let client = client.clone();
2568 cx.background_executor()
2569 .spawn(async move {
2570 let mut chunks = split_operations(operations).peekable();
2571 while let Some(chunk) = chunks.next() {
2572 let is_last = chunks.peek().is_none();
2573 client.send(proto::CreateBufferForPeer {
2574 project_id,
2575 peer_id: Some(peer_id),
2576 variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
2577 proto::BufferChunk {
2578 buffer_id: buffer_id.into(),
2579 operations: chunk,
2580 is_last,
2581 },
2582 )),
2583 })?;
2584 }
2585 anyhow::Ok(())
2586 })
2587 .await
2588 .log_err();
2589 }
2590 Ok(())
2591 })
2592 }
2593
2594 pub fn forget_shared_buffers(&mut self) {
2595 self.shared_buffers.clear();
2596 }
2597
2598 pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
2599 self.shared_buffers.remove(peer_id);
2600 }
2601
2602 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2603 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2604 self.shared_buffers.insert(new_peer_id, buffers);
2605 }
2606 }
2607
2608 pub fn has_shared_buffers(&self) -> bool {
2609 !self.shared_buffers.is_empty()
2610 }
2611
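    /// Creates a new buffer with the given text and language and registers it with the store.
    /// Only valid on a local buffer store.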
2612 pub fn create_local_buffer(
2613 &mut self,
2614 text: &str,
2615 language: Option<Arc<Language>>,
2616 cx: &mut Context<Self>,
2617 ) -> Entity<Buffer> {
2618 let buffer = cx.new(|cx| {
2619 Buffer::local(text, cx)
2620 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
2621 });
2622
2623 self.add_buffer(buffer.clone(), cx).log_err();
2624 let buffer_id = buffer.read(cx).remote_id();
2625
2626 let this = self
2627 .as_local_mut()
2628 .expect("local-only method called in a non-local context");
2629 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2630 this.local_buffer_ids_by_path.insert(
2631 ProjectPath {
2632 worktree_id: file.worktree_id(cx),
2633 path: file.path.clone(),
2634 },
2635 buffer_id,
2636 );
2637
2638 if let Some(entry_id) = file.entry_id {
2639 this.local_buffer_ids_by_entry_id
2640 .insert(entry_id, buffer_id);
2641 }
2642 }
2643 buffer
2644 }
2645
2646 pub fn deserialize_project_transaction(
2647 &mut self,
2648 message: proto::ProjectTransaction,
2649 push_to_history: bool,
2650 cx: &mut Context<Self>,
2651 ) -> Task<Result<ProjectTransaction>> {
2652 if let Some(this) = self.as_remote_mut() {
2653 this.deserialize_project_transaction(message, push_to_history, cx)
2654 } else {
2655 debug_panic!("not a remote buffer store");
2656 Task::ready(Err(anyhow!("not a remote buffer store")))
2657 }
2658 }
2659
2660 pub fn wait_for_remote_buffer(
2661 &mut self,
2662 id: BufferId,
2663 cx: &mut Context<BufferStore>,
2664 ) -> Task<Result<Entity<Buffer>>> {
2665 if let Some(this) = self.as_remote_mut() {
2666 this.wait_for_remote_buffer(id, cx)
2667 } else {
2668 debug_panic!("not a remote buffer store");
2669 Task::ready(Err(anyhow!("not a remote buffer store")))
2670 }
2671 }
2672
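    /// Serializes a project transaction for transmission to a peer, sharing each affected
    /// buffer with that peer first.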
2673 pub fn serialize_project_transaction_for_peer(
2674 &mut self,
2675 project_transaction: ProjectTransaction,
2676 peer_id: proto::PeerId,
2677 cx: &mut Context<Self>,
2678 ) -> proto::ProjectTransaction {
2679 let mut serialized_transaction = proto::ProjectTransaction {
2680 buffer_ids: Default::default(),
2681 transactions: Default::default(),
2682 };
2683 for (buffer, transaction) in project_transaction.0 {
2684 self.create_buffer_for_peer(&buffer, peer_id, cx)
2685 .detach_and_log_err(cx);
2686 serialized_transaction
2687 .buffer_ids
2688 .push(buffer.read(cx).remote_id().into());
2689 serialized_transaction
2690 .transactions
2691 .push(language::proto::serialize_transaction(&transaction));
2692 }
2693 serialized_transaction
2694 }
2695}
2696
2697impl OpenBuffer {
2698 fn upgrade(&self) -> Option<Entity<Buffer>> {
2699 match self {
2700 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2701 OpenBuffer::Operations(_) => None,
2702 }
2703 }
2704}
2705
2706fn is_not_found_error(error: &anyhow::Error) -> bool {
2707 error
2708 .root_cause()
2709 .downcast_ref::<io::Error>()
2710 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2711}
2712
2713fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2714 let Some(blame) = blame else {
2715 return proto::BlameBufferResponse {
2716 blame_response: None,
2717 };
2718 };
2719
2720 let entries = blame
2721 .entries
2722 .into_iter()
2723 .map(|entry| proto::BlameEntry {
2724 sha: entry.sha.as_bytes().into(),
2725 start_line: entry.range.start,
2726 end_line: entry.range.end,
2727 original_line_number: entry.original_line_number,
2728 author: entry.author.clone(),
2729 author_mail: entry.author_mail.clone(),
2730 author_time: entry.author_time,
2731 author_tz: entry.author_tz.clone(),
2732 committer: entry.committer.clone(),
2733 committer_mail: entry.committer_mail.clone(),
2734 committer_time: entry.committer_time,
2735 committer_tz: entry.committer_tz.clone(),
2736 summary: entry.summary.clone(),
2737 previous: entry.previous.clone(),
2738 filename: entry.filename.clone(),
2739 })
2740 .collect::<Vec<_>>();
2741
2742 let messages = blame
2743 .messages
2744 .into_iter()
2745 .map(|(oid, message)| proto::CommitMessage {
2746 oid: oid.as_bytes().into(),
2747 message,
2748 })
2749 .collect::<Vec<_>>();
2750
2751 let permalinks = blame
2752 .permalinks
2753 .into_iter()
2754 .map(|(oid, url)| proto::CommitPermalink {
2755 oid: oid.as_bytes().into(),
2756 permalink: url.to_string(),
2757 })
2758 .collect::<Vec<_>>();
2759
2760 proto::BlameBufferResponse {
2761 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2762 entries,
2763 messages,
2764 permalinks,
2765 remote_url: blame.remote_url,
2766 }),
2767 }
2768}
2769
2770fn deserialize_blame_buffer_response(
2771 response: proto::BlameBufferResponse,
2772) -> Option<git::blame::Blame> {
2773 let response = response.blame_response?;
2774 let entries = response
2775 .entries
2776 .into_iter()
2777 .filter_map(|entry| {
2778 Some(git::blame::BlameEntry {
2779 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2780 range: entry.start_line..entry.end_line,
2781 original_line_number: entry.original_line_number,
2782 committer: entry.committer,
2783 committer_time: entry.committer_time,
2784 committer_tz: entry.committer_tz,
2785 committer_mail: entry.committer_mail,
2786 author: entry.author,
2787 author_mail: entry.author_mail,
2788 author_time: entry.author_time,
2789 author_tz: entry.author_tz,
2790 summary: entry.summary,
2791 previous: entry.previous,
2792 filename: entry.filename,
2793 })
2794 })
2795 .collect::<Vec<_>>();
2796
2797 let messages = response
2798 .messages
2799 .into_iter()
2800 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2801 .collect::<HashMap<_, _>>();
2802
2803 let permalinks = response
2804 .permalinks
2805 .into_iter()
2806 .filter_map(|permalink| {
2807 Some((
2808 git::Oid::from_bytes(&permalink.oid).ok()?,
2809 Url::from_str(&permalink.permalink).ok()?,
2810 ))
2811 })
2812 .collect::<HashMap<_, _>>();
2813
2814 Some(Blame {
2815 entries,
2816 permalinks,
2817 messages,
2818 remote_url: response.remote_url,
2819 })
2820}
2821
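/// Builds a permalink for a file inside a Cargo registry source checkout by reading the
/// crate's `.cargo_vcs_info.json` and `Cargo.toml` to recover the upstream repository and
/// commit SHA.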
2822fn get_permalink_in_rust_registry_src(
2823 provider_registry: Arc<GitHostingProviderRegistry>,
2824 path: PathBuf,
2825 selection: Range<u32>,
2826) -> Result<url::Url> {
2827 #[derive(Deserialize)]
2828 struct CargoVcsGit {
2829 sha1: String,
2830 }
2831
2832 #[derive(Deserialize)]
2833 struct CargoVcsInfo {
2834 git: CargoVcsGit,
2835 path_in_vcs: String,
2836 }
2837
2838 #[derive(Deserialize)]
2839 struct CargoPackage {
2840 repository: String,
2841 }
2842
2843 #[derive(Deserialize)]
2844 struct CargoToml {
2845 package: CargoPackage,
2846 }
2847
2848 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
2849 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
2850 Some((dir, json))
2851 }) else {
2852 bail!("No .cargo_vcs_info.json found in parent directories")
2853 };
2854 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
2855 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
2856 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
2857 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
2858 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
2859 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
2860 let permalink = provider.build_permalink(
2861 remote,
2862 BuildPermalinkParams {
2863 sha: &cargo_vcs_info.git.sha1,
2864 path: &path.to_string_lossy(),
2865 selection: Some(selection),
2866 },
2867 );
2868 Ok(permalink)
2869}