1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use client::Client;
10use collections::{hash_map, HashMap, HashSet};
11use diff::{BufferDiff, BufferDiffEvent, BufferDiffSnapshot};
12use fs::Fs;
13use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
14use git::{blame::Blame, repository::RepoPath};
15use gpui::{
16 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
17};
18use http_client::Url;
19use language::{
20 proto::{
21 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
22 split_operations,
23 },
24 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
25};
26use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
27use serde::Deserialize;
28use smol::channel::Receiver;
29use std::{
30 io,
31 ops::Range,
32 path::{Path, PathBuf},
33 pin::pin,
34 str::FromStr as _,
35 sync::Arc,
36 time::Instant,
37};
38use text::BufferId;
39use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
40use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
41
/// Which git diff a request refers to: the unstaged diff (buffer vs. index)
/// or the uncommitted diff (buffer vs. HEAD).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    /// Buffer contents compared against the index (staged) text.
    Unstaged,
    /// Buffer contents compared against the HEAD (committed) text.
    Uncommitted,
}
47
/// A set of open buffers.
pub struct BufferStore {
    // Local or remote backing implementation.
    state: BufferStoreState,
    #[allow(clippy::type_complexity)]
    // In-flight buffer loads keyed by path; `Shared` lets concurrent opens of
    // the same path await a single load task.
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    #[allow(clippy::type_complexity)]
    // In-flight diff loads, keyed by buffer id and diff kind.
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
    worktree_store: Entity<WorktreeStore>,
    // All buffers this store knows about, complete or still receiving ops.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    // Client and project id used to mirror state downstream when shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // Buffers that have been sent to each remote peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
61
// State retained for a buffer shared with a remote peer, keeping the buffer
// (and optionally its diff and LSP handle) alive for the duration.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    diff: Option<Entity<BufferDiff>>,
    lsp_handle: Option<OpenLspBufferHandle>,
}
68
/// Per-buffer git diff state: weak handles to the diff entities plus the base
/// texts (index and HEAD) that the diffs are computed against.
#[derive(Default)]
struct BufferDiffState {
    // Weak so the diff entities can be dropped when no longer displayed.
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    // Most recent recalculation; replacing it cancels the previous task.
    recalculate_diff_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    // Senders resolved when the next recalculation finishes.
    diff_updated_futures: Vec<oneshot::Sender<()>>,

    // Base texts; when the index matches HEAD both fields share one `Arc`
    // (see `diff_bases_changed` / `recalculate_diffs`).
    head_text: Option<Arc<String>>,
    index_text: Option<Arc<String>>,
    // Dirty flags consumed (and cleared) by `recalculate_diffs`.
    head_changed: bool,
    index_changed: bool,
    language_changed: bool,
}
84
/// Describes which diff base texts changed, mirroring
/// `proto::update_diff_bases::Mode`.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    // Only the index (staged) text changed.
    SetIndex(Option<String>),
    // Only the HEAD (committed) text changed.
    SetHead(Option<String>),
    // Both changed, to different contents.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    // Both changed and the index matches HEAD exactly.
    SetBoth(Option<String>),
}
95
96impl BufferDiffState {
97 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
98 self.language = buffer.read(cx).language().cloned();
99 self.language_changed = true;
100 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
101 }
102
103 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
104 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
105 }
106
107 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
108 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
109 }
110
111 fn handle_base_texts_updated(
112 &mut self,
113 buffer: text::BufferSnapshot,
114 message: proto::UpdateDiffBases,
115 cx: &mut Context<Self>,
116 ) {
117 use proto::update_diff_bases::Mode;
118
119 let Some(mode) = Mode::from_i32(message.mode) else {
120 return;
121 };
122
123 let diff_bases_change = match mode {
124 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
125 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
126 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
127 Mode::IndexAndHead => DiffBasesChange::SetEach {
128 index: message.staged_text,
129 head: message.committed_text,
130 },
131 };
132
133 let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
134 }
135
136 fn diff_bases_changed(
137 &mut self,
138 buffer: text::BufferSnapshot,
139 diff_bases_change: DiffBasesChange,
140 cx: &mut Context<Self>,
141 ) -> oneshot::Receiver<()> {
142 match diff_bases_change {
143 DiffBasesChange::SetIndex(index) => {
144 self.index_text = index.map(|mut index| {
145 text::LineEnding::normalize(&mut index);
146 Arc::new(index)
147 });
148 self.index_changed = true;
149 }
150 DiffBasesChange::SetHead(head) => {
151 self.head_text = head.map(|mut head| {
152 text::LineEnding::normalize(&mut head);
153 Arc::new(head)
154 });
155 self.head_changed = true;
156 }
157 DiffBasesChange::SetBoth(text) => {
158 let text = text.map(|mut text| {
159 text::LineEnding::normalize(&mut text);
160 Arc::new(text)
161 });
162 self.head_text = text.clone();
163 self.index_text = text;
164 self.head_changed = true;
165 self.index_changed = true;
166 }
167 DiffBasesChange::SetEach { index, head } => {
168 self.index_text = index.map(|mut index| {
169 text::LineEnding::normalize(&mut index);
170 Arc::new(index)
171 });
172 self.index_changed = true;
173 self.head_text = head.map(|mut head| {
174 text::LineEnding::normalize(&mut head);
175 Arc::new(head)
176 });
177 self.head_changed = true;
178 }
179 }
180
181 self.recalculate_diffs(buffer, cx)
182 }
183
    /// Rebuilds the unstaged and/or uncommitted diff snapshots for `buffer`,
    /// driven by the `*_changed` flags accumulated since the last run.
    ///
    /// Returns a receiver that resolves once this (and any concurrently
    /// queued) recalculation finishes; dropping it does not cancel the work.
    fn recalculate_diffs(
        &mut self,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);

        // Capture everything the async task needs up front.
        let language = self.language.clone();
        let language_registry = self.language_registry.clone();
        let unstaged_diff = self.unstaged_diff();
        let uncommitted_diff = self.uncommitted_diff();
        let head = self.head_text.clone();
        let index = self.index_text.clone();
        let index_changed = self.index_changed;
        let head_changed = self.head_changed;
        let language_changed = self.language_changed;
        // Pointer equality is sufficient because `diff_bases_changed` stores
        // the same `Arc` in both fields when index matches HEAD.
        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
            (None, None) => true,
            _ => false,
        };
        // Replacing the previous task cancels any in-flight recalculation.
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            if let Some(unstaged_diff) = &unstaged_diff {
                // Full rebuild when the index text or language changed;
                // otherwise reuse the existing base buffer.
                let snapshot = if index_changed || language_changed {
                    cx.update(|cx| {
                        BufferDiffSnapshot::build(
                            buffer.clone(),
                            index,
                            language.clone(),
                            language_registry.clone(),
                            cx,
                        )
                    })?
                    .await
                } else {
                    unstaged_diff
                        .read_with(&cx, |changes, cx| {
                            BufferDiffSnapshot::build_with_base_buffer(
                                buffer.clone(),
                                index,
                                changes.snapshot.base_text.clone(),
                                cx,
                            )
                        })?
                        .await
                };

                unstaged_diff.update(&mut cx, |unstaged_diff, cx| {
                    unstaged_diff.set_state(snapshot, &buffer, cx);
                    if language_changed {
                        cx.emit(BufferDiffEvent::LanguageChanged);
                    }
                })?;
            }

            if let Some(uncommitted_diff) = &uncommitted_diff {
                // When index matches HEAD, share the unstaged snapshot
                // instead of computing the same diff twice.
                let snapshot =
                    if let (Some(unstaged_diff), true) = (&unstaged_diff, index_matches_head) {
                        unstaged_diff.read_with(&cx, |diff, _| diff.snapshot.clone())?
                    } else if head_changed || language_changed {
                        cx.update(|cx| {
                            BufferDiffSnapshot::build(
                                buffer.clone(),
                                head,
                                language.clone(),
                                language_registry.clone(),
                                cx,
                            )
                        })?
                        .await
                    } else {
                        uncommitted_diff
                            .read_with(&cx, |changes, cx| {
                                BufferDiffSnapshot::build_with_base_buffer(
                                    buffer.clone(),
                                    head,
                                    changes.snapshot.base_text.clone(),
                                    cx,
                                )
                            })?
                            .await
                    };

                uncommitted_diff.update(&mut cx, |diff, cx| {
                    diff.set_state(snapshot, &buffer, cx);
                    if language_changed {
                        cx.emit(BufferDiffEvent::LanguageChanged);
                    }
                })?;
            }

            // Clear the dirty flags and resolve every queued waiter.
            if let Some(this) = this.upgrade() {
                this.update(&mut cx, |this, _| {
                    this.index_changed = false;
                    this.head_changed = false;
                    for tx in this.diff_updated_futures.drain(..) {
                        tx.send(()).ok();
                    }
                })?;
            }

            Ok(())
        }));

        rx
    }
