1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use buffer_diff::{BufferDiff, BufferDiffEvent};
10use client::Client;
11use collections::{hash_map, HashMap, HashSet};
12use fs::Fs;
13use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
14use git::{blame::Blame, repository::RepoPath};
15use gpui::{
16 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
17};
18use http_client::Url;
19use language::{
20 proto::{
21 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
22 split_operations,
23 },
24 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
25};
26use rpc::{
27 proto::{self, ToProto},
28 AnyProtoClient, ErrorExt as _, TypedEnvelope,
29};
30use serde::Deserialize;
31use smol::channel::Receiver;
32use std::{
33 io,
34 ops::Range,
35 path::{Path, PathBuf},
36 pin::pin,
37 str::FromStr as _,
38 sync::Arc,
39 time::Instant,
40};
41use text::BufferId;
42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
44
/// Which git diff a loading task refers to, used as part of the
/// `loading_diffs` map key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    /// Diff of the buffer against the index (staged text).
    Unstaged,
    /// Diff of the buffer against HEAD (committed text).
    Uncommitted,
}
50
/// A set of open buffers.
pub struct BufferStore {
    // Local- or remote-specific behavior and bookkeeping.
    state: BufferStoreState,
    // In-flight buffer loads keyed by project path; the `Shared` task lets
    // concurrent opens of the same path await a single load.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    // In-flight diff loads keyed by buffer id and diff kind, shared likewise.
    #[allow(clippy::type_complexity)]
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
    // Every buffer known to this store, whether fully loaded or still
    // accumulating early operations.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    // Client and project id used to forward updates downstream when this
    // project is shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // Per-peer record of buffers that have been shared with that peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
64
/// Handles retained for a buffer that has been shared with a remote peer.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    // Strong handle to the buffer's diff, held while shared so the entity
    // stays alive for the peer.
    diff: Option<Entity<BufferDiff>>,
    // Handle keeping the buffer registered with the LSP store while shared.
    lsp_handle: Option<OpenLspBufferHandle>,
}
71
/// Per-buffer state for computing git diffs and notifying listeners when
/// they change.
#[derive(Default)]
struct BufferDiffState {
    // Weak handle to the buffer's diff against the index.
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    // Weak handle to the buffer's diff against HEAD.
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    // The in-flight recalculation; overwritten whenever a new one starts.
    recalculate_diff_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    // Senders resolved when the current recalculation completes.
    diff_updated_futures: Vec<oneshot::Sender<()>>,

    // Base text of the file at HEAD, line endings normalized.
    head_text: Option<Arc<String>>,
    // Base text of the file in the index, line endings normalized.
    index_text: Option<Arc<String>>,
    // Dirty flags consumed (and cleared) by `recalculate_diffs`.
    head_changed: bool,
    index_changed: bool,
    language_changed: bool,
}
87
/// Describes which git base texts changed for a buffer.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Only the index (staged) text changed.
    SetIndex(Option<String>),
    /// Only the HEAD (committed) text changed.
    SetHead(Option<String>),
    /// Both changed, to distinct values.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Both changed and the index matches HEAD, so one text serves as both.
    SetBoth(Option<String>),
}
98
impl BufferDiffState {
    /// Records the buffer's (possibly changed) language and kicks off a diff
    /// recalculation so highlighting stays in sync.
    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
        self.language = buffer.read(cx).language().cloned();
        self.language_changed = true;
        // The receiver is intentionally dropped: this caller does not need to
        // await the recalculation.
        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
    }

    /// Upgrades the weak handle to the unstaged diff, if it is still alive.
    fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
        self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
    }

    /// Upgrades the weak handle to the uncommitted diff, if it is still alive.
    fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
        self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
    }

    /// Applies an upstream `UpdateDiffBases` message by translating its mode
    /// into a `DiffBasesChange` and recalculating the affected diffs.
    fn handle_base_texts_updated(
        &mut self,
        buffer: text::BufferSnapshot,
        message: proto::UpdateDiffBases,
        cx: &mut Context<Self>,
    ) {
        use proto::update_diff_bases::Mode;

        // Silently ignore messages with an unrecognized mode value.
        let Some(mode) = Mode::from_i32(message.mode) else {
            return;
        };

        let diff_bases_change = match mode {
            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
            Mode::IndexAndHead => DiffBasesChange::SetEach {
                index: message.staged_text,
                head: message.committed_text,
            },
        };

        // Fire-and-forget: the remote side does not wait for completion.
        let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
    }

    /// Stores the new index and/or head text (normalizing line endings),
    /// marks the corresponding dirty flags, and starts a recalculation.
    ///
    /// Returns a receiver that resolves once the recalculation completes.
    fn diff_bases_changed(
        &mut self,
        buffer: text::BufferSnapshot,
        diff_bases_change: DiffBasesChange,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        match diff_bases_change {
            DiffBasesChange::SetIndex(index) => {
                self.index_text = index.map(|mut index| {
                    text::LineEnding::normalize(&mut index);
                    Arc::new(index)
                });
                self.index_changed = true;
            }
            DiffBasesChange::SetHead(head) => {
                self.head_text = head.map(|mut head| {
                    text::LineEnding::normalize(&mut head);
                    Arc::new(head)
                });
                self.head_changed = true;
            }
            DiffBasesChange::SetBoth(text) => {
                // One shared Arc is stored in both fields; `recalculate_diffs`
                // later uses pointer equality to detect index == head.
                let text = text.map(|mut text| {
                    text::LineEnding::normalize(&mut text);
                    Arc::new(text)
                });
                self.head_text = text.clone();
                self.index_text = text;
                self.head_changed = true;
                self.index_changed = true;
            }
            DiffBasesChange::SetEach { index, head } => {
                self.index_text = index.map(|mut index| {
                    text::LineEnding::normalize(&mut index);
                    Arc::new(index)
                });
                self.index_changed = true;
                self.head_text = head.map(|mut head| {
                    text::LineEnding::normalize(&mut head);
                    Arc::new(head)
                });
                self.head_changed = true;
            }
        }

        self.recalculate_diffs(buffer, cx)
    }

    /// Spawns a task that recomputes the unstaged and/or uncommitted diffs
    /// against `buffer`, emitting `BufferDiffEvent`s for changed hunk ranges.
    ///
    /// Returns a receiver that resolves when the recalculation finishes.
    fn recalculate_diffs(
        &mut self,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        log::debug!("recalculate diffs");
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);

        // Snapshot all inputs now; the task must not observe later mutations.
        let language = self.language.clone();
        let language_registry = self.language_registry.clone();
        let unstaged_diff = self.unstaged_diff();
        let uncommitted_diff = self.uncommitted_diff();
        let head = self.head_text.clone();
        let index = self.index_text.clone();
        let index_changed = self.index_changed;
        let head_changed = self.head_changed;
        let language_changed = self.language_changed;
        // Pointer equality is sufficient: `SetBoth` stores the same Arc in
        // both fields when the index matches HEAD.
        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
            (None, None) => true,
            _ => false,
        };
        // NOTE(review): assigning here replaces (drops) any previous
        // recalculation task — presumably cancelling it; confirm gpui task
        // drop semantics.
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            let mut unstaged_changed_range = None;
            if let Some(unstaged_diff) = &unstaged_diff {
                unstaged_changed_range = BufferDiff::update_diff(
                    unstaged_diff.clone(),
                    buffer.clone(),
                    index,
                    index_changed,
                    language_changed,
                    language.clone(),
                    language_registry.clone(),
                    &mut cx,
                )
                .await?;

                unstaged_diff.update(&mut cx, |_, cx| {
                    if language_changed {
                        cx.emit(BufferDiffEvent::LanguageChanged);
                    }
                    if let Some(changed_range) = unstaged_changed_range.clone() {
                        cx.emit(BufferDiffEvent::DiffChanged {
                            changed_range: Some(changed_range),
                        })
                    }
                })?;
            }

            if let Some(uncommitted_diff) = &uncommitted_diff {
                // When the index matches HEAD, derive the uncommitted diff
                // from the freshly computed unstaged diff instead of
                // recomputing it from scratch.
                let uncommitted_changed_range =
                    if let (Some(unstaged_diff), true) = (&unstaged_diff, index_matches_head) {
                        uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
                            uncommitted_diff.update_diff_from(&buffer, unstaged_diff, cx)
                        })?
                    } else {
                        BufferDiff::update_diff(
                            uncommitted_diff.clone(),
                            buffer.clone(),
                            head,
                            head_changed,
                            language_changed,
                            language.clone(),
                            language_registry.clone(),
                            &mut cx,
                        )
                        .await?
                    };

                uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
                    if language_changed {
                        cx.emit(BufferDiffEvent::LanguageChanged);
                    }
                    // Merge the unstaged and uncommitted changed ranges so one
                    // DiffChanged event covers both.
                    let changed_range = match (unstaged_changed_range, uncommitted_changed_range) {
                        (None, None) => None,
                        (Some(unstaged_range), None) => {
                            uncommitted_diff.range_to_hunk_range(unstaged_range, &buffer, cx)
                        }
                        (None, Some(uncommitted_range)) => Some(uncommitted_range),
                        (Some(unstaged_range), Some(uncommitted_range)) => {
                            let mut start = uncommitted_range.start;
                            let mut end = uncommitted_range.end;
                            if let Some(unstaged_range) =
                                uncommitted_diff.range_to_hunk_range(unstaged_range, &buffer, cx)
                            {
                                start = unstaged_range.start.min(&uncommitted_range.start, &buffer);
                                end = unstaged_range.end.max(&uncommitted_range.end, &buffer);
                            }
                            Some(start..end)
                        }
                    };
                    cx.emit(BufferDiffEvent::DiffChanged { changed_range });
                })?;
            }

            if let Some(this) = this.upgrade() {
                this.update(&mut cx, |this, _| {
                    // Clear the dirty flags and wake every waiter, including
                    // those queued by superseded recalculations.
                    this.index_changed = false;
                    this.head_changed = false;
                    this.language_changed = false;
                    for tx in this.diff_updated_futures.drain(..) {
                        tx.send(()).ok();
                    }
                })?;
            }

            Ok(())
        }));

        rx
    }
}
300
/// Whether this store manages buffers for a local project or one hosted
/// remotely.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
305
/// Buffer-store behavior for a project whose files live on a remote host.
struct RemoteBufferStore {
    // Buffers sent to us by peers, retained to avoid releasing them early
    // (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    // Buffers whose initial state has arrived but which are still receiving
    // operation chunks.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    // Channels resolved when a given remote buffer finishes (or fails)
    // loading.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
315
/// Buffer-store behavior for a project on the local filesystem.
struct LocalBufferStore {
    // Index from project path to the buffer open at that path.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    // Index from worktree entry id to buffer id, consulted when worktree
    // entries change (e.g. renames).
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    // Keeps the worktree-store subscription alive for this store's lifetime.
    _subscription: Subscription,
}
322
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A loaded buffer together with its git-diff state.
    Complete {
        buffer: WeakEntity<Buffer>,
        diff_state: Entity<BufferDiffState>,
    },
    /// Queued operations for a buffer id — presumably received before the
    /// buffer itself exists; confirm against the sites that push into it.
    Operations(Vec<Operation>),
}
330
/// Events emitted by `BufferStore` to its subscribers.
pub enum BufferStoreEvent {
    BufferAdded(Entity<Buffer>),
    BufferDropped(BufferId),
    /// The buffer's backing file changed path; `old_file` is the file it had
    /// before the change.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
339
/// A set of buffer transactions produced by one logical project-wide edit,
/// keyed by the buffer each transaction applies to.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
342
// Allow `BufferStore` to emit `BufferStoreEvent`s to GPUI subscribers.
impl EventEmitter<BufferStoreEvent> for BufferStore {}
344
impl RemoteBufferStore {
    /// Requests the staged (index) text for `buffer_id` from the upstream
    /// host. Resolves to `None` when the file has no staged version.
    fn open_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.background_executor().spawn(async move {
            let response = client
                .request(proto::OpenUnstagedDiff {
                    project_id,
                    buffer_id: buffer_id.to_proto(),
                })
                .await?;
            Ok(response.staged_text)
        })
    }