291}
292
// Backing implementation: buffers loaded directly from local worktrees, or
// proxied from an upstream host over RPC.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
297
// Buffer operations proxied to an upstream host over RPC.
struct RemoteBufferStore {
    // Buffers shared by collab peers, retained to avoid dropping them
    // mid-race (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    // Buffers whose initial state arrived but whose op chunks haven't all.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    // Channels to notify when a given remote buffer finishes loading.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
307
// Buffer operations performed directly against local worktrees.
struct LocalBufferStore {
    // Reverse indices from project path / worktree entry id to buffer id.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    // Keeps the worktree-store subscription alive for this store's lifetime.
    _subscription: Subscription,
}
314
// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    // A fully-loaded buffer (held weakly) plus its git diff state.
    Complete {
        buffer: WeakEntity<Buffer>,
        diff_state: Entity<BufferDiffState>,
    },
    // Operations received for a buffer that hasn't finished loading yet.
    Operations(Vec<Operation>),
}
322
/// Events emitted by [`BufferStore`].
pub enum BufferStoreEvent {
    BufferAdded(Entity<Buffer>),
    BufferDropped(BufferId),
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        // The buffer's file before the path change, if it had one.
        old_file: Option<Arc<dyn language::File>>,
    },
}
331
/// Buffer transactions produced by one logical project-wide operation,
/// keyed by the buffer each transaction applies to.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
334
335impl EventEmitter<BufferStoreEvent> for BufferStore {}
336
337impl RemoteBufferStore {
338 fn open_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
339 let project_id = self.project_id;
340 let client = self.upstream_client.clone();
341 cx.background_executor().spawn(async move {
342 let response = client
343 .request(proto::OpenUnstagedDiff {
344 project_id,
345 buffer_id: buffer_id.to_proto(),
346 })
347 .await?;
348 Ok(response.staged_text)
349 })
350 }
351
352 fn open_uncommitted_diff(
353 &self,
354 buffer_id: BufferId,
355 cx: &App,
356 ) -> Task<Result<DiffBasesChange>> {
357 use proto::open_uncommitted_diff_response::Mode;
358
359 let project_id = self.project_id;
360 let client = self.upstream_client.clone();
361 cx.background_executor().spawn(async move {
362 let response = client
363 .request(proto::OpenUncommittedDiff {
364 project_id,
365 buffer_id: buffer_id.to_proto(),
366 })
367 .await?;
368 let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
369 let bases = match mode {
370 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
371 Mode::IndexAndHead => DiffBasesChange::SetEach {
372 head: response.committed_text,
373 index: response.staged_text,
374 },
375 };
376 Ok(bases)
377 })
378 }
379
380 pub fn wait_for_remote_buffer(
381 &mut self,
382 id: BufferId,
383 cx: &mut Context<BufferStore>,
384 ) -> Task<Result<Entity<Buffer>>> {
385 let (tx, rx) = oneshot::channel();
386 self.remote_buffer_listeners.entry(id).or_default().push(tx);
387
388 cx.spawn(|this, cx| async move {
389 if let Some(buffer) = this
390 .read_with(&cx, |buffer_store, _| buffer_store.get(id))
391 .ok()
392 .flatten()
393 {
394 return Ok(buffer);
395 }
396
397 cx.background_executor()
398 .spawn(async move { rx.await? })
399 .await
400 })
401 }
402
403 fn save_remote_buffer(
404 &self,
405 buffer_handle: Entity<Buffer>,
406 new_path: Option<proto::ProjectPath>,
407 cx: &Context<BufferStore>,
408 ) -> Task<Result<()>> {
409 let buffer = buffer_handle.read(cx);
410 let buffer_id = buffer.remote_id().into();
411 let version = buffer.version();
412 let rpc = self.upstream_client.clone();
413 let project_id = self.project_id;
414 cx.spawn(move |_, mut cx| async move {
415 let response = rpc
416 .request(proto::SaveBuffer {
417 project_id,
418 buffer_id,
419 new_path,
420 version: serialize_version(&version),
421 })
422 .await?;
423 let version = deserialize_version(&response.version);
424 let mtime = response.mtime.map(|mtime| mtime.into());
425
426 buffer_handle.update(&mut cx, |buffer, cx| {
427 buffer.did_save(version.clone(), mtime, cx);
428 })?;
429
430 Ok(())
431 })
432 }
433
    /// Handles a `CreateBufferForPeer` message, which delivers a buffer from
    /// the upstream host either as an initial `State` or as follow-up
    /// operation `Chunk`s.
    ///
    /// Returns `Ok(Some(buffer))` once the final chunk has been applied, and
    /// `Ok(None)` while the buffer is still incomplete.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut Context<BufferStore>,
    ) -> Result<Option<Entity<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    // Resolve the buffer's file against our worktrees, if it has one.
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        // Park the buffer until all of its chunks arrive.
                        let buffer = cx.new(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Propagate the failure to anyone awaiting this buffer.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // A bad chunk poisons the whole load: drop the partial
                    // buffer and fail every waiting listener.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    // Hand the completed buffer to every waiting listener.
                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }
528
529 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
530 self.loading_remote_buffers_by_id
531 .keys()
532 .copied()
533 .collect::<Vec<_>>()
534 }
535
    /// Reconstructs a [`ProjectTransaction`] from its wire representation,
    /// waiting for each referenced buffer (and its edits) to arrive first.
    ///
    /// When `push_to_history` is set, each transaction is also recorded in
    /// its buffer's undo history.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            // `buffer_ids` and `transactions` are parallel arrays on the wire.
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            // Don't return until every buffer has actually received the
            // edits its transaction refers to.
            for (buffer, transaction) in &project_transaction.0 {
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }
573
574 fn open_buffer(
575 &self,
576 path: Arc<Path>,
577 worktree: Entity<Worktree>,
578 cx: &mut Context<BufferStore>,
579 ) -> Task<Result<Entity<Buffer>>> {
580 let worktree_id = worktree.read(cx).id().to_proto();
581 let project_id = self.project_id;
582 let client = self.upstream_client.clone();
583 let path_string = path.clone().to_string_lossy().to_string();
584 cx.spawn(move |this, mut cx| async move {
585 let response = client
586 .request(proto::OpenBufferByPath {
587 project_id,
588 worktree_id,
589 path: path_string,
590 })
591 .await?;
592 let buffer_id = BufferId::new(response.buffer_id)?;
593
594 let buffer = this
595 .update(&mut cx, {
596 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
597 })?
598 .await?;
599
600 Ok(buffer)
601 })
602 }
603
604 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
605 let create = self.upstream_client.request(proto::OpenNewBuffer {
606 project_id: self.project_id,
607 });
608 cx.spawn(|this, mut cx| async move {
609 let response = create.await?;
610 let buffer_id = BufferId::new(response.buffer_id)?;
611
612 this.update(&mut cx, |this, cx| {
613 this.wait_for_remote_buffer(buffer_id, cx)
614 })?
615 .await
616 })
617 }
618
619 fn reload_buffers(
620 &self,
621 buffers: HashSet<Entity<Buffer>>,
622 push_to_history: bool,
623 cx: &mut Context<BufferStore>,
624 ) -> Task<Result<ProjectTransaction>> {
625 let request = self.upstream_client.request(proto::ReloadBuffers {
626 project_id: self.project_id,
627 buffer_ids: buffers
628 .iter()
629 .map(|buffer| buffer.read(cx).remote_id().to_proto())
630 .collect(),
631 });
632
633 cx.spawn(|this, mut cx| async move {
634 let response = request
635 .await?
636 .transaction
637 .ok_or_else(|| anyhow!("missing transaction"))?;
638 this.update(&mut cx, |this, cx| {
639 this.deserialize_project_transaction(response, push_to_history, cx)
640 })?
641 .await
642 })
643 }
644}
645
646impl LocalBufferStore {
647 fn worktree_for_buffer(
648 &self,
649 buffer: &Entity<Buffer>,
650 cx: &App,
651 ) -> Option<(Entity<Worktree>, Arc<Path>)> {
652 let file = buffer.read(cx).file()?;
653 let worktree_id = file.worktree_id(cx);
654 let path = file.path().clone();
655 let worktree = self
656 .worktree_store
657 .read(cx)
658 .worktree_for_id(worktree_id, cx)?;
659 Some((worktree, path))
660 }
661
662 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
663 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
664 worktree.read(cx).load_staged_file(path.as_ref(), cx)
665 } else {
666 return Task::ready(Err(anyhow!("no such worktree")));
667 }
668 }
669
670 fn load_committed_text(
671 &self,
672 buffer: &Entity<Buffer>,
673 cx: &App,
674 ) -> Task<Result<Option<String>>> {
675 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
676 worktree.read(cx).load_committed_file(path.as_ref(), cx)
677 } else {
678 Task::ready(Err(anyhow!("no such worktree")))
679 }
680 }
681
    /// Writes `buffer_handle`'s contents to `path` within `worktree`, then
    /// notifies any downstream collaborator and the buffer itself.
    ///
    /// `has_changed_file` indicates the buffer is being saved under a new
    /// file (e.g. save-as); it is also forced on for buffers whose file has
    /// never existed on disk.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Capture everything we need before going async.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Mirror the save to a downstream collaborator, if any; send
            // failures are logged rather than propagated.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            // Finally, let the buffer record the save (and new file, if any).
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
739
740 fn subscribe_to_worktree(
741 &mut self,
742 worktree: &Entity<Worktree>,
743 cx: &mut Context<BufferStore>,
744 ) {
745 cx.subscribe(worktree, |this, worktree, event, cx| {
746 if worktree.read(cx).is_local() {
747 match event {
748 worktree::Event::UpdatedEntries(changes) => {
749 Self::local_worktree_entries_changed(this, &worktree, changes, cx);
750 }
751 worktree::Event::UpdatedGitRepositories(updated_repos) => {
752 Self::local_worktree_git_repos_changed(
753 this,
754 worktree.clone(),
755 updated_repos,
756 cx,
757 )
758 }
759 _ => {}
760 }
761 }
762 })
763 .detach();
764 }
765
766 fn local_worktree_entries_changed(
767 this: &mut BufferStore,
768 worktree_handle: &Entity<Worktree>,
769 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
770 cx: &mut Context<BufferStore>,
771 ) {
772 let snapshot = worktree_handle.read(cx).snapshot();
773 for (path, entry_id, _) in changes {
774 Self::local_worktree_entry_changed(
775 this,
776 *entry_id,
777 path,
778 worktree_handle,
779 &snapshot,
780 cx,
781 );
782 }
783 }
784
    /// Responds to git repository changes in a local worktree by reloading
    /// the index/HEAD base texts of every open buffer whose diffs are being
    /// tracked and whose file lives under a changed repository.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Collect affected buffers (and which base texts each still needs,
        // based on which diff entities are alive) before going async.
        let mut diff_state_updates = Vec::new();
        for buffer in this.opened_buffers.values() {
            let OpenBuffer::Complete { buffer, diff_state } = buffer else {
                continue;
            };
            let Some(buffer) = buffer.upgrade() else {
                continue;
            };
            let buffer = buffer.read(cx);
            let Some(file) = File::from_dyn(buffer.file()) else {
                continue;
            };
            if file.worktree != worktree_handle {
                continue;
            }
            let diff_state = diff_state.read(cx);
            if changed_repos
                .iter()
                .any(|(work_dir, _)| file.path.starts_with(work_dir))
            {
                let snapshot = buffer.text_snapshot();
                diff_state_updates.push((
                    snapshot.clone(),
                    file.path.clone(),
                    // Whether a live unstaged / uncommitted diff exists.
                    diff_state
                        .unstaged_diff
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                    diff_state
                        .uncommitted_diff
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                ))
            }
        }

        if diff_state_updates.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Read the git index / HEAD contents on the background executor.
            let diff_bases_changes_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    diff_state_updates
                        .into_iter()
                        .filter_map(
                            |(buffer_snapshot, path, needs_staged_text, needs_committed_text)| {
                                let local_repo = snapshot.local_repo_for_path(&path)?;
                                let relative_path = local_repo.relativize(&path).ok()?;
                                let staged_text = if needs_staged_text {
                                    local_repo.repo().load_index_text(&relative_path)
                                } else {
                                    None
                                };
                                let committed_text = if needs_committed_text {
                                    local_repo.repo().load_committed_text(&relative_path)
                                } else {
                                    None
                                };
                                // Collapse equal texts into `SetBoth` so both
                                // diffs can share one base downstream.
                                let diff_bases_change =
                                    match (needs_staged_text, needs_committed_text) {
                                        (true, true) => Some(if staged_text == committed_text {
                                            DiffBasesChange::SetBoth(committed_text)
                                        } else {
                                            DiffBasesChange::SetEach {
                                                index: staged_text,
                                                head: committed_text,
                                            }
                                        }),
                                        (true, false) => {
                                            Some(DiffBasesChange::SetIndex(staged_text))
                                        }
                                        (false, true) => {
                                            Some(DiffBasesChange::SetHead(committed_text))
                                        }
                                        (false, false) => None,
                                    };
                                Some((buffer_snapshot, diff_bases_change))
                            },
                        )
                        .collect::<Vec<_>>()
                })
                .await;

            // Back on the main thread: apply each change and mirror it to a
            // downstream collaborator, if any.
            this.update(&mut cx, |this, cx| {
                for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
                    let Some(OpenBuffer::Complete { diff_state, .. }) =
                        this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
                    else {
                        continue;
                    };
                    let Some(diff_bases_change) = diff_bases_change else {
                        continue;
                    };

                    diff_state.update(cx, |diff_state, cx| {
                        use proto::update_diff_bases::Mode;

                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
                            let buffer_id = buffer_snapshot.remote_id().to_proto();
                            // Encode the change back into wire mode + texts.
                            let (staged_text, committed_text, mode) = match diff_bases_change
                                .clone()
                            {
                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                DiffBasesChange::SetEach { index, head } => {
                                    (index, head, Mode::IndexAndHead)
                                }
                                DiffBasesChange::SetBoth(text) => {
                                    (None, text, Mode::IndexMatchesHead)
                                }
                            };
                            let message = proto::UpdateDiffBases {
                                project_id: *project_id,
                                buffer_id,
                                staged_text,
                                committed_text,
                                mode: mode as i32,
                            };

                            client.send(message).log_err();
                        }

                        let _ =
                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
                    });
                }
            })
        })
        .detach_and_log_err(cx);
    }
929
    /// Reconciles a single changed worktree entry with any open buffer backed
    /// by it: updates the path/entry reverse indices, swaps in an updated
    /// `File` (possibly marked deleted), and notifies downstream clients.
    ///
    /// The `Option` return exists only for early-exit ergonomics via `?`.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Find the open buffer for this entry, by entry id or by path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer entity was dropped; clean up its entry.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer: also drop it from the reverse indices.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Locate the entry in the new snapshot, preferring entry id over
            // path (the file may have been renamed).
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // Entry is gone from the snapshot: the file was deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index and queue a path-change event.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Mirror the file change to a downstream collaborator, if any.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        // Emit queued events only after the buffer update completes.
        for event in events {
            cx.emit(event);
        }

        None
    }
1057
1058 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1059 let file = File::from_dyn(buffer.read(cx).file())?;
1060
1061 let remote_id = buffer.read(cx).remote_id();
1062 if let Some(entry_id) = file.entry_id {
1063 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1064 Some(_) => {
1065 return None;
1066 }
1067 None => {
1068 self.local_buffer_ids_by_entry_id
1069 .insert(entry_id, remote_id);
1070 }
1071 }
1072 };
1073 self.local_buffer_ids_by_path.insert(
1074 ProjectPath {
1075 worktree_id: file.worktree_id(cx),
1076 path: file.path.clone(),
1077 },
1078 remote_id,
1079 );
1080
1081 Some(())
1082 }
1083
1084 fn save_buffer(
1085 &self,
1086 buffer: Entity<Buffer>,
1087 cx: &mut Context<BufferStore>,
1088 ) -> Task<Result<()>> {
1089 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1090 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1091 };
1092 let worktree = file.worktree.clone();
1093 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1094 }
1095
1096 fn save_buffer_as(
1097 &self,
1098 buffer: Entity<Buffer>,
1099 path: ProjectPath,
1100 cx: &mut Context<BufferStore>,
1101 ) -> Task<Result<()>> {
1102 let Some(worktree) = self
1103 .worktree_store
1104 .read(cx)
1105 .worktree_for_id(path.worktree_id, cx)
1106 else {
1107 return Task::ready(Err(anyhow!("no such worktree")));
1108 };
1109 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1110 }
1111
    /// Opens (or creates in memory) a buffer for `path` in `worktree`.
    ///
    /// A file that doesn't exist on disk is not an error: it yields an empty
    /// buffer whose file is marked `DiskState::New`.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity up front so the buffer id is fixed before
            // the file finishes loading.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Construct the text buffer off the main thread.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // Missing file: create an empty in-memory buffer at the path.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            // Register the buffer and index it by path and entry id.
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
1180
1181 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1182 cx.spawn(|buffer_store, mut cx| async move {
1183 let buffer =
1184 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1185 buffer_store.update(&mut cx, |buffer_store, cx| {
1186 buffer_store.add_buffer(buffer.clone(), cx).log_err();
1187 })?;
1188 Ok(buffer)
1189 })
1190 }
1191
1192 fn reload_buffers(
1193 &self,
1194 buffers: HashSet<Entity<Buffer>>,
1195 push_to_history: bool,
1196 cx: &mut Context<BufferStore>,
1197 ) -> Task<Result<ProjectTransaction>> {
1198 cx.spawn(move |_, mut cx| async move {
1199 let mut project_transaction = ProjectTransaction::default();
1200 for buffer in buffers {
1201 let transaction = buffer
1202 .update(&mut cx, |buffer, cx| buffer.reload(cx))?
1203 .await?;
1204 buffer.update(&mut cx, |buffer, cx| {
1205 if let Some(transaction) = transaction {
1206 if !push_to_history {
1207 buffer.forget_transaction(transaction.id);
1208 }
1209 project_transaction.0.insert(cx.entity(), transaction);
1210 }
1211 })?;
1212 }
1213
1214 Ok(project_transaction)
1215 })
1216 }
1217}
1218
1219impl BufferStore {
    /// Registers this store's RPC message and request handlers on `client`,
    /// routing incoming buffer-related envelopes to the matching
    /// `Self::handle_*` method.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
    }
1232
    /// Creates a buffer store that opens and saves buffers directly against
    /// the local worktrees in `worktree_store`.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                // Subscribe to newly-added worktrees so their file events keep
                // the local buffer indices up to date.
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_diffs: Default::default(),
            worktree_store,
        }
    }
1255
    /// Creates a buffer store that proxies all buffer operations to a host
    /// over `upstream_client`, for the shared project identified by
    /// `remote_id`.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_diffs: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