    /// Requests both diff bases (index and HEAD text) for `buffer_id` from
    /// the upstream host, collapsed into a single `DiffBasesChange`.
    fn open_uncommitted_diff(
        &self,
        buffer_id: BufferId,
        cx: &App,
    ) -> Task<Result<DiffBasesChange>> {
        use proto::open_uncommitted_diff_response::Mode;

        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.background_executor().spawn(async move {
            let response = client
                .request(proto::OpenUncommittedDiff {
                    project_id,
                    buffer_id: buffer_id.to_proto(),
                })
                .await?;
            let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
            let bases = match mode {
                // When the index matches HEAD, the host sends only the
                // committed text, which serves as both bases.
                Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
                Mode::IndexAndHead => DiffBasesChange::SetEach {
                    head: response.committed_text,
                    index: response.staged_text,
                },
            };
            Ok(bases)
        })
    }

    /// Returns a task that resolves once the buffer with `id` has been fully
    /// replicated from the remote (or immediately, if it already has been).
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            // The buffer may already be open; check before waiting on the
            // channel so we don't wait for a notification that won't come.
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }

    /// Saves a buffer by asking the upstream host to perform the write, then
    /// applies the resulting version and mtime to the local buffer.
    fn save_remote_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

    /// Handles one message of the buffer-replication protocol: either a
    /// buffer's initial state or a chunk of its operations.
    ///
    /// Returns `Some(buffer)` only when the final chunk has been applied.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut Context<BufferStore>,
    ) -> Result<Option<Entity<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Propagate the failure to everyone waiting on this
                        // buffer.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                // A chunk is only valid after the State variant has arrived.
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // Abandon the partially-loaded buffer and notify waiters
                    // of the failure.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }

    /// Ids of buffers whose replication from the remote is still in progress.
    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
        self.loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>()
    }

    /// Reconstructs a `ProjectTransaction` from its wire form, waiting for
    /// every referenced buffer — and each transaction's edits — to arrive.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                // Ensure every edit in the transaction has been replicated
                // before exposing the transaction to callers.
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }

    /// Opens the buffer at `path` by asking the upstream host, then waits for
    /// the replicated buffer state to arrive.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let worktree_id = worktree.read(cx).id().to_proto();
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path.to_proto(),
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            let buffer = this
                .update(&mut cx, {
                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await?;

            Ok(buffer)
        })
    }

    /// Creates a new untitled buffer on the upstream host and waits for it to
    /// be replicated here.
    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
        let create = self.upstream_client.request(proto::OpenNewBuffer {
            project_id: self.project_id,
        });
        cx.spawn(|this, mut cx| async move {
            let response = create.await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

    /// Asks the upstream host to reload the given buffers from disk and
    /// returns the resulting project-wide transaction.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        let request = self.upstream_client.request(proto::ReloadBuffers {
            project_id: self.project_id,
            buffer_ids: buffers
                .iter()
                .map(|buffer| buffer.read(cx).remote_id().to_proto())
                .collect(),
        });

        cx.spawn(|this, mut cx| async move {
            let response = request
                .await?
                .transaction
                .ok_or_else(|| anyhow!("missing transaction"))?;
            this.update(&mut cx, |this, cx| {
                this.deserialize_project_transaction(response, push_to_history, cx)
            })?
            .await
        })
    }
}
652
653impl LocalBufferStore {
654 fn worktree_for_buffer(
655 &self,
656 buffer: &Entity<Buffer>,
657 cx: &App,
658 ) -> Option<(Entity<Worktree>, Arc<Path>)> {
659 let file = buffer.read(cx).file()?;
660 let worktree_id = file.worktree_id(cx);
661 let path = file.path().clone();
662 let worktree = self
663 .worktree_store
664 .read(cx)
665 .worktree_for_id(worktree_id, cx)?;
666 Some((worktree, path))
667 }
668
669 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
670 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
671 worktree.read(cx).load_staged_file(path.as_ref(), cx)
672 } else {
673 return Task::ready(Err(anyhow!("no such worktree")));
674 }
675 }
676
677 fn load_committed_text(
678 &self,
679 buffer: &Entity<Buffer>,
680 cx: &App,
681 ) -> Task<Result<Option<String>>> {
682 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
683 worktree.read(cx).load_committed_file(path.as_ref(), cx)
684 } else {
685 Task::ready(Err(anyhow!("no such worktree")))
686 }
687 }
688
    /// Writes the buffer's contents to `path` within `worktree`, then
    /// notifies downstream peers and the buffer itself of the save.
    ///
    /// `has_changed_file` is forced to true for files that are new on disk so
    /// that peers receive the file metadata along with the save notification.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Capture everything the async save needs before spawning.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            this.update(&mut cx, |this, cx| {
                // When shared, tell downstream peers about the (possibly new)
                // file, then about the save itself. Send failures are logged
                // rather than failing the save.
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
746
    /// Subscribes to a worktree's events so that entry changes and git
    /// repository changes are reflected in the open buffers.
    ///
    /// The subscription is detached; it lives as long as the worktree entity.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            // Only local worktrees are handled here; other events are ignored.
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        Self::local_worktree_git_repos_changed(
                            this,
                            worktree.clone(),
                            updated_repos,
                            cx,
                        )
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }
772
773 fn local_worktree_entries_changed(
774 this: &mut BufferStore,
775 worktree_handle: &Entity<Worktree>,
776 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
777 cx: &mut Context<BufferStore>,
778 ) {
779 let snapshot = worktree_handle.read(cx).snapshot();
780 for (path, entry_id, _) in changes {
781 Self::local_worktree_entry_changed(
782 this,
783 *entry_id,
784 path,
785 worktree_handle,
786 &snapshot,
787 cx,
788 );
789 }
790 }
791
    /// Responds to git repository changes in a local worktree by reloading
    /// the staged and/or committed base texts for every affected open buffer,
    /// forwarding the new bases downstream, and recalculating diffs.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Collect, per affected buffer, which base texts actually need
        // reloading (only those with live diff entities).
        let mut diff_state_updates = Vec::new();
        for buffer in this.opened_buffers.values() {
            let OpenBuffer::Complete { buffer, diff_state } = buffer else {
                continue;
            };
            let Some(buffer) = buffer.upgrade() else {
                continue;
            };
            let buffer = buffer.read(cx);
            let Some(file) = File::from_dyn(buffer.file()) else {
                continue;
            };
            if file.worktree != worktree_handle {
                continue;
            }
            let diff_state = diff_state.read(cx);
            // A buffer is affected if its path lies under any changed
            // repository's working directory.
            if changed_repos
                .iter()
                .any(|(work_dir, _)| file.path.starts_with(work_dir))
            {
                let snapshot = buffer.text_snapshot();
                diff_state_updates.push((
                    snapshot.clone(),
                    file.path.clone(),
                    diff_state
                        .unstaged_diff
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                    diff_state
                        .uncommitted_diff
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                ))
            }
        }

        if diff_state_updates.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Read index/HEAD contents on the background executor so the
            // foreground stays responsive.
            let diff_bases_changes_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    diff_state_updates
                        .into_iter()
                        .filter_map(
                            |(buffer_snapshot, path, needs_staged_text, needs_committed_text)| {
                                let local_repo = snapshot.local_repo_for_path(&path)?;
                                let relative_path = local_repo.relativize(&path).ok()?;
                                let staged_text = if needs_staged_text {
                                    local_repo.repo().load_index_text(&relative_path)
                                } else {
                                    None
                                };
                                let committed_text = if needs_committed_text {
                                    local_repo.repo().load_committed_text(&relative_path)
                                } else {
                                    None
                                };
                                // Collapse into the smallest DiffBasesChange
                                // that covers what was reloaded.
                                let diff_bases_change =
                                    match (needs_staged_text, needs_committed_text) {
                                        (true, true) => Some(if staged_text == committed_text {
                                            DiffBasesChange::SetBoth(committed_text)
                                        } else {
                                            DiffBasesChange::SetEach {
                                                index: staged_text,
                                                head: committed_text,
                                            }
                                        }),
                                        (true, false) => {
                                            Some(DiffBasesChange::SetIndex(staged_text))
                                        }
                                        (false, true) => {
                                            Some(DiffBasesChange::SetHead(committed_text))
                                        }
                                        (false, false) => None,
                                    };
                                Some((buffer_snapshot, diff_bases_change))
                            },
                        )
                        .collect::<Vec<_>>()
                })
                .await;

            this.update(&mut cx, |this, cx| {
                for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
                    let Some(OpenBuffer::Complete { diff_state, .. }) =
                        this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
                    else {
                        continue;
                    };
                    let Some(diff_bases_change) = diff_bases_change else {
                        continue;
                    };

                    diff_state.update(cx, |diff_state, cx| {
                        use proto::update_diff_bases::Mode;

                        // Forward the new bases to downstream peers before
                        // applying them locally.
                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
                            let buffer_id = buffer_snapshot.remote_id().to_proto();
                            let (staged_text, committed_text, mode) = match diff_bases_change
                                .clone()
                            {
                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                DiffBasesChange::SetEach { index, head } => {
                                    (index, head, Mode::IndexAndHead)
                                }
                                DiffBasesChange::SetBoth(text) => {
                                    (None, text, Mode::IndexMatchesHead)
                                }
                            };
                            let message = proto::UpdateDiffBases {
                                project_id: *project_id,
                                buffer_id,
                                staged_text,
                                committed_text,
                                mode: mode as i32,
                            };

                            client.send(message).log_err();
                        }

                        // The receiver is dropped: nothing here awaits the
                        // recalculation.
                        let _ =
                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
                    });
                }
            })
        })
        .detach_and_log_err(cx);
    }