1279
1280 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1281 match &mut self.state {
1282 BufferStoreState::Local(state) => Some(state),
1283 _ => None,
1284 }
1285 }
1286
1287 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1288 match &mut self.state {
1289 BufferStoreState::Remote(state) => Some(state),
1290 _ => None,
1291 }
1292 }
1293
1294 fn as_remote(&self) -> Option<&RemoteBufferStore> {
1295 match &self.state {
1296 BufferStoreState::Remote(state) => Some(state),
1297 _ => None,
1298 }
1299 }
1300
    /// Opens the buffer at `project_path`, reusing an already-open buffer or
    /// an in-flight load when possible.
    ///
    /// Concurrent calls for the same path share a single loading task via
    /// `loading_buffers`.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        // Fast path: the buffer is already open.
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // Another caller is already loading this path; share its task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // The shared task yields `Arc`'d errors; re-wrap as plain `anyhow::Error`.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1346
    /// Returns the unstaged diff for `buffer`, opening it if necessary.
    ///
    /// Concurrent calls for the same buffer share a single loading task via
    /// `loading_diffs`.
    pub fn open_unstaged_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        // Fast path: the diff already exists.
        if let Some(diff) = self.get_unstaged_diff(buffer_id, cx) {
            return Task::ready(Ok(diff));
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Unstaged)) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                // Fetch the index ("staged") text from the active backend.
                let staged_text = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.open_unstaged_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Unstaged,
                                staged_text.await.map(DiffBasesChange::SetIndex),
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Re-wrap the shared task's `Arc`'d error as a plain `anyhow::Error`.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1387
    /// Returns the uncommitted diff for `buffer`, opening it if necessary.
    ///
    /// Concurrent calls for the same buffer share a single loading task via
    /// `loading_diffs`.
    pub fn open_uncommitted_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        // Fast path: the diff already exists.
        if let Some(diff) = self.get_uncommitted_diff(buffer_id, cx) {
            return Task::ready(Ok(diff));
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Uncommitted)) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let changes = match &self.state {
                    BufferStoreState::Local(this) => {
                        let committed_text = this.load_committed_text(&buffer, cx);
                        let staged_text = this.load_staged_text(&buffer, cx);
                        // When HEAD and index text agree, store one shared
                        // base instead of two separate copies.
                        cx.background_executor().spawn(async move {
                            let committed_text = committed_text.await?;
                            let staged_text = staged_text.await?;
                            let diff_bases_change = if committed_text == staged_text {
                                DiffBasesChange::SetBoth(committed_text)
                            } else {
                                DiffBasesChange::SetEach {
                                    index: staged_text,
                                    head: committed_text,
                                }
                            };
                            Ok(diff_bases_change)
                        })
                    }
                    BufferStoreState::Remote(this) => this.open_uncommitted_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Uncommitted,
                                changes.await,
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Re-wrap the shared task's `Arc`'d error as a plain `anyhow::Error`.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1444
    /// Finishes opening a diff once its base text(s) have loaded: clears the
    /// `loading_diffs` entry, records the new diff on the buffer's diff
    /// state, and waits for the initial diff computation to complete.
    async fn open_diff_internal(
        this: WeakEntity<Self>,
        kind: DiffKind,
        texts: Result<DiffBasesChange>,
        buffer_entity: Entity<Buffer>,
        mut cx: AsyncApp,
    ) -> Result<Entity<BufferDiff>> {
        let diff_bases_change = match texts {
            Err(e) => {
                // Even on failure, clear the loading entry so a later call
                // can retry.
                this.update(&mut cx, |this, cx| {
                    let buffer = buffer_entity.read(cx);
                    let buffer_id = buffer.remote_id();
                    this.loading_diffs.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(&mut cx, |this, cx| {
            let buffer = buffer_entity.read(cx);
            let buffer_id = buffer.remote_id();
            let language = buffer.language().cloned();
            let language_registry = buffer.language_registry();
            let text_snapshot = buffer.text_snapshot();
            this.loading_diffs.remove(&(buffer_id, kind));

            if let Some(OpenBuffer::Complete { diff_state, .. }) =
                this.opened_buffers.get_mut(&buffer_id)
            {
                diff_state.update(cx, |diff_state, cx| {
                    diff_state.language = language;
                    diff_state.language_registry = language_registry;

                    let diff = cx.new(|_| BufferDiff {
                        buffer_id,
                        snapshot: BufferDiffSnapshot::new(&text_snapshot),
                        unstaged_diff: None,
                    });
                    match kind {
                        DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
                        DiffKind::Uncommitted => {
                            // An uncommitted diff references an unstaged diff;
                            // create an empty one here if none exists yet.
                            let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
                                diff
                            } else {
                                let unstaged_diff = cx.new(|_| BufferDiff {
                                    buffer_id,
                                    snapshot: BufferDiffSnapshot::new(&text_snapshot),
                                    unstaged_diff: None,
                                });
                                diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
                                unstaged_diff
                            };

                            diff.update(cx, |diff, _| {
                                diff.unstaged_diff = Some(unstaged_diff);
                            });
                            diff_state.uncommitted_diff = Some(diff.downgrade())
                        }
                    };

                    // Trigger the initial diff computation against the new bases.
                    let rx = diff_state.diff_bases_changed(text_snapshot, diff_bases_change, cx);

                    Ok(async move {
                        rx.await.ok();
                        Ok(diff)
                    })
                })
            } else {
                Err(anyhow!("buffer was closed"))
            }
        })??
        .await
    }
1519
1520 pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
1521 match &self.state {
1522 BufferStoreState::Local(this) => this.create_buffer(cx),
1523 BufferStoreState::Remote(this) => this.create_buffer(cx),
1524 }
1525 }
1526
1527 pub fn save_buffer(
1528 &mut self,
1529 buffer: Entity<Buffer>,
1530 cx: &mut Context<Self>,
1531 ) -> Task<Result<()>> {
1532 match &mut self.state {
1533 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1534 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1535 }
1536 }
1537
1538 pub fn save_buffer_as(
1539 &mut self,
1540 buffer: Entity<Buffer>,
1541 path: ProjectPath,
1542 cx: &mut Context<Self>,
1543 ) -> Task<Result<()>> {
1544 let old_file = buffer.read(cx).file().cloned();
1545 let task = match &self.state {
1546 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1547 BufferStoreState::Remote(this) => {
1548 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1549 }
1550 };
1551 cx.spawn(|this, mut cx| async move {
1552 task.await?;
1553 this.update(&mut cx, |_, cx| {
1554 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1555 })
1556 })
1557 }
1558
    /// Computes git blame for `buffer`, optionally against a historical
    /// `version` of its contents.
    ///
    /// Returns `Ok(None)` when the file is not inside a git repository.
    /// For remote worktrees the request is forwarded to the host.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather repo handle, repo-relative path, and buffer content
                // up front so the blame itself can run in the background.
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                // The host has the repository on disk; send it a BlameBuffer
                // request with our version vector.
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1620
    /// Builds a permalink URL to the given line `selection` of `buffer` on
    /// its git hosting provider, based on the `origin` remote and the
    /// repository's current HEAD commit.
    ///
    /// For remote worktrees the request is forwarded to the host.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // Repo-relative path of the file, required by the provider URL.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    // Determine the hosting provider (and its URL scheme)
                    // from the remote URL.
                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                // Ask the host to build the permalink for us.
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1717
    /// Registers a newly-opened buffer with the store: creates its diff
    /// state, wires up drop notifications and event forwarding, and merges
    /// in any operations that arrived before the buffer itself did.
    fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
        let buffer = buffer_entity.read(cx);
        let language = buffer.language().cloned();
        let language_registry = buffer.language_registry();
        let remote_id = buffer.remote_id();
        // A non-zero replica id means this buffer originated on another peer.
        let is_remote = buffer.replica_id() != 0;
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer_entity.downgrade(),
            diff_state: cx.new(|_| BufferDiffState {
                language,
                language_registry,
                ..Default::default()
            }),
        };

        // Emit `BufferDropped` when the last handle to the buffer goes away.
        let handle = cx.entity().downgrade();
        buffer_entity.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                // Operations queued before the buffer arrived are applied now.
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    // A live buffer already exists under this id; only remote
                    // buffers are expected to be re-registered.
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
        Ok(())
    }
1768
1769 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1770 self.opened_buffers
1771 .values()
1772 .filter_map(|buffer| buffer.upgrade())
1773 }
1774
1775 pub fn loading_buffers(
1776 &self,
1777 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1778 self.loading_buffers.iter().map(|(path, task)| {
1779 let task = task.clone();
1780 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1781 })
1782 }
1783
1784 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1785 self.buffers().find_map(|buffer| {
1786 let file = File::from_dyn(buffer.read(cx).file())?;
1787 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1788 Some(buffer)
1789 } else {
1790 None
1791 }
1792 })
1793 }
1794
1795 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1796 self.opened_buffers.get(&buffer_id)?.upgrade()
1797 }
1798
1799 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1800 self.get(buffer_id)
1801 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1802 }
1803
1804 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1805 self.get(buffer_id).or_else(|| {
1806 self.as_remote()
1807 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1808 })
1809 }
1810
1811 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
1812 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1813 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
1814 } else {
1815 None
1816 }
1817 }
1818
1819 pub fn get_uncommitted_diff(
1820 &self,
1821 buffer_id: BufferId,
1822 cx: &App,
1823 ) -> Option<Entity<BufferDiff>> {
1824 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1825 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
1826 } else {
1827 None
1828 }
1829 }
1830
1831 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1832 let buffers = self
1833 .buffers()
1834 .map(|buffer| {
1835 let buffer = buffer.read(cx);
1836 proto::BufferVersion {
1837 id: buffer.remote_id().into(),
1838 version: language::proto::serialize_version(&buffer.version),
1839 }
1840 })
1841 .collect();
1842 let incomplete_buffer_ids = self
1843 .as_remote()
1844 .map(|remote| remote.incomplete_buffer_ids())
1845 .unwrap_or_default();
1846 (buffers, incomplete_buffer_ids)
1847 }
1848
1849 pub fn disconnected_from_host(&mut self, cx: &mut App) {
1850 for open_buffer in self.opened_buffers.values_mut() {
1851 if let Some(buffer) = open_buffer.upgrade() {
1852 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1853 }
1854 }
1855
1856 for buffer in self.buffers() {
1857 buffer.update(cx, |buffer, cx| {
1858 buffer.set_capability(Capability::ReadOnly, cx)
1859 });
1860 }
1861
1862 if let Some(remote) = self.as_remote_mut() {
1863 // Wake up all futures currently waiting on a buffer to get opened,
1864 // to give them a chance to fail now that we've disconnected.
1865 remote.remote_buffer_listeners.clear()
1866 }
1867 }
1868
    /// Marks this store as shared with a downstream peer, so buffer updates
    /// can be forwarded to `downstream_client` under project `remote_id`.
    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1872
    /// Stops sharing: drops the downstream client and forgets which buffers
    /// had been shared with peers.
    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }
1877
1878 pub fn discard_incomplete(&mut self) {
1879 self.opened_buffers
1880 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1881 }
1882
    /// Streams buffers that may contain matches for `query`.
    ///
    /// Unsaved (unnamed) buffers are sent first; then candidate paths
    /// reported by the worktrees are opened in batches and forwarded.
    /// Dropping the returned receiver cancels the search.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // Unnamed buffers are always sent and count against the limit.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Bound how many buffers are opened concurrently per batch.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A closed receiver means the consumer stopped; bail out.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1937
    /// Kicks off diff recalculation for each of the given buffers that has
    /// diff state, returning a future that resolves once all recalculations
    /// have finished.
    pub fn recalculate_buffer_diffs(
        &mut self,
        buffers: Vec<Entity<Buffer>>,
        cx: &mut Context<Self>,
    ) -> impl Future<Output = ()> {
        let mut futures = Vec::new();
        for buffer in buffers {
            // Buffers without a `Complete` entry have no diff state to update.
            if let Some(OpenBuffer::Complete { diff_state, .. }) =
                self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                let buffer = buffer.read(cx).text_snapshot();
                futures.push(diff_state.update(cx, |diff_state, cx| {
                    diff_state.recalculate_diffs(buffer, cx)
                }));
            }
        }
        async move {
            futures::future::join_all(futures).await;
        }
    }
1958
    /// Routes events emitted by an open buffer: file changes update the local
    /// indices, reloads are forwarded downstream when shared, and language
    /// changes are propagated to the buffer's diff state.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                // Only local stores track buffers by path/entry id.
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Forward only when this store is shared with a downstream peer.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            BufferEvent::LanguageChanged => {
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(OpenBuffer::Complete { diff_state, .. }) =
                    self.opened_buffers.get(&buffer_id)
                {
                    diff_state.update(cx, |diff_state, cx| {
                        diff_state.buffer_language_changed(buffer, cx);
                    });
                }
            }
            _ => {}
        }
    }
1999
    /// Handles an incoming `UpdateBuffer` message by applying its operations
    /// to the target buffer.
    ///
    /// If the buffer is not open yet, the operations are queued in an
    /// `OpenBuffer::Operations` entry and applied once the buffer arrives.
    pub async fn handle_update_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    // Buffer still loading: extend the queued operations.
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Complete { buffer, .. } => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                        }
                    }
                },
                // First message for this id: start a queue of pending operations.
                hash_map::Entry::Vacant(e) => {
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }
2029
2030 pub fn register_shared_lsp_handle(
2031 &mut self,
2032 peer_id: proto::PeerId,
2033 buffer_id: BufferId,
2034 handle: OpenLspBufferHandle,
2035 ) {
2036 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2037 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2038 buffer.lsp_handle = Some(handle);
2039 return;
2040 }
2041 }
2042 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2043 }
2044
    /// Handles a guest's `SynchronizeBuffers` request (e.g. after a
    /// reconnect): rebuilds the set of buffers shared with that guest,
    /// replies with the host-side version of each buffer, and pushes file
    /// and save state plus any operations the guest is missing.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate for this guest; entries are re-added below.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        diff: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Operations newer than the guest's reported version.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations to the guest in the
                // background, chunked via `split_operations`.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
2134
2135 pub fn handle_create_buffer_for_peer(
2136 &mut self,
2137 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2138 replica_id: u16,
2139 capability: Capability,
2140 cx: &mut Context<Self>,
2141 ) -> Result<()> {
2142 let Some(remote) = self.as_remote_mut() else {
2143 return Err(anyhow!("buffer store is not a remote"));
2144 };
2145
2146 if let Some(buffer) =
2147 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2148 {
2149 self.add_buffer(buffer, cx)?;
2150 }
2151
2152 Ok(())
2153 }
2154
    /// Handles an `UpdateBufferFile` message: updates the buffer's file
    /// metadata, emits `BufferChangedFilePath` when the path changed, and
    /// forwards the message downstream when this store is itself shared.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // Apply the new file; return the old one only when the path
                // actually changed (or was previously unset).
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay the update to our own downstream peers, if shared.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
2202
    /// Handles a guest's `SaveBuffer` request: waits until this side has
    /// caught up to the guest's buffer version, performs the save (a
    /// "save as" when `new_path` is provided), and replies with the saved
    /// version and mtime.
    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until the operations the guest saved at have been applied.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