936
    /// Reconciles one buffer's `File` with a changed worktree entry: rebuilds
    /// the file from the snapshot (or marks it deleted), updates the
    /// path/entry-id indices, notifies downstream peers, and emits
    /// `BufferChangedFilePath` when the path moved.
    ///
    /// The `Option<()>` return exists only to allow early exit via `?`.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Look the buffer up by entry id first, then fall back to its path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        // Drop the stale store entry if the buffer itself has been dropped.
        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            this.opened_buffers.remove(&buffer_id);
            None
        };

        // With no live buffer, just clean up both indices and stop.
        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Find the entry by id first, falling back to the old path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // No matching entry: the file was deleted from disk.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index and record the path-change event.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Best-effort notification of downstream peers.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        // Emit events after the buffer update so observers see final state.
        for event in events {
            cx.emit(event);
        }

        None
    }
1064
1065 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1066 let file = File::from_dyn(buffer.read(cx).file())?;
1067
1068 let remote_id = buffer.read(cx).remote_id();
1069 if let Some(entry_id) = file.entry_id {
1070 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1071 Some(_) => {
1072 return None;
1073 }
1074 None => {
1075 self.local_buffer_ids_by_entry_id
1076 .insert(entry_id, remote_id);
1077 }
1078 }
1079 };
1080 self.local_buffer_ids_by_path.insert(
1081 ProjectPath {
1082 worktree_id: file.worktree_id(cx),
1083 path: file.path.clone(),
1084 },
1085 remote_id,
1086 );
1087
1088 Some(())
1089 }
1090
1091 fn save_buffer(
1092 &self,
1093 buffer: Entity<Buffer>,
1094 cx: &mut Context<BufferStore>,
1095 ) -> Task<Result<()>> {
1096 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1097 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1098 };
1099 let worktree = file.worktree.clone();
1100 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1101 }
1102
1103 fn save_buffer_as(
1104 &self,
1105 buffer: Entity<Buffer>,
1106 path: ProjectPath,
1107 cx: &mut Context<BufferStore>,
1108 ) -> Task<Result<()>> {
1109 let Some(worktree) = self
1110 .worktree_store
1111 .read(cx)
1112 .worktree_for_id(path.worktree_id, cx)
1113 else {
1114 return Task::ready(Err(anyhow!("no such worktree")));
1115 };
1116 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1117 }
1118
    /// Loads the file at `path` in `worktree` into a buffer — or, when the
    /// file does not exist on disk, creates an empty in-memory buffer marked
    /// `DiskState::New` — then registers it with the store's indices.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity slot up front so its id can double as the
            // buffer id.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Construct the text buffer off the main thread.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // A missing file becomes an empty buffer that is new on disk.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                // Index the buffer by path and (when present) by entry id.
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
1187
1188 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1189 cx.spawn(|buffer_store, mut cx| async move {
1190 let buffer =
1191 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1192 buffer_store.update(&mut cx, |buffer_store, cx| {
1193 buffer_store.add_buffer(buffer.clone(), cx).log_err();
1194 })?;
1195 Ok(buffer)
1196 })
1197 }
1198
1199 fn reload_buffers(
1200 &self,
1201 buffers: HashSet<Entity<Buffer>>,
1202 push_to_history: bool,
1203 cx: &mut Context<BufferStore>,
1204 ) -> Task<Result<ProjectTransaction>> {
1205 cx.spawn(move |_, mut cx| async move {
1206 let mut project_transaction = ProjectTransaction::default();
1207 for buffer in buffers {
1208 let transaction = buffer
1209 .update(&mut cx, |buffer, cx| buffer.reload(cx))?
1210 .await?;
1211 buffer.update(&mut cx, |buffer, cx| {
1212 if let Some(transaction) = transaction {
1213 if !push_to_history {
1214 buffer.forget_transaction(transaction.id);
1215 }
1216 project_transaction.0.insert(cx.entity(), transaction);
1217 }
1218 })?;
1219 }
1220
1221 Ok(project_transaction)
1222 })
1223 }
1224}
1225
1226impl BufferStore {
    /// Registers this type's RPC message and request handlers on `client`.
    /// Called once during startup, before any buffer traffic arrives.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
    }
1239
1240 /// Creates a buffer store, optionally retaining its buffers.
1241 pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
1242 Self {
1243 state: BufferStoreState::Local(LocalBufferStore {
1244 local_buffer_ids_by_path: Default::default(),
1245 local_buffer_ids_by_entry_id: Default::default(),
1246 worktree_store: worktree_store.clone(),
1247 _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
1248 if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
1249 let this = this.as_local_mut().unwrap();
1250 this.subscribe_to_worktree(worktree, cx);
1251 }
1252 }),
1253 }),
1254 downstream_client: None,
1255 opened_buffers: Default::default(),
1256 shared_buffers: Default::default(),
1257 loading_buffers: Default::default(),
1258 loading_diffs: Default::default(),
1259 worktree_store,
1260 }
1261 }
1262
1263 pub fn remote(
1264 worktree_store: Entity<WorktreeStore>,
1265 upstream_client: AnyProtoClient,
1266 remote_id: u64,
1267 _cx: &mut Context<Self>,
1268 ) -> Self {
1269 Self {
1270 state: BufferStoreState::Remote(RemoteBufferStore {
1271 shared_with_me: Default::default(),
1272 loading_remote_buffers_by_id: Default::default(),
1273 remote_buffer_listeners: Default::default(),
1274 project_id: remote_id,
1275 upstream_client,
1276 worktree_store: worktree_store.clone(),
1277 }),
1278 downstream_client: None,
1279 opened_buffers: Default::default(),
1280 loading_buffers: Default::default(),
1281 loading_diffs: Default::default(),
1282 shared_buffers: Default::default(),
1283 worktree_store,
1284 }
1285 }
1286
1287 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1288 match &mut self.state {
1289 BufferStoreState::Local(state) => Some(state),
1290 _ => None,
1291 }
1292 }
1293
1294 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1295 match &mut self.state {
1296 BufferStoreState::Remote(state) => Some(state),
1297 _ => None,
1298 }
1299 }
1300
1301 fn as_remote(&self) -> Option<&RemoteBufferStore> {
1302 match &self.state {
1303 BufferStoreState::Remote(state) => Some(state),
1304 _ => None,
1305 }
1306 }
1307
    /// Opens the buffer at `project_path`.
    ///
    /// Returns the existing buffer if one is already open at that path.
    /// Concurrent calls for the same path share a single load via the
    /// `loading_buffers` map of shared futures.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // A load for this path is already in flight; join it.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            // Wrap in `Arc` so the error can be shared by all
                            // waiters of the cloned future.
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Unwrap the shared `Arc<anyhow::Error>` back into a plain error.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1353
    /// Opens a diff of the buffer's contents against its staged (index)
    /// text, creating it if necessary.
    ///
    /// Returns the existing diff when already available; concurrent calls
    /// for the same buffer share a single load via `loading_diffs`.
    pub fn open_unstaged_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(diff) = self.get_unstaged_diff(buffer_id, cx) {
            return Task::ready(Ok(diff));
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Unstaged)) {
            // A load is already in flight; await its shared result.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                // Locally, load the staged text ourselves; on a remote
                // project, request the diff from the host.
                let staged_text = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.open_unstaged_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Unstaged,
                                staged_text.await.map(DiffBasesChange::SetIndex),
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Unwrap the shared `Arc<anyhow::Error>` back into a plain error.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1394
    /// Opens a diff of the buffer's contents against its committed (HEAD)
    /// text, creating it if necessary.
    ///
    /// Returns the existing diff when already available; concurrent calls
    /// for the same buffer share a single load via `loading_diffs`.
    pub fn open_uncommitted_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(diff) = self.get_uncommitted_diff(buffer_id, cx) {
            return Task::ready(Ok(diff));
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Uncommitted)) {
            // A load is already in flight; await its shared result.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let changes = match &self.state {
                    BufferStoreState::Local(this) => {
                        let committed_text = this.load_committed_text(&buffer, cx);
                        let staged_text = this.load_staged_text(&buffer, cx);
                        cx.background_executor().spawn(async move {
                            let committed_text = committed_text.await?;
                            let staged_text = staged_text.await?;
                            // When staged and committed texts agree, one base
                            // serves both diffs; otherwise set each separately.
                            let diff_bases_change = if committed_text == staged_text {
                                DiffBasesChange::SetBoth(committed_text)
                            } else {
                                DiffBasesChange::SetEach {
                                    index: staged_text,
                                    head: committed_text,
                                }
                            };
                            Ok(diff_bases_change)
                        })
                    }
                    BufferStoreState::Remote(this) => this.open_uncommitted_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Uncommitted,
                                changes.await,
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Unwrap the shared `Arc<anyhow::Error>` back into a plain error.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1451
    /// Second phase of `open_unstaged_diff`/`open_uncommitted_diff`:
    /// installs the loaded diff base(s) on the buffer's diff state and
    /// returns the resulting diff entity.
    ///
    /// In every outcome, the `(buffer_id, kind)` entry is removed from
    /// `loading_diffs` so that a later call can retry.
    async fn open_diff_internal(
        this: WeakEntity<Self>,
        kind: DiffKind,
        texts: Result<DiffBasesChange>,
        buffer_entity: Entity<Buffer>,
        mut cx: AsyncApp,
    ) -> Result<Entity<BufferDiff>> {
        let diff_bases_change = match texts {
            Err(e) => {
                // Loading the base text failed: clear the in-flight marker,
                // then propagate the error to all waiters.
                this.update(&mut cx, |this, cx| {
                    let buffer = buffer_entity.read(cx);
                    let buffer_id = buffer.remote_id();
                    this.loading_diffs.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(&mut cx, |this, cx| {
            let buffer = buffer_entity.read(cx);
            let buffer_id = buffer.remote_id();
            let language = buffer.language().cloned();
            let language_registry = buffer.language_registry();
            let text_snapshot = buffer.text_snapshot();
            this.loading_diffs.remove(&(buffer_id, kind));

            if let Some(OpenBuffer::Complete { diff_state, .. }) =
                this.opened_buffers.get_mut(&buffer_id)
            {
                diff_state.update(cx, |diff_state, cx| {
                    diff_state.language = language;
                    diff_state.language_registry = language_registry;

                    let diff = cx.new(|_| BufferDiff::new(&text_snapshot));
                    match kind {
                        DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
                        DiffKind::Uncommitted => {
                            // An uncommitted diff carries the unstaged diff
                            // as its secondary diff, creating one if absent.
                            let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
                                diff
                            } else {
                                let unstaged_diff = cx.new(|_| BufferDiff::new(&text_snapshot));
                                diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
                                unstaged_diff
                            };

                            diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
                            diff_state.uncommitted_diff = Some(diff.downgrade())
                        }
                    };

                    // Wait until the new bases have been applied before
                    // handing the diff entity back.
                    let rx = diff_state.diff_bases_changed(text_snapshot, diff_bases_change, cx);

                    Ok(async move {
                        rx.await.ok();
                        Ok(diff)
                    })
                })
            } else {
                Err(anyhow!("buffer was closed"))
            }
        })??
        .await
    }
1516
1517 pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
1518 match &self.state {
1519 BufferStoreState::Local(this) => this.create_buffer(cx),
1520 BufferStoreState::Remote(this) => this.create_buffer(cx),
1521 }
1522 }
1523
1524 pub fn save_buffer(
1525 &mut self,
1526 buffer: Entity<Buffer>,
1527 cx: &mut Context<Self>,
1528 ) -> Task<Result<()>> {
1529 match &mut self.state {
1530 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1531 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1532 }
1533 }
1534
1535 pub fn save_buffer_as(
1536 &mut self,
1537 buffer: Entity<Buffer>,
1538 path: ProjectPath,
1539 cx: &mut Context<Self>,
1540 ) -> Task<Result<()>> {
1541 let old_file = buffer.read(cx).file().cloned();
1542 let task = match &self.state {
1543 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1544 BufferStoreState::Remote(this) => {
1545 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1546 }
1547 };
1548 cx.spawn(|this, mut cx| async move {
1549 task.await?;
1550 this.update(&mut cx, |_, cx| {
1551 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1552 })
1553 })
1554 }
1555
    /// Computes git blame information for the buffer's file.
    ///
    /// Returns `Ok(None)` when the file is not inside a git repository.
    /// When `version` is given, the buffer's text at that version is
    /// blamed; otherwise the current text is used. On remote projects the
    /// request is forwarded to the host.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything needed for the blame on the main thread…
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        // Not in a repository: blame is simply unavailable.
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                // …then run the actual blame on the background executor.
                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                // The host has the repository; send it a BlameBuffer request.
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1617
    /// Builds a permanent URL to `selection` (a line range) in the buffer's
    /// file on its git hosting provider, based on the `origin` remote and
    /// the current HEAD commit. On remote projects the request is
    /// forwarded to the host.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // The permalink needs the path relative to the repository
                // root, not the worktree root.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                // The host has the repository; send it a request.
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1714
1715 fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
1716 let buffer = buffer_entity.read(cx);
1717 let language = buffer.language().cloned();
1718 let language_registry = buffer.language_registry();
1719 let remote_id = buffer.remote_id();
1720 let is_remote = buffer.replica_id() != 0;
1721 let open_buffer = OpenBuffer::Complete {
1722 buffer: buffer_entity.downgrade(),
1723 diff_state: cx.new(|_| BufferDiffState {
1724 language,
1725 language_registry,
1726 ..Default::default()
1727 }),
1728 };
1729
1730 let handle = cx.entity().downgrade();
1731 buffer_entity.update(cx, move |_, cx| {
1732 cx.on_release(move |buffer, cx| {
1733 handle
1734 .update(cx, |_, cx| {
1735 cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1736 })
1737 .ok();
1738 })
1739 .detach()
1740 });
1741
1742 match self.opened_buffers.entry(remote_id) {
1743 hash_map::Entry::Vacant(entry) => {
1744 entry.insert(open_buffer);
1745 }
1746 hash_map::Entry::Occupied(mut entry) => {
1747 if let OpenBuffer::Operations(operations) = entry.get_mut() {
1748 buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1749 } else if entry.get().upgrade().is_some() {
1750 if is_remote {
1751 return Ok(());
1752 } else {
1753 debug_panic!("buffer {} was already registered", remote_id);
1754 Err(anyhow!("buffer {} was already registered", remote_id))?;
1755 }
1756 }
1757 entry.insert(open_buffer);
1758 }
1759 }
1760
1761 cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
1762 cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
1763 Ok(())
1764 }
1765
1766 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1767 self.opened_buffers
1768 .values()
1769 .filter_map(|buffer| buffer.upgrade())
1770 }
1771
1772 pub fn loading_buffers(
1773 &self,
1774 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1775 self.loading_buffers.iter().map(|(path, task)| {
1776 let task = task.clone();
1777 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1778 })
1779 }
1780
1781 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1782 self.buffers().find_map(|buffer| {
1783 let file = File::from_dyn(buffer.read(cx).file())?;
1784 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1785 Some(buffer)
1786 } else {
1787 None
1788 }
1789 })
1790 }
1791
1792 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1793 self.opened_buffers.get(&buffer_id)?.upgrade()
1794 }
1795
1796 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1797 self.get(buffer_id)
1798 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1799 }
1800
1801 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1802 self.get(buffer_id).or_else(|| {
1803 self.as_remote()
1804 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1805 })
1806 }
1807
1808 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
1809 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1810 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
1811 } else {
1812 None
1813 }
1814 }
1815
1816 pub fn get_uncommitted_diff(
1817 &self,
1818 buffer_id: BufferId,
1819 cx: &App,
1820 ) -> Option<Entity<BufferDiff>> {
1821 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1822 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
1823 } else {
1824 None
1825 }
1826 }
1827
1828 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1829 let buffers = self
1830 .buffers()
1831 .map(|buffer| {
1832 let buffer = buffer.read(cx);
1833 proto::BufferVersion {
1834 id: buffer.remote_id().into(),
1835 version: language::proto::serialize_version(&buffer.version),
1836 }
1837 })
1838 .collect();
1839 let incomplete_buffer_ids = self
1840 .as_remote()
1841 .map(|remote| remote.incomplete_buffer_ids())
1842 .unwrap_or_default();
1843 (buffers, incomplete_buffer_ids)
1844 }
1845
1846 pub fn disconnected_from_host(&mut self, cx: &mut App) {
1847 for open_buffer in self.opened_buffers.values_mut() {
1848 if let Some(buffer) = open_buffer.upgrade() {
1849 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1850 }
1851 }
1852
1853 for buffer in self.buffers() {
1854 buffer.update(cx, |buffer, cx| {
1855 buffer.set_capability(Capability::ReadOnly, cx)
1856 });
1857 }
1858
1859 if let Some(remote) = self.as_remote_mut() {
1860 // Wake up all futures currently waiting on a buffer to get opened,
1861 // to give them a chance to fail now that we've disconnected.
1862 remote.remote_buffer_listeners.clear()
1863 }
1864 }
1865
    /// Marks the store as shared with a downstream client under
    /// `remote_id`; subsequent buffer events are forwarded to that client.
    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1869
    /// Stops sharing: drops the downstream client and forgets which
    /// buffers were shared with peers.
    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }
1874
1875 pub fn discard_incomplete(&mut self) {
1876 self.opened_buffers
1877 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1878 }
1879
    /// Streams buffers that may contain matches for `query`, up to `limit`.
    ///
    /// Buffers that have no file entry ("unnamed" buffers) are sent first
    /// and count against the limit. The remaining candidates come from the
    /// worktree store's file scan and are opened in batches of
    /// `MAX_CONCURRENT_BUFFER_OPENS`. The receiver sees buffers as they
    /// become available; dropping it stops the search.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                // Already-open file-backed buffers are passed to the
                // worktree scan so it can include their current contents.
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            // Unnamed buffers have no file to scan, so send them unconditionally.
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                // Open one batch of candidate paths concurrently.
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A closed receiver means the search was abandoned.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1934
1935 pub fn recalculate_buffer_diffs(
1936 &mut self,
1937 buffers: Vec<Entity<Buffer>>,
1938 cx: &mut Context<Self>,
1939 ) -> impl Future<Output = ()> {
1940 let mut futures = Vec::new();
1941 for buffer in buffers {
1942 if let Some(OpenBuffer::Complete { diff_state, .. }) =
1943 self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
1944 {
1945 let buffer = buffer.read(cx).text_snapshot();
1946 futures.push(diff_state.update(cx, |diff_state, cx| {
1947 diff_state.recalculate_diffs(buffer, cx)
1948 }));
1949 }
1950 }
1951 async move {
1952 futures::future::join_all(futures).await;
1953 }
1954 }
1955
    /// Routes buffer events to the interested subsystems: file-handle
    /// changes to the local state, reloads to the downstream client when
    /// the project is shared, and language changes to the buffer's diff
    /// state.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Only relevant when sharing with a downstream client.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            BufferEvent::LanguageChanged => {
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(OpenBuffer::Complete { diff_state, .. }) =
                    self.opened_buffers.get(&buffer_id)
                {
                    diff_state.update(cx, |diff_state, cx| {
                        diff_state.buffer_language_changed(buffer, cx);
                    });
                }
            }
            _ => {}
        }
    }