2243
2244 pub async fn handle_close_buffer(
2245 this: Entity<Self>,
2246 envelope: TypedEnvelope<proto::CloseBuffer>,
2247 mut cx: AsyncApp,
2248 ) -> Result<()> {
2249 let peer_id = envelope.sender_id;
2250 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2251 this.update(&mut cx, |this, _| {
2252 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2253 if shared.remove(&buffer_id).is_some() {
2254 if shared.is_empty() {
2255 this.shared_buffers.remove(&peer_id);
2256 }
2257 return;
2258 }
2259 }
2260 debug_panic!(
2261 "peer_id {} closed buffer_id {} which was either not open or already closed",
2262 peer_id,
2263 buffer_id
2264 )
2265 })
2266 }
2267
2268 pub async fn handle_buffer_saved(
2269 this: Entity<Self>,
2270 envelope: TypedEnvelope<proto::BufferSaved>,
2271 mut cx: AsyncApp,
2272 ) -> Result<()> {
2273 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2274 let version = deserialize_version(&envelope.payload.version);
2275 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2276 this.update(&mut cx, move |this, cx| {
2277 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2278 buffer.update(cx, |buffer, cx| {
2279 buffer.did_save(version, mtime, cx);
2280 });
2281 }
2282
2283 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2284 downstream_client
2285 .send(proto::BufferSaved {
2286 project_id: *project_id,
2287 buffer_id: buffer_id.into(),
2288 mtime: envelope.payload.mtime,
2289 version: envelope.payload.version,
2290 })
2291 .log_err();
2292 }
2293 })
2294 }
2295
    /// Handles a notification that a buffer was reloaded from disk elsewhere:
    /// applies the reload (version, line ending, mtime) to our replica and
    /// relays the message downstream when this project is shared.
    pub async fn handle_buffer_reloaded(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        // An unrecognized line-ending enum value is a protocol error.
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            // Apply the reload to our replica, if we hold any handle to it.
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            // Relay to downstream collaborators, if this project is shared.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }
2328
2329 pub async fn handle_blame_buffer(
2330 this: Entity<Self>,
2331 envelope: TypedEnvelope<proto::BlameBuffer>,
2332 mut cx: AsyncApp,
2333 ) -> Result<proto::BlameBufferResponse> {
2334 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2335 let version = deserialize_version(&envelope.payload.version);
2336 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2337 buffer
2338 .update(&mut cx, |buffer, _| {
2339 buffer.wait_for_version(version.clone())
2340 })?
2341 .await?;
2342 let blame = this
2343 .update(&mut cx, |this, cx| {
2344 this.blame_buffer(&buffer, Some(version), cx)
2345 })?
2346 .await?;
2347 Ok(serialize_blame_buffer_response(blame))
2348 }
2349
2350 pub async fn handle_get_permalink_to_line(
2351 this: Entity<Self>,
2352 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2353 mut cx: AsyncApp,
2354 ) -> Result<proto::GetPermalinkToLineResponse> {
2355 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2356 // let version = deserialize_version(&envelope.payload.version);
2357 let selection = {
2358 let proto_selection = envelope
2359 .payload
2360 .selection
2361 .context("no selection to get permalink for defined")?;
2362 proto_selection.start as u32..proto_selection.end as u32
2363 };
2364 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2365 let permalink = this
2366 .update(&mut cx, |this, cx| {
2367 this.get_permalink_to_line(&buffer, selection, cx)
2368 })?
2369 .await?;
2370 Ok(proto::GetPermalinkToLineResponse {
2371 permalink: permalink.to_string(),
2372 })
2373 }
2374
    /// Handles a remote request to open the unstaged diff for a buffer,
    /// responding with the diff's base (staged) text, and retaining the diff
    /// on the requesting peer's shared-buffer entry so it stays alive.
    pub async fn handle_open_unstaged_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUnstagedDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUnstagedDiffResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let diff = this
            .update(&mut cx, |this, cx| {
                // `get` returns None for unknown buffers; surface that as an
                // error below rather than inside the closure.
                let buffer = this.get(buffer_id)?;
                Some(this.open_unstaged_diff(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            // The buffer should already have been shared with this peer
            // before it requests a diff for it.
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.diff = Some(diff.clone());
            }
        })?;
        // `None` when the diff has no base text.
        let staged_text = diff.read_with(&cx, |diff, _| {
            diff.snapshot.base_text.as_ref().map(|buffer| buffer.text())
        })?;
        Ok(proto::OpenUnstagedDiffResponse { staged_text })
    }
2403
    /// Handles a remote request to open the uncommitted diff for a buffer,
    /// responding with the committed (HEAD) and staged (index) base texts.
    ///
    /// When the staged text is identical to the committed text (same base
    /// buffer), only the committed text is sent and the mode is
    /// `IndexMatchesHead`, letting the peer avoid a redundant copy.
    pub async fn handle_open_uncommitted_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUncommittedDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUncommittedDiffResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let diff = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_uncommitted_diff(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        // Retain the diff on the requesting peer's shared-buffer entry so it
        // stays alive for as long as the buffer is shared.
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.diff = Some(diff.clone());
            }
        })?;
        diff.read_with(&cx, |diff, cx| {
            use proto::open_uncommitted_diff_response::Mode;

            // Base text of the nested unstaged diff, i.e. the index contents.
            let staged_buffer = diff
                .unstaged_diff
                .as_ref()
                .and_then(|diff| diff.read(cx).snapshot.base_text.as_ref());

            let mode;
            let staged_text;
            let committed_text;
            if let Some(committed_buffer) = &diff.snapshot.base_text {
                committed_text = Some(committed_buffer.text());
                if let Some(staged_buffer) = staged_buffer {
                    if staged_buffer.remote_id() == committed_buffer.remote_id() {
                        // Index and HEAD share the same base buffer: skip
                        // sending the staged text twice.
                        mode = Mode::IndexMatchesHead;
                        staged_text = None;
                    } else {
                        mode = Mode::IndexAndHead;
                        staged_text = Some(staged_buffer.text());
                    }
                } else {
                    mode = Mode::IndexAndHead;
                    staged_text = None;
                }
            } else {
                // No committed base text; send whatever staged text exists.
                mode = Mode::IndexAndHead;
                committed_text = None;
                staged_text = staged_buffer.as_ref().map(|buffer| buffer.text());
            }

            proto::OpenUncommittedDiffResponse {
                committed_text,
                staged_text,
                mode: mode.into(),
            }
        })
    }