1996
    /// Handles an incoming `UpdateBuffer` message containing buffer
    /// operations from a peer.
    ///
    /// When the buffer is not open yet, the operations are queued in an
    /// `OpenBuffer::Operations` entry; `add_buffer` applies them once the
    /// buffer is created.
    pub async fn handle_update_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    // Buffer is still being created: extend the queue.
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Complete { buffer, .. } => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                        }
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    // Operations arrived before the buffer itself.
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }
2026
2027 pub fn register_shared_lsp_handle(
2028 &mut self,
2029 peer_id: proto::PeerId,
2030 buffer_id: BufferId,
2031 handle: OpenLspBufferHandle,
2032 ) {
2033 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2034 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2035 buffer.lsp_handle = Some(handle);
2036 return;
2037 }
2038 }
2039 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2040 }
2041
    /// Handles a `SynchronizeBuffers` request from a reconnecting guest.
    ///
    /// Re-registers the guest's buffers as shared, replies with the host's
    /// current version of each one, and asynchronously sends the guest any
    /// operations it is missing (relative to the versions it reported),
    /// along with up-to-date file and reload metadata.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate for this guest; only the buffers it
        // reports below are considered shared.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        diff: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest is missing.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Send the missing operations in the background, split into
                // appropriately-sized chunks.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
2131
2132 pub fn handle_create_buffer_for_peer(
2133 &mut self,
2134 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2135 replica_id: u16,
2136 capability: Capability,
2137 cx: &mut Context<Self>,
2138 ) -> Result<()> {
2139 let Some(remote) = self.as_remote_mut() else {
2140 return Err(anyhow!("buffer store is not a remote"));
2141 };
2142
2143 if let Some(buffer) =
2144 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2145 {
2146 self.add_buffer(buffer, cx)?;
2147 }
2148
2149 Ok(())
2150 }
2151
    /// Handles an `UpdateBufferFile` message: applies the new file to the
    /// matching buffer (if open), emits `BufferChangedFilePath` when the
    /// path changed, and re-forwards the message downstream when this
    /// store is itself shared.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // Capture the old file only when the path actually changed,
                // so the event below fires only on renames/moves.
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay the update to our own guests, if any.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
2199
    /// Message handler: saves a buffer on behalf of a peer — optionally to a
    /// new path ("save as") — and responds with the saved version and mtime.
    /// Fails if the project is not currently shared.
    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Wait until this replica has caught up to the version the requester
        // saw, so we don't save stale content.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            // "Save as": the peer asked for the buffer to be written to a new
            // project path.
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        // Report the post-save version/mtime back to the requester.
        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
2240
2241 pub async fn handle_close_buffer(
2242 this: Entity<Self>,
2243 envelope: TypedEnvelope<proto::CloseBuffer>,
2244 mut cx: AsyncApp,
2245 ) -> Result<()> {
2246 let peer_id = envelope.sender_id;
2247 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2248 this.update(&mut cx, |this, _| {
2249 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2250 if shared.remove(&buffer_id).is_some() {
2251 if shared.is_empty() {
2252 this.shared_buffers.remove(&peer_id);
2253 }
2254 return;
2255 }
2256 }
2257 debug_panic!(
2258 "peer_id {} closed buffer_id {} which was either not open or already closed",
2259 peer_id,
2260 buffer_id
2261 )
2262 })
2263 }
2264
    /// Message handler: records that a buffer was saved elsewhere, updating the
    /// local replica's saved version/mtime, and relays the notification to any
    /// downstream client.
    pub async fn handle_buffer_saved(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        this.update(&mut cx, move |this, cx| {
            // The buffer may still be loading; apply the save metadata to
            // whatever replica we have, if any.
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }

            // Forward the notification downstream regardless, so clients
            // further down the chain stay in sync.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferSaved {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                    })
                    .log_err();
            }
        })
    }
2292
    /// Message handler: records that a buffer was reloaded from disk
    /// elsewhere, updating the local replica's version, line ending, and
    /// mtime, and relays the notification to any downstream client.
    pub async fn handle_buffer_reloaded(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        // Reject messages whose line-ending enum value is out of range.
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            // The buffer may still be loading; apply the reload metadata to
            // whatever replica we have, if any.
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            // Forward the notification downstream so clients further down the
            // chain stay in sync.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }
2325
2326 pub async fn handle_blame_buffer(
2327 this: Entity<Self>,
2328 envelope: TypedEnvelope<proto::BlameBuffer>,
2329 mut cx: AsyncApp,
2330 ) -> Result<proto::BlameBufferResponse> {
2331 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2332 let version = deserialize_version(&envelope.payload.version);
2333 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2334 buffer
2335 .update(&mut cx, |buffer, _| {
2336 buffer.wait_for_version(version.clone())
2337 })?
2338 .await?;
2339 let blame = this
2340 .update(&mut cx, |this, cx| {
2341 this.blame_buffer(&buffer, Some(version), cx)
2342 })?
2343 .await?;
2344 Ok(serialize_blame_buffer_response(blame))
2345 }
2346
2347 pub async fn handle_get_permalink_to_line(
2348 this: Entity<Self>,
2349 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2350 mut cx: AsyncApp,
2351 ) -> Result<proto::GetPermalinkToLineResponse> {
2352 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2353 // let version = deserialize_version(&envelope.payload.version);
2354 let selection = {
2355 let proto_selection = envelope
2356 .payload
2357 .selection
2358 .context("no selection to get permalink for defined")?;
2359 proto_selection.start as u32..proto_selection.end as u32
2360 };
2361 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2362 let permalink = this
2363 .update(&mut cx, |this, cx| {
2364 this.get_permalink_to_line(&buffer, selection, cx)
2365 })?
2366 .await?;
2367 Ok(proto::GetPermalinkToLineResponse {
2368 permalink: permalink.to_string(),
2369 })
2370 }
2371
    /// Message handler: opens (or reuses) the unstaged diff for a buffer on
    /// behalf of a peer and responds with the staged base text, if any.
    pub async fn handle_open_unstaged_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUnstagedDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUnstagedDiffResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let diff = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_unstaged_diff(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        // Retain the diff alongside the peer's shared-buffer entry so it stays
        // alive for as long as the peer keeps the buffer open.
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            // The buffer is expected to already be shared with this peer.
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.diff = Some(diff.clone());
            }
        })?;
        // A missing base text is forwarded as an absent `staged_text`.
        let staged_text =
            diff.read_with(&cx, |diff, _| diff.base_text().map(|buffer| buffer.text()))?;
        Ok(proto::OpenUnstagedDiffResponse { staged_text })
    }
2399
    /// Message handler: opens (or reuses) the uncommitted diff for a buffer on
    /// behalf of a peer and responds with the committed and staged base texts.
    /// When the staged base is the same buffer as the committed base, the
    /// staged text is omitted and the mode signals the requester to reuse the
    /// committed text.
    pub async fn handle_open_uncommitted_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUncommittedDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUncommittedDiffResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let diff = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_uncommitted_diff(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        // Retain the diff alongside the peer's shared-buffer entry so it stays
        // alive for as long as the peer keeps the buffer open.
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            // The buffer is expected to already be shared with this peer.
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.diff = Some(diff.clone());
            }
        })?;
        diff.read_with(&cx, |diff, cx| {
            use proto::open_uncommitted_diff_response::Mode;

            // The secondary diff's base text holds the staged (index) version.
            let staged_buffer = diff
                .secondary_diff()
                .and_then(|diff| diff.read(cx).base_text());

            let mode;
            let staged_text;
            let committed_text;
            if let Some(committed_buffer) = diff.base_text() {
                committed_text = Some(committed_buffer.text());
                if let Some(staged_buffer) = staged_buffer {
                    if staged_buffer.remote_id() == committed_buffer.remote_id() {
                        // Staged and committed bases are the same buffer, so
                        // don't send the same text twice.
                        mode = Mode::IndexMatchesHead;
                        staged_text = None;
                    } else {
                        mode = Mode::IndexAndHead;
                        staged_text = Some(staged_buffer.text());
                    }
                } else {
                    mode = Mode::IndexAndHead;
                    staged_text = None;
                }
            } else {
                // No committed base; send whatever staged text exists.
                mode = Mode::IndexAndHead;
                committed_text = None;
                staged_text = staged_buffer.as_ref().map(|buffer| buffer.text());
            }

            proto::OpenUncommittedDiffResponse {
                committed_text,
                staged_text,
                mode: mode.into(),
            }
        })
    }
2460
2461 pub async fn handle_update_diff_bases(
2462 this: Entity<Self>,
2463 request: TypedEnvelope<proto::UpdateDiffBases>,
2464 mut cx: AsyncApp,
2465 ) -> Result<()> {
2466 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2467 this.update(&mut cx, |this, cx| {
2468 if let Some(OpenBuffer::Complete { diff_state, buffer }) =
2469 this.opened_buffers.get_mut(&buffer_id)
2470 {
2471 if let Some(buffer) = buffer.upgrade() {
2472 let buffer = buffer.read(cx).text_snapshot();
2473 diff_state.update(cx, |diff_state, cx| {
2474 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2475 })
2476 }
2477 }
2478 })
2479 }
2480
2481 pub fn reload_buffers(
2482 &self,
2483 buffers: HashSet<Entity<Buffer>>,
2484 push_to_history: bool,
2485 cx: &mut Context<Self>,
2486 ) -> Task<Result<ProjectTransaction>> {
2487 if buffers.is_empty() {
2488 return Task::ready(Ok(ProjectTransaction::default()));
2489 }
2490 match &self.state {
2491 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2492 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2493 }
2494 }
2495
    /// Message handler: reloads the requested buffers from disk and responds
    /// with the resulting transaction, serialized for the requesting peer.
    async fn handle_reload_buffers(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncApp,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            // Fail fast if any requested buffer id is invalid or unknown.
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            // `push_to_history` is false here; the transaction is returned to
            // the requester instead.
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }
2519
    /// Begins sharing a buffer with the given peer: records it in
    /// `shared_buffers`, then sends the buffer's full state followed by its
    /// serialized operations in chunks. A no-op if the buffer is already
    /// shared with the peer or if there is no downstream client. Send errors
    /// are logged rather than returned.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        // Already shared with this peer: nothing to send.
        if shared_buffers.contains_key(&buffer_id) {
            return Task::ready(Ok(()));
        }
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                diff: None,
                lsp_handle: None,
            },
        );

        // Without a downstream client there is no one to send the buffer to.
        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before this task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream the operation chunks if the initial state message
            // was sent successfully.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        // Split the operations via `split_operations`, marking
                        // the final chunk so the receiver knows when the
                        // buffer is complete.
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2586
    /// Stops tracking every buffer currently shared with any peer.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2590
    /// Stops tracking the buffers shared with a single peer.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2594