2465
2466 pub async fn handle_update_diff_bases(
2467 this: Entity<Self>,
2468 request: TypedEnvelope<proto::UpdateDiffBases>,
2469 mut cx: AsyncApp,
2470 ) -> Result<()> {
2471 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2472 this.update(&mut cx, |this, cx| {
2473 if let Some(OpenBuffer::Complete { diff_state, buffer }) =
2474 this.opened_buffers.get_mut(&buffer_id)
2475 {
2476 if let Some(buffer) = buffer.upgrade() {
2477 let buffer = buffer.read(cx).text_snapshot();
2478 diff_state.update(cx, |diff_state, cx| {
2479 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2480 })
2481 }
2482 }
2483 })
2484 }
2485
2486 pub fn reload_buffers(
2487 &self,
2488 buffers: HashSet<Entity<Buffer>>,
2489 push_to_history: bool,
2490 cx: &mut Context<Self>,
2491 ) -> Task<Result<ProjectTransaction>> {
2492 if buffers.is_empty() {
2493 return Task::ready(Ok(ProjectTransaction::default()));
2494 }
2495 match &self.state {
2496 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2497 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2498 }
2499 }
2500
    /// Handles a remote request to reload a set of buffers from disk, and
    /// responds with the resulting transaction serialized for the requester.
    async fn handle_reload_buffers(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncApp,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            // Resolve every requested buffer id; fail if any is unknown.
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            // `false`: the reload edits are not pushed to undo history here.
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        // Share any affected buffers with the requester before it receives
        // the transaction referencing them.
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }
2524
    /// Shares `buffer` with `peer_id`: records the share locally, then sends
    /// the buffer's full state followed by its operation history in chunks.
    ///
    /// Returns a ready task (no-op) when the buffer is already shared with
    /// this peer or when the project is not shared downstream.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        // Already shared with this peer: nothing to send.
        if shared_buffers.contains_key(&buffer_id) {
            return Task::ready(Ok(()));
        }
        // Record the share synchronously, before the async send, so a
        // concurrent call for the same buffer/peer hits the guard above.
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                diff: None,
                lsp_handle: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before the task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            // Send the buffer's current state first...
            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // ...then, only if that send succeeded, stream the operation
            // history in chunks on the background executor.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            // Peek ahead so the final chunk is flagged,
                            // letting the peer know the buffer is complete.
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2591
2592 pub fn forget_shared_buffers(&mut self) {
2593 self.shared_buffers.clear();
2594 }
2595
2596 pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
2597 self.shared_buffers.remove(peer_id);
2598 }
2599
2600 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2601 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2602 self.shared_buffers.insert(new_peer_id, buffers);
2603 }
2604 }
2605
2606 pub fn has_shared_buffers(&self) -> bool {
2607 !self.shared_buffers.is_empty()
2608 }
2609
    /// Creates a new local buffer with the given text and language (falling
    /// back to plain text), registers it with the store, and indexes it by
    /// its project path and worktree entry when it has a file.
    ///
    /// Panics if called on a non-local (remote) buffer store.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut Context<Self>,
    ) -> Entity<Buffer> {
        let buffer = cx.new(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        // Index the buffer by path (and entry id, when one exists) so later
        // opens of the same file resolve to this buffer.
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }
2643
2644 pub fn deserialize_project_transaction(
2645 &mut self,
2646 message: proto::ProjectTransaction,
2647 push_to_history: bool,
2648 cx: &mut Context<Self>,
2649 ) -> Task<Result<ProjectTransaction>> {
2650 if let Some(this) = self.as_remote_mut() {
2651 this.deserialize_project_transaction(message, push_to_history, cx)
2652 } else {
2653 debug_panic!("not a remote buffer store");
2654 Task::ready(Err(anyhow!("not a remote buffer store")))
2655 }
2656 }
2657
2658 pub fn wait_for_remote_buffer(
2659 &mut self,
2660 id: BufferId,
2661 cx: &mut Context<BufferStore>,
2662 ) -> Task<Result<Entity<Buffer>>> {
2663 if let Some(this) = self.as_remote_mut() {
2664 this.wait_for_remote_buffer(id, cx)
2665 } else {
2666 debug_panic!("not a remote buffer store");
2667 Task::ready(Err(anyhow!("not a remote buffer store")))
2668 }
2669 }
2670
2671 pub fn serialize_project_transaction_for_peer(
2672 &mut self,
2673 project_transaction: ProjectTransaction,
2674 peer_id: proto::PeerId,
2675 cx: &mut Context<Self>,
2676 ) -> proto::ProjectTransaction {
2677 let mut serialized_transaction = proto::ProjectTransaction {
2678 buffer_ids: Default::default(),
2679 transactions: Default::default(),
2680 };
2681 for (buffer, transaction) in project_transaction.0 {
2682 self.create_buffer_for_peer(&buffer, peer_id, cx)
2683 .detach_and_log_err(cx);
2684 serialized_transaction
2685 .buffer_ids
2686 .push(buffer.read(cx).remote_id().into());
2687 serialized_transaction
2688 .transactions
2689 .push(language::proto::serialize_transaction(&transaction));
2690 }
2691 serialized_transaction
2692 }
2693}
2694
2695impl OpenBuffer {
2696 fn upgrade(&self) -> Option<Entity<Buffer>> {
2697 match self {
2698 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2699 OpenBuffer::Operations(_) => None,
2700 }
2701 }
2702}
2703
2704fn is_not_found_error(error: &anyhow::Error) -> bool {
2705 error
2706 .root_cause()
2707 .downcast_ref::<io::Error>()
2708 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2709}
2710
/// Converts an optional blame result into its protobuf response form.
///
/// `None` maps to a response with no blame data; otherwise every entry,
/// commit message, and permalink is converted field-by-field.
fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
    let Some(blame) = blame else {
        return proto::BlameBufferResponse {
            blame_response: None,
        };
    };

    let entries = blame
        .entries
        .into_iter()
        .map(|entry| proto::BlameEntry {
            sha: entry.sha.as_bytes().into(),
            start_line: entry.range.start,
            end_line: entry.range.end,
            original_line_number: entry.original_line_number,
            author: entry.author.clone(),
            author_mail: entry.author_mail.clone(),
            author_time: entry.author_time,
            author_tz: entry.author_tz.clone(),
            committer: entry.committer.clone(),
            committer_mail: entry.committer_mail.clone(),
            committer_time: entry.committer_time,
            committer_tz: entry.committer_tz.clone(),
            summary: entry.summary.clone(),
            previous: entry.previous.clone(),
            filename: entry.filename.clone(),
        })
        .collect::<Vec<_>>();

    // Commit messages, keyed by commit oid.
    let messages = blame
        .messages
        .into_iter()
        .map(|(oid, message)| proto::CommitMessage {
            oid: oid.as_bytes().into(),
            message,
        })
        .collect::<Vec<_>>();

    // Permalink URLs, keyed by commit oid, serialized as strings.
    let permalinks = blame
        .permalinks
        .into_iter()
        .map(|(oid, url)| proto::CommitPermalink {
            oid: oid.as_bytes().into(),
            permalink: url.to_string(),
        })
        .collect::<Vec<_>>();

    proto::BlameBufferResponse {
        blame_response: Some(proto::blame_buffer_response::BlameResponse {
            entries,
            messages,
            permalinks,
            remote_url: blame.remote_url,
        }),
    }
}
2767
/// Reconstructs a [`Blame`] from its protobuf response form.
///
/// Returns `None` when the response carries no blame data. Entries,
/// messages, or permalinks whose oids or URLs fail to parse are silently
/// dropped rather than failing the whole conversion.
fn deserialize_blame_buffer_response(
    response: proto::BlameBufferResponse,
) -> Option<git::blame::Blame> {
    let response = response.blame_response?;
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_mail: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    // Commit messages keyed by oid; unparsable oids are dropped.
    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    // Permalinks keyed by oid; unparsable oids or URLs are dropped.
    let permalinks = response
        .permalinks
        .into_iter()
        .filter_map(|permalink| {
            Some((
                git::Oid::from_bytes(&permalink.oid).ok()?,
                Url::from_str(&permalink.permalink).ok()?,
            ))
        })
        .collect::<HashMap<_, _>>();

    Some(Blame {
        entries,
        permalinks,
        messages,
        remote_url: response.remote_url,
    })
}
2819
2820fn get_permalink_in_rust_registry_src(
2821 provider_registry: Arc<GitHostingProviderRegistry>,
2822 path: PathBuf,
2823 selection: Range<u32>,
2824) -> Result<url::Url> {
2825 #[derive(Deserialize)]
2826 struct CargoVcsGit {
2827 sha1: String,
2828 }
2829
2830 #[derive(Deserialize)]
2831 struct CargoVcsInfo {
2832 git: CargoVcsGit,
2833 path_in_vcs: String,
2834 }
2835
2836 #[derive(Deserialize)]
2837 struct CargoPackage {
2838 repository: String,
2839 }
2840
2841 #[derive(Deserialize)]
2842 struct CargoToml {
2843 package: CargoPackage,
2844 }
2845
2846 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
2847 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
2848 Some((dir, json))
2849 }) else {
2850 bail!("No .cargo_vcs_info.json found in parent directories")
2851 };
2852 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
2853 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
2854 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
2855 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
2856 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
2857 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
2858 let permalink = provider.build_permalink(
2859 remote,
2860 BuildPermalinkParams {
2861 sha: &cargo_vcs_info.git.sha1,
2862 path: &path.to_string_lossy(),
2863 selection: Some(selection),
2864 },
2865 );
2866 Ok(permalink)
2867}