2595 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2596 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2597 self.shared_buffers.insert(new_peer_id, buffers);
2598 }
2599 }
2600
    /// Whether any buffers are currently shared with any peer.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2604
    /// Creates a new local buffer with the given text and language (falling
    /// back to plain text), registers it with this store, and — when the
    /// buffer is backed by a worktree file — indexes it by project path and
    /// entry id.
    ///
    /// Panics if called on a non-local buffer store.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut Context<Self>,
    ) -> Entity<Buffer> {
        let buffer = cx.new(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        // Index file-backed buffers so later events can be routed back to
        // this buffer by path or entry id.
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }
2638
2639 pub fn deserialize_project_transaction(
2640 &mut self,
2641 message: proto::ProjectTransaction,
2642 push_to_history: bool,
2643 cx: &mut Context<Self>,
2644 ) -> Task<Result<ProjectTransaction>> {
2645 if let Some(this) = self.as_remote_mut() {
2646 this.deserialize_project_transaction(message, push_to_history, cx)
2647 } else {
2648 debug_panic!("not a remote buffer store");
2649 Task::ready(Err(anyhow!("not a remote buffer store")))
2650 }
2651 }
2652
2653 pub fn wait_for_remote_buffer(
2654 &mut self,
2655 id: BufferId,
2656 cx: &mut Context<BufferStore>,
2657 ) -> Task<Result<Entity<Buffer>>> {
2658 if let Some(this) = self.as_remote_mut() {
2659 this.wait_for_remote_buffer(id, cx)
2660 } else {
2661 debug_panic!("not a remote buffer store");
2662 Task::ready(Err(anyhow!("not a remote buffer store")))
2663 }
2664 }
2665
2666 pub fn serialize_project_transaction_for_peer(
2667 &mut self,
2668 project_transaction: ProjectTransaction,
2669 peer_id: proto::PeerId,
2670 cx: &mut Context<Self>,
2671 ) -> proto::ProjectTransaction {
2672 let mut serialized_transaction = proto::ProjectTransaction {
2673 buffer_ids: Default::default(),
2674 transactions: Default::default(),
2675 };
2676 for (buffer, transaction) in project_transaction.0 {
2677 self.create_buffer_for_peer(&buffer, peer_id, cx)
2678 .detach_and_log_err(cx);
2679 serialized_transaction
2680 .buffer_ids
2681 .push(buffer.read(cx).remote_id().into());
2682 serialized_transaction
2683 .transactions
2684 .push(language::proto::serialize_transaction(&transaction));
2685 }
2686 serialized_transaction
2687 }
2688}
2689
2690impl OpenBuffer {
2691 fn upgrade(&self) -> Option<Entity<Buffer>> {
2692 match self {
2693 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2694 OpenBuffer::Operations(_) => None,
2695 }
2696 }
2697}
2698
2699fn is_not_found_error(error: &anyhow::Error) -> bool {
2700 error
2701 .root_cause()
2702 .downcast_ref::<io::Error>()
2703 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2704}
2705
2706fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2707 let Some(blame) = blame else {
2708 return proto::BlameBufferResponse {
2709 blame_response: None,
2710 };
2711 };
2712
2713 let entries = blame
2714 .entries
2715 .into_iter()
2716 .map(|entry| proto::BlameEntry {
2717 sha: entry.sha.as_bytes().into(),
2718 start_line: entry.range.start,
2719 end_line: entry.range.end,
2720 original_line_number: entry.original_line_number,
2721 author: entry.author.clone(),
2722 author_mail: entry.author_mail.clone(),
2723 author_time: entry.author_time,
2724 author_tz: entry.author_tz.clone(),
2725 committer: entry.committer.clone(),
2726 committer_mail: entry.committer_mail.clone(),
2727 committer_time: entry.committer_time,
2728 committer_tz: entry.committer_tz.clone(),
2729 summary: entry.summary.clone(),
2730 previous: entry.previous.clone(),
2731 filename: entry.filename.clone(),
2732 })
2733 .collect::<Vec<_>>();
2734
2735 let messages = blame
2736 .messages
2737 .into_iter()
2738 .map(|(oid, message)| proto::CommitMessage {
2739 oid: oid.as_bytes().into(),
2740 message,
2741 })
2742 .collect::<Vec<_>>();
2743
2744 let permalinks = blame
2745 .permalinks
2746 .into_iter()
2747 .map(|(oid, url)| proto::CommitPermalink {
2748 oid: oid.as_bytes().into(),
2749 permalink: url.to_string(),
2750 })
2751 .collect::<Vec<_>>();
2752
2753 proto::BlameBufferResponse {
2754 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2755 entries,
2756 messages,
2757 permalinks,
2758 remote_url: blame.remote_url,
2759 }),
2760 }
2761}
2762
2763fn deserialize_blame_buffer_response(
2764 response: proto::BlameBufferResponse,
2765) -> Option<git::blame::Blame> {
2766 let response = response.blame_response?;
2767 let entries = response
2768 .entries
2769 .into_iter()
2770 .filter_map(|entry| {
2771 Some(git::blame::BlameEntry {
2772 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2773 range: entry.start_line..entry.end_line,
2774 original_line_number: entry.original_line_number,
2775 committer: entry.committer,
2776 committer_time: entry.committer_time,
2777 committer_tz: entry.committer_tz,
2778 committer_mail: entry.committer_mail,
2779 author: entry.author,
2780 author_mail: entry.author_mail,
2781 author_time: entry.author_time,
2782 author_tz: entry.author_tz,
2783 summary: entry.summary,
2784 previous: entry.previous,
2785 filename: entry.filename,
2786 })
2787 })
2788 .collect::<Vec<_>>();
2789
2790 let messages = response
2791 .messages
2792 .into_iter()
2793 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2794 .collect::<HashMap<_, _>>();
2795
2796 let permalinks = response
2797 .permalinks
2798 .into_iter()
2799 .filter_map(|permalink| {
2800 Some((
2801 git::Oid::from_bytes(&permalink.oid).ok()?,
2802 Url::from_str(&permalink.permalink).ok()?,
2803 ))
2804 })
2805 .collect::<HashMap<_, _>>();
2806
2807 Some(Blame {
2808 entries,
2809 permalinks,
2810 messages,
2811 remote_url: response.remote_url,
2812 })
2813}
2814
/// Builds a permalink to the upstream repository for a file extracted from a
/// published Cargo package (e.g. in the registry `src` cache).
///
/// Published packages ship a `.cargo_vcs_info.json` recording the commit sha
/// they were built from; combined with the `package.repository` field of the
/// adjacent `Cargo.toml`, this lets us link to the exact line upstream.
///
/// Errors if no `.cargo_vcs_info.json` is found in any ancestor directory, if
/// either manifest fails to read/parse, or if the repository URL is not
/// recognized by any registered git hosting provider.
fn get_permalink_in_rust_registry_src(
    provider_registry: Arc<GitHostingProviderRegistry>,
    path: PathBuf,
    selection: Range<u32>,
) -> Result<url::Url> {
    // Minimal projections of the JSON/TOML documents; only the fields needed
    // to build the permalink are deserialized.
    #[derive(Deserialize)]
    struct CargoVcsGit {
        sha1: String,
    }

    #[derive(Deserialize)]
    struct CargoVcsInfo {
        git: CargoVcsGit,
        path_in_vcs: String,
    }

    #[derive(Deserialize)]
    struct CargoPackage {
        repository: String,
    }

    #[derive(Deserialize)]
    struct CargoToml {
        package: CargoPackage,
    }

    // Walk up from the file (skipping the file itself) to find the package
    // root containing `.cargo_vcs_info.json`.
    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
        Some((dir, json))
    }) else {
        bail!("No .cargo_vcs_info.json found in parent directories")
    };
    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
        .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
    // Re-root the file's path from the package directory into the repository
    // layout recorded in `path_in_vcs`. `unwrap` is safe: `dir` is an
    // ancestor of `path` by construction.
    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
    let permalink = provider.build_permalink(
        remote,
        BuildPermalinkParams {
            sha: &cargo_vcs_info.git.sha1,
            path: &path.to_string_lossy(),
            selection: Some(selection),
        },
    );
    Ok(permalink)
}