1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use buffer_diff::{BufferDiff, BufferDiffEvent};
10use client::Client;
11use collections::{hash_map, HashMap, HashSet};
12use fs::Fs;
13use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
14use git::{blame::Blame, repository::RepoPath};
15use gpui::{
16 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
17};
18use http_client::Url;
19use language::{
20 proto::{
21 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
22 split_operations,
23 },
24 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
25};
26use rpc::{
27 proto::{self, ToProto},
28 AnyProtoClient, ErrorExt as _, TypedEnvelope,
29};
30use serde::Deserialize;
31use smol::channel::Receiver;
32use std::{
33 io,
34 ops::Range,
35 path::{Path, PathBuf},
36 pin::pin,
37 str::FromStr as _,
38 sync::Arc,
39 time::Instant,
40};
41use text::BufferId;
42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
44
/// Which git base a buffer's diff is computed against.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    /// Working copy vs. the index (staged) text.
    Unstaged,
    /// Working copy vs. the committed (HEAD) text.
    Uncommitted,
}
50
/// A set of open buffers.
pub struct BufferStore {
    // Local- or remote-specific behavior for this store.
    state: BufferStoreState,
    // In-flight buffer loads keyed by project path, so concurrent opens of
    // the same path share a single task.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    // In-flight diff loads keyed by buffer id and diff kind.
    #[allow(clippy::type_complexity)]
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
    worktree_store: Entity<WorktreeStore>,
    // All buffers known to this store, keyed by id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    // Client and project id used to forward updates downstream when the
    // project is shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // Buffers (plus their diff/LSP handles) retained per remote peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
64
/// Handles retained for a buffer that has been shared with a peer, keeping
/// the buffer (and optionally its diff and LSP handle) alive for that peer.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    diff: Option<Entity<BufferDiff>>,
    lsp_handle: Option<OpenLspBufferHandle>,
}
71
/// Per-buffer git diff state: weak handles to the diff entities plus the
/// cached base texts they are computed against.
#[derive(Default)]
struct BufferDiffState {
    // Weak handle to the working-copy-vs-index diff, if one was opened.
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    // Weak handle to the working-copy-vs-HEAD diff, if one was opened.
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    // In-flight recalculation; replaced (dropping, and thereby cancelling,
    // the previous task) on each call to `recalculate_diffs`.
    recalculate_diff_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    // Senders notified when the current recalculation finishes.
    diff_updated_futures: Vec<oneshot::Sender<()>>,

    // Cached committed (HEAD) text; `None` when absent.
    head_text: Option<Arc<String>>,
    // Cached staged (index) text; shares the same `Arc` as `head_text` when
    // the index matches HEAD (see `DiffBasesChange::SetBoth`).
    index_text: Option<Arc<String>>,
    head_changed: bool,
    index_changed: bool,
    language_changed: bool,
}
87
/// Describes which git base texts changed and their new contents.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Only the index (staged) text changed.
    SetIndex(Option<String>),
    /// Only the HEAD (committed) text changed.
    SetHead(Option<String>),
    /// Both changed, to different contents.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Both changed and are identical.
    SetBoth(Option<String>),
}
98
impl BufferDiffState {
    /// Records the buffer's (possibly changed) language and kicks off a diff
    /// recalculation so the diffs pick up the new language.
    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
        self.language = buffer.read(cx).language().cloned();
        self.language_changed = true;
        // Receiver intentionally dropped: no caller awaits this update.
        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
    }

    /// The unstaged diff, if its weak handle is still alive.
    fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
        self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
    }

    /// The uncommitted diff, if its weak handle is still alive.
    fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
        self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
    }

    /// Applies a remote `UpdateDiffBases` message by translating its mode
    /// into a `DiffBasesChange` and recalculating the diffs.
    fn handle_base_texts_updated(
        &mut self,
        buffer: text::BufferSnapshot,
        message: proto::UpdateDiffBases,
        cx: &mut Context<Self>,
    ) {
        use proto::update_diff_bases::Mode;

        // Unknown modes are ignored rather than treated as errors.
        let Some(mode) = Mode::from_i32(message.mode) else {
            return;
        };

        let diff_bases_change = match mode {
            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
            Mode::IndexAndHead => DiffBasesChange::SetEach {
                index: message.staged_text,
                head: message.committed_text,
            },
        };

        // Remote updates are fire-and-forget; drop the completion receiver.
        let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
    }

    /// Stores the new base texts (normalizing line endings) and starts a
    /// recalculation of the affected diffs.
    ///
    /// Returns a receiver that resolves once recalculation completes.
    fn diff_bases_changed(
        &mut self,
        buffer: text::BufferSnapshot,
        diff_bases_change: DiffBasesChange,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        match diff_bases_change {
            DiffBasesChange::SetIndex(index) => {
                self.index_text = index.map(|mut index| {
                    text::LineEnding::normalize(&mut index);
                    Arc::new(index)
                });
                self.index_changed = true;
            }
            DiffBasesChange::SetHead(head) => {
                self.head_text = head.map(|mut head| {
                    text::LineEnding::normalize(&mut head);
                    Arc::new(head)
                });
                self.head_changed = true;
            }
            DiffBasesChange::SetBoth(text) => {
                // Share a single allocation between both fields so the
                // `Arc::ptr_eq` check in `recalculate_diffs` can detect
                // "index matches head".
                let text = text.map(|mut text| {
                    text::LineEnding::normalize(&mut text);
                    Arc::new(text)
                });
                self.head_text = text.clone();
                self.index_text = text;
                self.head_changed = true;
                self.index_changed = true;
            }
            DiffBasesChange::SetEach { index, head } => {
                self.index_text = index.map(|mut index| {
                    text::LineEnding::normalize(&mut index);
                    Arc::new(index)
                });
                self.index_changed = true;
                self.head_text = head.map(|mut head| {
                    text::LineEnding::normalize(&mut head);
                    Arc::new(head)
                });
                self.head_changed = true;
            }
        }

        self.recalculate_diffs(buffer, cx)
    }

    /// Recomputes the unstaged and/or uncommitted diffs against the given
    /// buffer snapshot on a spawned task.
    ///
    /// Returns a receiver that fires when the task finishes; all pending
    /// receivers are notified together at the end of the task.
    fn recalculate_diffs(
        &mut self,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);

        let language = self.language.clone();
        let language_registry = self.language_registry.clone();
        let unstaged_diff = self.unstaged_diff();
        let uncommitted_diff = self.uncommitted_diff();
        let head = self.head_text.clone();
        let index = self.index_text.clone();
        let index_changed = self.index_changed;
        let head_changed = self.head_changed;
        let language_changed = self.language_changed;
        // Pointer equality suffices: `SetBoth` stores the same `Arc` in both
        // fields, so `Arc::ptr_eq` identifies the index-matches-head case.
        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
            (None, None) => true,
            _ => false,
        };
        // Replacing the previous task drops (and thus cancels) any in-flight
        // recalculation.
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            let mut unstaged_changed_range = None;
            if let Some(unstaged_diff) = &unstaged_diff {
                unstaged_changed_range = BufferDiff::update_diff(
                    unstaged_diff.clone(),
                    buffer.clone(),
                    index,
                    index_changed,
                    language_changed,
                    language.clone(),
                    language_registry.clone(),
                    &mut cx,
                )
                .await?;

                unstaged_diff.update(&mut cx, |_, cx| {
                    if language_changed {
                        cx.emit(BufferDiffEvent::LanguageChanged);
                    }
                    if let Some(changed_range) = unstaged_changed_range.clone() {
                        cx.emit(BufferDiffEvent::DiffChanged {
                            changed_range: Some(changed_range),
                        })
                    }
                })?;
            }

            if let Some(uncommitted_diff) = &uncommitted_diff {
                // When the index matches HEAD, reuse the freshly computed
                // unstaged diff instead of re-diffing against the head text.
                let uncommitted_changed_range =
                    if let (Some(unstaged_diff), true) = (&unstaged_diff, index_matches_head) {
                        uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
                            uncommitted_diff.update_diff_from(&buffer, unstaged_diff, cx)
                        })?
                    } else {
                        BufferDiff::update_diff(
                            uncommitted_diff.clone(),
                            buffer.clone(),
                            head,
                            head_changed,
                            language_changed,
                            language.clone(),
                            language_registry.clone(),
                            &mut cx,
                        )
                        .await?
                    };

                uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
                    if language_changed {
                        cx.emit(BufferDiffEvent::LanguageChanged);
                    }
                    // Merge the unstaged and uncommitted changed ranges into
                    // a single range covering both, when both exist.
                    let changed_range = match (unstaged_changed_range, uncommitted_changed_range) {
                        (None, None) => None,
                        (Some(unstaged_range), None) => {
                            uncommitted_diff.range_to_hunk_range(unstaged_range, &buffer, cx)
                        }
                        (None, Some(uncommitted_range)) => Some(uncommitted_range),
                        (Some(unstaged_range), Some(uncommitted_range)) => {
                            let mut start = uncommitted_range.start;
                            let mut end = uncommitted_range.end;
                            if let Some(unstaged_range) =
                                uncommitted_diff.range_to_hunk_range(unstaged_range, &buffer, cx)
                            {
                                start = unstaged_range.start.min(&uncommitted_range.start, &buffer);
                                end = unstaged_range.end.max(&uncommitted_range.end, &buffer);
                            }
                            Some(start..end)
                        }
                    };
                    cx.emit(BufferDiffEvent::DiffChanged { changed_range });
                })?;
            }

            // Clear the dirty flags and wake everyone waiting on this
            // recalculation.
            if let Some(this) = this.upgrade() {
                this.update(&mut cx, |this, _| {
                    this.index_changed = false;
                    this.head_changed = false;
                    this.language_changed = false;
                    for tx in this.diff_updated_futures.drain(..) {
                        tx.send(()).ok();
                    }
                })?;
            }

            Ok(())
        }));

        rx
    }
}
299
/// Whether this store manages local buffers or mirrors a remote project.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
304
/// Buffer-store behavior for a project opened from an upstream host.
struct RemoteBufferStore {
    // Buffers sent by peers, retained here to avoid races
    // (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    // Buffers whose initial state arrived but whose operation chunks are
    // still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    // Channels resolved when the corresponding remote buffer finishes
    // loading (or fails to).
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
314
/// Buffer-store behavior for a local project.
struct LocalBufferStore {
    // Reverse lookups from project path / worktree entry to open buffer id.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    // Keeps the worktree-store event subscription alive.
    _subscription: Subscription,
}
321
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully-loaded buffer plus its associated diff state.
    Complete {
        buffer: WeakEntity<Buffer>,
        diff_state: Entity<BufferDiffState>,
    },
    /// Operations buffered for a buffer whose state hasn't arrived yet.
    Operations(Vec<Operation>),
}
329
/// Events emitted by [`BufferStore`].
pub enum BufferStoreEvent {
    BufferAdded(Entity<Buffer>),
    BufferDropped(BufferId),
    /// A buffer's backing file changed path; `old_file` is the file it
    /// previously pointed at, if any.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
338
/// A set of buffer transactions produced by one project-wide operation,
/// keyed by the buffer each transaction applies to.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
341
342impl EventEmitter<BufferStoreEvent> for BufferStore {}
343
impl RemoteBufferStore {
    /// Requests the staged (index) text for a buffer from the upstream host.
    ///
    /// `Ok(None)` means the upstream reported no staged text.
    fn open_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.background_executor().spawn(async move {
            let response = client
                .request(proto::OpenUnstagedDiff {
                    project_id,
                    buffer_id: buffer_id.to_proto(),
                })
                .await?;
            Ok(response.staged_text)
        })
    }

    /// Requests the diff base texts for a buffer from the upstream host and
    /// translates the response's mode into a `DiffBasesChange`.
    fn open_uncommitted_diff(
        &self,
        buffer_id: BufferId,
        cx: &App,
    ) -> Task<Result<DiffBasesChange>> {
        use proto::open_uncommitted_diff_response::Mode;

        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.background_executor().spawn(async move {
            let response = client
                .request(proto::OpenUncommittedDiff {
                    project_id,
                    buffer_id: buffer_id.to_proto(),
                })
                .await?;
            let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
            let bases = match mode {
                Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
                Mode::IndexAndHead => DiffBasesChange::SetEach {
                    head: response.committed_text,
                    index: response.staged_text,
                },
            };
            Ok(bases)
        })
    }

    /// Resolves once the buffer with `id` is available locally.
    ///
    /// Registers a listener first, then checks whether the buffer is already
    /// open, so a buffer arriving in between is not missed.
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            // Fast path: the buffer may already be open.
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }

    /// Saves a buffer by asking the upstream host to perform the write, then
    /// applies the returned version and mtime to the local buffer.
    fn save_remote_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

    /// Handles one `CreateBufferForPeer` message: either a buffer's initial
    /// state, or a chunk of its operations.
    ///
    /// Returns `Ok(Some(buffer))` once the final chunk has been applied, and
    /// `Ok(None)` while the buffer is still streaming.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut Context<BufferStore>,
    ) -> Result<Option<Entity<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    // Reconstruct the buffer's file from its worktree, when
                    // the message carries one.
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Deserialization failed: fail all listeners waiting
                        // for this buffer rather than leaving them hanging.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // A bad chunk aborts the whole load and fails listeners.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }

    /// Ids of buffers whose initial state arrived but whose operation chunks
    /// are still streaming.
    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
        self.loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>()
    }

    /// Reconstructs a `ProjectTransaction` from its wire form, waiting for
    /// each referenced buffer (and its edits) to arrive first.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                // Make sure every edit referenced by the transaction has been
                // applied before (optionally) recording it in history.
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }

    /// Opens a buffer by path via the upstream host, then waits for it to be
    /// streamed down to this client.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let worktree_id = worktree.read(cx).id().to_proto();
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path.to_proto(),
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            let buffer = this
                .update(&mut cx, {
                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await?;

            Ok(buffer)
        })
    }

    /// Creates a new empty buffer on the upstream host and waits for it to
    /// arrive locally.
    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
        let create = self.upstream_client.request(proto::OpenNewBuffer {
            project_id: self.project_id,
        });
        cx.spawn(|this, mut cx| async move {
            let response = create.await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

    /// Asks the upstream host to reload the given buffers and deserializes
    /// the resulting project transaction.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        let request = self.upstream_client.request(proto::ReloadBuffers {
            project_id: self.project_id,
            buffer_ids: buffers
                .iter()
                .map(|buffer| buffer.read(cx).remote_id().to_proto())
                .collect(),
        });

        cx.spawn(|this, mut cx| async move {
            let response = request
                .await?
                .transaction
                .ok_or_else(|| anyhow!("missing transaction"))?;
            this.update(&mut cx, |this, cx| {
                this.deserialize_project_transaction(response, push_to_history, cx)
            })?
            .await
        })
    }
}
651
652impl LocalBufferStore {
653 fn worktree_for_buffer(
654 &self,
655 buffer: &Entity<Buffer>,
656 cx: &App,
657 ) -> Option<(Entity<Worktree>, Arc<Path>)> {
658 let file = buffer.read(cx).file()?;
659 let worktree_id = file.worktree_id(cx);
660 let path = file.path().clone();
661 let worktree = self
662 .worktree_store
663 .read(cx)
664 .worktree_for_id(worktree_id, cx)?;
665 Some((worktree, path))
666 }
667
668 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
669 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
670 worktree.read(cx).load_staged_file(path.as_ref(), cx)
671 } else {
672 return Task::ready(Err(anyhow!("no such worktree")));
673 }
674 }
675
676 fn load_committed_text(
677 &self,
678 buffer: &Entity<Buffer>,
679 cx: &App,
680 ) -> Task<Result<Option<String>>> {
681 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
682 worktree.read(cx).load_committed_file(path.as_ref(), cx)
683 } else {
684 Task::ready(Err(anyhow!("no such worktree")))
685 }
686 }
687
    /// Writes the buffer's contents to `path` within `worktree`, then
    /// notifies the downstream client (if the project is shared) and finally
    /// the buffer itself.
    ///
    /// `has_changed_file` is forced to `true` when the buffer's file is
    /// `DiskState::New`, so the new file metadata is propagated on first save.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Capture everything the async portion needs up front.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            this.update(&mut cx, |this, cx| {
                // Forward the save (and the file change, if any) downstream.
                // Send failures are logged, not propagated.
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
745
746 fn subscribe_to_worktree(
747 &mut self,
748 worktree: &Entity<Worktree>,
749 cx: &mut Context<BufferStore>,
750 ) {
751 cx.subscribe(worktree, |this, worktree, event, cx| {
752 if worktree.read(cx).is_local() {
753 match event {
754 worktree::Event::UpdatedEntries(changes) => {
755 Self::local_worktree_entries_changed(this, &worktree, changes, cx);
756 }
757 worktree::Event::UpdatedGitRepositories(updated_repos) => {
758 Self::local_worktree_git_repos_changed(
759 this,
760 worktree.clone(),
761 updated_repos,
762 cx,
763 )
764 }
765 _ => {}
766 }
767 }
768 })
769 .detach();
770 }
771
772 fn local_worktree_entries_changed(
773 this: &mut BufferStore,
774 worktree_handle: &Entity<Worktree>,
775 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
776 cx: &mut Context<BufferStore>,
777 ) {
778 let snapshot = worktree_handle.read(cx).snapshot();
779 for (path, entry_id, _) in changes {
780 Self::local_worktree_entry_changed(
781 this,
782 *entry_id,
783 path,
784 worktree_handle,
785 &snapshot,
786 cx,
787 );
788 }
789 }
790
    /// Refreshes diff base texts for all open buffers affected by changes to
    /// one of this worktree's git repositories.
    ///
    /// On the main thread, collects the buffers that belong to
    /// `worktree_handle` and live under a changed repository's work
    /// directory; then loads their index/HEAD texts on the background
    /// executor; then applies the results to each buffer's diff state.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Phase 1: figure out which buffers need which base texts.
        let mut diff_state_updates = Vec::new();
        for buffer in this.opened_buffers.values() {
            let OpenBuffer::Complete { buffer, diff_state } = buffer else {
                continue;
            };
            let Some(buffer) = buffer.upgrade() else {
                continue;
            };
            let buffer = buffer.read(cx);
            let Some(file) = File::from_dyn(buffer.file()) else {
                continue;
            };
            if file.worktree != worktree_handle {
                continue;
            }
            let diff_state = diff_state.read(cx);
            if changed_repos
                .iter()
                .any(|(work_dir, _)| file.path.starts_with(work_dir))
            {
                let snapshot = buffer.text_snapshot();
                // Only request the texts for diffs that are actually open
                // (weak handle still upgradable).
                diff_state_updates.push((
                    snapshot.clone(),
                    file.path.clone(),
                    diff_state
                        .unstaged_diff
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                    diff_state
                        .uncommitted_diff
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                ))
            }
        }

        if diff_state_updates.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Phase 2: load git base texts off the main thread.
            let diff_bases_changes_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    diff_state_updates
                        .into_iter()
                        .filter_map(
                            |(buffer_snapshot, path, needs_staged_text, needs_committed_text)| {
                                let local_repo = snapshot.local_repo_for_path(&path)?;
                                let relative_path = local_repo.relativize(&path).ok()?;
                                let staged_text = if needs_staged_text {
                                    local_repo.repo().load_index_text(&relative_path)
                                } else {
                                    None
                                };
                                let committed_text = if needs_committed_text {
                                    local_repo.repo().load_committed_text(&relative_path)
                                } else {
                                    None
                                };
                                // Collapse the loaded texts into the smallest
                                // equivalent change description.
                                let diff_bases_change =
                                    match (needs_staged_text, needs_committed_text) {
                                        (true, true) => Some(if staged_text == committed_text {
                                            DiffBasesChange::SetBoth(committed_text)
                                        } else {
                                            DiffBasesChange::SetEach {
                                                index: staged_text,
                                                head: committed_text,
                                            }
                                        }),
                                        (true, false) => {
                                            Some(DiffBasesChange::SetIndex(staged_text))
                                        }
                                        (false, true) => {
                                            Some(DiffBasesChange::SetHead(committed_text))
                                        }
                                        (false, false) => None,
                                    };
                                Some((buffer_snapshot, diff_bases_change))
                            },
                        )
                        .collect::<Vec<_>>()
                })
                .await;

            // Phase 3: apply the changes and mirror them downstream.
            this.update(&mut cx, |this, cx| {
                for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
                    let Some(OpenBuffer::Complete { diff_state, .. }) =
                        this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
                    else {
                        continue;
                    };
                    let Some(diff_bases_change) = diff_bases_change else {
                        continue;
                    };

                    diff_state.update(cx, |diff_state, cx| {
                        use proto::update_diff_bases::Mode;

                        // Mirror the change to the downstream client before
                        // applying it locally; send failures are logged.
                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
                            let buffer_id = buffer_snapshot.remote_id().to_proto();
                            let (staged_text, committed_text, mode) = match diff_bases_change
                                .clone()
                            {
                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                DiffBasesChange::SetEach { index, head } => {
                                    (index, head, Mode::IndexAndHead)
                                }
                                DiffBasesChange::SetBoth(text) => {
                                    (None, text, Mode::IndexMatchesHead)
                                }
                            };
                            let message = proto::UpdateDiffBases {
                                project_id: *project_id,
                                buffer_id,
                                staged_text,
                                committed_text,
                                mode: mode as i32,
                            };

                            client.send(message).log_err();
                        }

                        // Completion receiver intentionally dropped.
                        let _ =
                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
                    });
                }
            })
        })
        .detach_and_log_err(cx);
    }
935
    /// Reconciles one buffer with a changed worktree entry: updates the
    /// buffer's `File` (path, entry id, disk state), maintains the id lookup
    /// maps, and notifies the downstream client and event subscribers.
    ///
    /// Always returns `None`; the `Option` return only enables `?`-style
    /// early exits.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Find the affected buffer, preferring the entry-id index and
        // falling back to the path index.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer was dropped; clean up its store entry.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer: drop it from both lookup maps and stop.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Locate the entry in the new snapshot, by id first and then by
            // the old path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        // No mtime in the snapshot: keep the previous state.
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // Entry vanished from the snapshot: the file was deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index and record a path-change event.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Forward the new file metadata downstream; errors ignored.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }
1063
1064 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1065 let file = File::from_dyn(buffer.read(cx).file())?;
1066
1067 let remote_id = buffer.read(cx).remote_id();
1068 if let Some(entry_id) = file.entry_id {
1069 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1070 Some(_) => {
1071 return None;
1072 }
1073 None => {
1074 self.local_buffer_ids_by_entry_id
1075 .insert(entry_id, remote_id);
1076 }
1077 }
1078 };
1079 self.local_buffer_ids_by_path.insert(
1080 ProjectPath {
1081 worktree_id: file.worktree_id(cx),
1082 path: file.path.clone(),
1083 },
1084 remote_id,
1085 );
1086
1087 Some(())
1088 }
1089
1090 fn save_buffer(
1091 &self,
1092 buffer: Entity<Buffer>,
1093 cx: &mut Context<BufferStore>,
1094 ) -> Task<Result<()>> {
1095 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1096 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1097 };
1098 let worktree = file.worktree.clone();
1099 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1100 }
1101
1102 fn save_buffer_as(
1103 &self,
1104 buffer: Entity<Buffer>,
1105 path: ProjectPath,
1106 cx: &mut Context<BufferStore>,
1107 ) -> Task<Result<()>> {
1108 let Some(worktree) = self
1109 .worktree_store
1110 .read(cx)
1111 .worktree_for_id(path.worktree_id, cx)
1112 else {
1113 return Task::ready(Err(anyhow!("no such worktree")));
1114 };
1115 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1116 }
1117
    /// Opens (or creates) a buffer for `path` in `worktree`.
    ///
    /// Loads the file off the main thread using a reserved entity id so the
    /// `BufferId` is fixed before the load completes. A missing file yields a
    /// new, empty buffer with `DiskState::New` rather than an error. On
    /// success the buffer is registered in the store and its id indexed by
    /// path and worktree entry.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity up front: its id doubles as the BufferId.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // Not-found becomes an empty in-memory buffer marked as a new
                // file; other errors propagate.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                // Index the new buffer by path and (when present) entry id.
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
1186
1187 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1188 cx.spawn(|buffer_store, mut cx| async move {
1189 let buffer =
1190 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1191 buffer_store.update(&mut cx, |buffer_store, cx| {
1192 buffer_store.add_buffer(buffer.clone(), cx).log_err();
1193 })?;
1194 Ok(buffer)
1195 })
1196 }
1197
    /// Reloads each buffer from disk, sequentially, collecting the resulting
    /// edit transactions into a single `ProjectTransaction`.
    ///
    /// When `push_to_history` is false, each reload transaction is removed
    /// from the buffer's undo history, but it is still reported to the caller.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                // Await each reload before starting the next one.
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    // A reload that produced no edits yields no transaction.
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.entity(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
1223}
1224
1225impl BufferStore {
    /// Registers this store's RPC handlers on the given client.
    ///
    /// Message handlers are fire-and-forget notifications; request handlers
    /// produce a response that is sent back to the peer.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
    }
1238
    /// Creates a buffer store that owns local buffers backed by worktree files.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                // Subscribe to each worktree as it is added so that file-system
                // changes can be routed to the corresponding buffers.
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_diffs: Default::default(),
            worktree_store,
        }
    }
1261
    /// Creates a buffer store that mirrors buffers hosted by an upstream peer,
    /// identified by `remote_id`, over `upstream_client`.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_diffs: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
1285
1286 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1287 match &mut self.state {
1288 BufferStoreState::Local(state) => Some(state),
1289 _ => None,
1290 }
1291 }
1292
1293 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1294 match &mut self.state {
1295 BufferStoreState::Remote(state) => Some(state),
1296 _ => None,
1297 }
1298 }
1299
1300 fn as_remote(&self) -> Option<&RemoteBufferStore> {
1301 match &self.state {
1302 BufferStoreState::Remote(state) => Some(state),
1303 _ => None,
1304 }
1305 }
1306
    /// Opens the buffer at `project_path`, deduplicating concurrent requests.
    ///
    /// If the buffer is already open it is returned immediately. Otherwise a
    /// shared loading task is registered in `loading_buffers` so that callers
    /// racing to open the same path await a single load.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // Another caller is already loading this path; share its task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            // The error is wrapped in `Arc` so the shared task
                            // can be cloned by multiple awaiters.
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1352
    /// Opens the diff between the buffer's content and its staged (index)
    /// text, deduplicating concurrent requests per `(buffer, DiffKind)`.
    pub fn open_unstaged_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(diff) = self.get_unstaged_diff(buffer_id, cx) {
            return Task::ready(Ok(diff));
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Unstaged)) {
            // A load for this diff is already in flight; share it.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                // Locally the staged text is read from the repository; remotely
                // it is requested from the upstream peer.
                let staged_text = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.open_unstaged_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Unstaged,
                                staged_text.await.map(DiffBasesChange::SetIndex),
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1393
    /// Opens the diff between the buffer's content and its committed (HEAD)
    /// text, deduplicating concurrent requests per `(buffer, DiffKind)`.
    pub fn open_uncommitted_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(diff) = self.get_uncommitted_diff(buffer_id, cx) {
            return Task::ready(Ok(diff));
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Uncommitted)) {
            // A load for this diff is already in flight; share it.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let changes = match &self.state {
                    BufferStoreState::Local(this) => {
                        // Load both HEAD and index texts, then collapse them
                        // into a single update when they are identical.
                        let committed_text = this.load_committed_text(&buffer, cx);
                        let staged_text = this.load_staged_text(&buffer, cx);
                        cx.background_executor().spawn(async move {
                            let committed_text = committed_text.await?;
                            let staged_text = staged_text.await?;
                            let diff_bases_change = if committed_text == staged_text {
                                DiffBasesChange::SetBoth(committed_text)
                            } else {
                                DiffBasesChange::SetEach {
                                    index: staged_text,
                                    head: committed_text,
                                }
                            };
                            Ok(diff_bases_change)
                        })
                    }
                    BufferStoreState::Remote(this) => this.open_uncommitted_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Uncommitted,
                                changes.await,
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1450
    /// Shared tail of `open_unstaged_diff` / `open_uncommitted_diff`: creates
    /// the `BufferDiff` entity, wires it into the buffer's diff state, and
    /// applies the loaded base texts.
    ///
    /// In both success and failure paths, the `loading_diffs` entry for
    /// `(buffer, kind)` is removed so that a later call can retry.
    async fn open_diff_internal(
        this: WeakEntity<Self>,
        kind: DiffKind,
        texts: Result<DiffBasesChange>,
        buffer_entity: Entity<Buffer>,
        mut cx: AsyncApp,
    ) -> Result<Entity<BufferDiff>> {
        let diff_bases_change = match texts {
            Err(e) => {
                // Clear the in-flight marker before propagating the error.
                this.update(&mut cx, |this, cx| {
                    let buffer = buffer_entity.read(cx);
                    let buffer_id = buffer.remote_id();
                    this.loading_diffs.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(&mut cx, |this, cx| {
            let buffer = buffer_entity.read(cx);
            let buffer_id = buffer.remote_id();
            let language = buffer.language().cloned();
            let language_registry = buffer.language_registry();
            let text_snapshot = buffer.text_snapshot();
            this.loading_diffs.remove(&(buffer_id, kind));

            if let Some(OpenBuffer::Complete { diff_state, .. }) =
                this.opened_buffers.get_mut(&buffer_id)
            {
                diff_state.update(cx, |diff_state, cx| {
                    diff_state.language = language;
                    diff_state.language_registry = language_registry;

                    // Only weak handles are stored; the diff entity itself is
                    // kept alive by the value returned to the caller.
                    let diff = cx.new(|_| BufferDiff::new(&text_snapshot));
                    match kind {
                        DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
                        DiffKind::Uncommitted => {
                            // An uncommitted diff always carries an unstaged
                            // diff as its secondary; create one if needed.
                            let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
                                diff
                            } else {
                                let unstaged_diff = cx.new(|_| BufferDiff::new(&text_snapshot));
                                diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
                                unstaged_diff
                            };

                            diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
                            diff_state.uncommitted_diff = Some(diff.downgrade())
                        }
                    };

                    // Wait for the base texts to be applied before returning.
                    let rx = diff_state.diff_bases_changed(text_snapshot, diff_bases_change, cx);

                    Ok(async move {
                        rx.await.ok();
                        Ok(diff)
                    })
                })
            } else {
                Err(anyhow!("buffer was closed"))
            }
        })??
        .await
    }
1515
1516 pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
1517 match &self.state {
1518 BufferStoreState::Local(this) => this.create_buffer(cx),
1519 BufferStoreState::Remote(this) => this.create_buffer(cx),
1520 }
1521 }
1522
1523 pub fn save_buffer(
1524 &mut self,
1525 buffer: Entity<Buffer>,
1526 cx: &mut Context<Self>,
1527 ) -> Task<Result<()>> {
1528 match &mut self.state {
1529 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1530 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1531 }
1532 }
1533
1534 pub fn save_buffer_as(
1535 &mut self,
1536 buffer: Entity<Buffer>,
1537 path: ProjectPath,
1538 cx: &mut Context<Self>,
1539 ) -> Task<Result<()>> {
1540 let old_file = buffer.read(cx).file().cloned();
1541 let task = match &self.state {
1542 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1543 BufferStoreState::Remote(this) => {
1544 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1545 }
1546 };
1547 cx.spawn(|this, mut cx| async move {
1548 task.await?;
1549 this.update(&mut cx, |_, cx| {
1550 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1551 })
1552 })
1553 }
1554
    /// Computes a git blame for the buffer's file.
    ///
    /// For a local worktree the blame runs against the repository on a
    /// background thread, using the buffer's content at `version` (or its
    /// current content when `version` is `None`). Returns `Ok(None)` when the
    /// file is not inside a git repository. For a remote worktree the request
    /// is forwarded to the host.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything blame needs on the main thread, so the
                // spawned task doesn't touch the buffer or worktree.
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1616
    /// Builds a web permalink to `selection` (a row range) in the buffer's
    /// file, based on the repository's `origin` remote and current HEAD.
    ///
    /// For remote worktrees the request is forwarded to the host.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // The permalink path must be relative to the repository root,
                // not the worktree root.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    // Pin the permalink to the current HEAD commit so it stays
                    // valid as the branch moves.
                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1713
    /// Registers a buffer with the store: records it in `opened_buffers`,
    /// creates its diff state, subscribes to its events, and emits
    /// `BufferAdded`.
    ///
    /// If operations arrived for this buffer id before the buffer itself
    /// (`OpenBuffer::Operations`), they are applied now. Registering the same
    /// id twice is tolerated for remote buffers but is a bug for local ones.
    fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
        let buffer = buffer_entity.read(cx);
        let language = buffer.language().cloned();
        let language_registry = buffer.language_registry();
        let remote_id = buffer.remote_id();
        // Replica 0 is the authoritative copy; any other replica id means the
        // buffer originated on another peer.
        let is_remote = buffer.replica_id() != 0;
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer_entity.downgrade(),
            diff_state: cx.new(|_| BufferDiffState {
                language,
                language_registry,
                ..Default::default()
            }),
        };

        // Notify the store when the buffer entity is dropped, so stale entries
        // can be cleaned up.
        let handle = cx.entity().downgrade();
        buffer_entity.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    // Replay operations that were queued before the buffer opened.
                    buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
        Ok(())
    }
1764
1765 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1766 self.opened_buffers
1767 .values()
1768 .filter_map(|buffer| buffer.upgrade())
1769 }
1770
1771 pub fn loading_buffers(
1772 &self,
1773 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1774 self.loading_buffers.iter().map(|(path, task)| {
1775 let task = task.clone();
1776 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1777 })
1778 }
1779
1780 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1781 self.buffers().find_map(|buffer| {
1782 let file = File::from_dyn(buffer.read(cx).file())?;
1783 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1784 Some(buffer)
1785 } else {
1786 None
1787 }
1788 })
1789 }
1790
1791 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1792 self.opened_buffers.get(&buffer_id)?.upgrade()
1793 }
1794
1795 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1796 self.get(buffer_id)
1797 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1798 }
1799
1800 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1801 self.get(buffer_id).or_else(|| {
1802 self.as_remote()
1803 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1804 })
1805 }
1806
1807 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
1808 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1809 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
1810 } else {
1811 None
1812 }
1813 }
1814
1815 pub fn get_uncommitted_diff(
1816 &self,
1817 buffer_id: BufferId,
1818 cx: &App,
1819 ) -> Option<Entity<BufferDiff>> {
1820 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1821 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
1822 } else {
1823 None
1824 }
1825 }
1826
1827 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1828 let buffers = self
1829 .buffers()
1830 .map(|buffer| {
1831 let buffer = buffer.read(cx);
1832 proto::BufferVersion {
1833 id: buffer.remote_id().into(),
1834 version: language::proto::serialize_version(&buffer.version),
1835 }
1836 })
1837 .collect();
1838 let incomplete_buffer_ids = self
1839 .as_remote()
1840 .map(|remote| remote.incomplete_buffer_ids())
1841 .unwrap_or_default();
1842 (buffers, incomplete_buffer_ids)
1843 }
1844
1845 pub fn disconnected_from_host(&mut self, cx: &mut App) {
1846 for open_buffer in self.opened_buffers.values_mut() {
1847 if let Some(buffer) = open_buffer.upgrade() {
1848 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1849 }
1850 }
1851
1852 for buffer in self.buffers() {
1853 buffer.update(cx, |buffer, cx| {
1854 buffer.set_capability(Capability::ReadOnly, cx)
1855 });
1856 }
1857
1858 if let Some(remote) = self.as_remote_mut() {
1859 // Wake up all futures currently waiting on a buffer to get opened,
1860 // to give them a chance to fail now that we've disconnected.
1861 remote.remote_buffer_listeners.clear()
1862 }
1863 }
1864
1865 pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
1866 self.downstream_client = Some((downstream_client, remote_id));
1867 }
1868
1869 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1870 self.downstream_client.take();
1871 self.forget_shared_buffers();
1872 }
1873
1874 pub fn discard_incomplete(&mut self) {
1875 self.opened_buffers
1876 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1877 }
1878
    /// Streams buffers that may contain matches for `query`.
    ///
    /// Buffers without a file entry ("unnamed" buffers) are sent first and
    /// counted against `limit`. The worktree store then yields candidate
    /// paths (excluding already-open entries, which are searched in-memory
    /// elsewhere), and those paths are opened in batches and streamed to the
    /// returned channel. Dropping the receiver cancels the search.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // Unnamed buffers are always candidates; reserve room for them
                // within the limit.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Bound the number of buffers being opened at once.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A closed receiver means the search was cancelled.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1933
1934 pub fn recalculate_buffer_diffs(
1935 &mut self,
1936 buffers: Vec<Entity<Buffer>>,
1937 cx: &mut Context<Self>,
1938 ) -> impl Future<Output = ()> {
1939 let mut futures = Vec::new();
1940 for buffer in buffers {
1941 if let Some(OpenBuffer::Complete { diff_state, .. }) =
1942 self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
1943 {
1944 let buffer = buffer.read(cx).text_snapshot();
1945 futures.push(diff_state.update(cx, |diff_state, cx| {
1946 diff_state.recalculate_diffs(buffer, cx)
1947 }));
1948 }
1949 }
1950 async move {
1951 futures::future::join_all(futures).await;
1952 }
1953 }
1954
    /// Handles events emitted by an individual buffer.
    ///
    /// - `FileHandleChanged`: let the local store update its path/entry indices.
    /// - `Reloaded`: forward the reload to the downstream peer, if shared.
    /// - `LanguageChanged`: propagate the new language to the diff state so
    ///   diffs get re-highlighted.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Only relevant when this project is shared downstream.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            BufferEvent::LanguageChanged => {
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(OpenBuffer::Complete { diff_state, .. }) =
                    self.opened_buffers.get(&buffer_id)
                {
                    diff_state.update(cx, |diff_state, cx| {
                        diff_state.buffer_language_changed(buffer, cx);
                    });
                }
            }
            _ => {}
        }
    }
1995
    /// Handles an `UpdateBuffer` message: applies the peer's operations to the
    /// corresponding buffer.
    ///
    /// Operations that arrive before the buffer has been opened are queued in
    /// an `OpenBuffer::Operations` entry and replayed when the buffer is
    /// registered (see `add_buffer`).
    pub async fn handle_update_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    // Buffer not open yet: extend the queued operations.
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Complete { buffer, .. } => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                        }
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    // First sign of this buffer: queue the operations for later.
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }
2025
2026 pub fn register_shared_lsp_handle(
2027 &mut self,
2028 peer_id: proto::PeerId,
2029 buffer_id: BufferId,
2030 handle: OpenLspBufferHandle,
2031 ) {
2032 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2033 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2034 buffer.lsp_handle = Some(handle);
2035 return;
2036 }
2037 }
2038 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2039 }
2040
    /// Handles a guest's `SynchronizeBuffers` request after a reconnect.
    ///
    /// Resets which buffers are tracked as shared with that guest, reports the
    /// current version of each buffer the guest asked about, and sends the
    /// guest any file metadata, reload state, and operations it is missing
    /// (computed against the version vector the guest provided).
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate: only buffers mentioned in this request
        // are considered shared with this guest from now on.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        diff: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest hasn't seen yet.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations in the background, chunked to
                // respect message size limits.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
2130
2131 pub fn handle_create_buffer_for_peer(
2132 &mut self,
2133 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2134 replica_id: u16,
2135 capability: Capability,
2136 cx: &mut Context<Self>,
2137 ) -> Result<()> {
2138 let Some(remote) = self.as_remote_mut() else {
2139 return Err(anyhow!("buffer store is not a remote"));
2140 };
2141
2142 if let Some(buffer) =
2143 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2144 {
2145 self.add_buffer(buffer, cx)?;
2146 }
2147
2148 Ok(())
2149 }
2150
    /// Handles an `UpdateBufferFile` message: updates the buffer's file
    /// metadata and, if the path changed, emits `BufferChangedFilePath`.
    /// The message is also forwarded downstream when this project is shared.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // Returns the previous file only when the path actually changed
                // (or when there was no previous file at all).
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay the update to any downstream peer.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
2198
    /// Handles a `SaveBuffer` request from a guest.
    ///
    /// Waits until the buffer has caught up to the version the guest saved at,
    /// performs the save (optionally under a new path), and responds with the
    /// saved version and mtime.
    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until the buffer contains all edits the guest had when
        // it issued the request.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            // "Save as": the guest provided a new destination path.
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
2239
2240 pub async fn handle_close_buffer(
2241 this: Entity<Self>,
2242 envelope: TypedEnvelope<proto::CloseBuffer>,
2243 mut cx: AsyncApp,
2244 ) -> Result<()> {
2245 let peer_id = envelope.sender_id;
2246 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2247 this.update(&mut cx, |this, _| {
2248 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2249 if shared.remove(&buffer_id).is_some() {
2250 if shared.is_empty() {
2251 this.shared_buffers.remove(&peer_id);
2252 }
2253 return;
2254 }
2255 }
2256 debug_panic!(
2257 "peer_id {} closed buffer_id {} which was either not open or already closed",
2258 peer_id,
2259 buffer_id
2260 )
2261 })
2262 }
2263
2264 pub async fn handle_buffer_saved(
2265 this: Entity<Self>,
2266 envelope: TypedEnvelope<proto::BufferSaved>,
2267 mut cx: AsyncApp,
2268 ) -> Result<()> {
2269 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2270 let version = deserialize_version(&envelope.payload.version);
2271 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2272 this.update(&mut cx, move |this, cx| {
2273 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2274 buffer.update(cx, |buffer, cx| {
2275 buffer.did_save(version, mtime, cx);
2276 });
2277 }
2278
2279 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2280 downstream_client
2281 .send(proto::BufferSaved {
2282 project_id: *project_id,
2283 buffer_id: buffer_id.into(),
2284 mtime: envelope.payload.mtime,
2285 version: envelope.payload.version,
2286 })
2287 .log_err();
2288 }
2289 })
2290 }
2291
2292 pub async fn handle_buffer_reloaded(
2293 this: Entity<Self>,
2294 envelope: TypedEnvelope<proto::BufferReloaded>,
2295 mut cx: AsyncApp,
2296 ) -> Result<()> {
2297 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2298 let version = deserialize_version(&envelope.payload.version);
2299 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2300 let line_ending = deserialize_line_ending(
2301 proto::LineEnding::from_i32(envelope.payload.line_ending)
2302 .ok_or_else(|| anyhow!("missing line ending"))?,
2303 );
2304 this.update(&mut cx, |this, cx| {
2305 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2306 buffer.update(cx, |buffer, cx| {
2307 buffer.did_reload(version, line_ending, mtime, cx);
2308 });
2309 }
2310
2311 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2312 downstream_client
2313 .send(proto::BufferReloaded {
2314 project_id: *project_id,
2315 buffer_id: buffer_id.into(),
2316 mtime: envelope.payload.mtime,
2317 version: envelope.payload.version,
2318 line_ending: envelope.payload.line_ending,
2319 })
2320 .log_err();
2321 }
2322 })
2323 }
2324
2325 pub async fn handle_blame_buffer(
2326 this: Entity<Self>,
2327 envelope: TypedEnvelope<proto::BlameBuffer>,
2328 mut cx: AsyncApp,
2329 ) -> Result<proto::BlameBufferResponse> {
2330 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2331 let version = deserialize_version(&envelope.payload.version);
2332 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2333 buffer
2334 .update(&mut cx, |buffer, _| {
2335 buffer.wait_for_version(version.clone())
2336 })?
2337 .await?;
2338 let blame = this
2339 .update(&mut cx, |this, cx| {
2340 this.blame_buffer(&buffer, Some(version), cx)
2341 })?
2342 .await?;
2343 Ok(serialize_blame_buffer_response(blame))
2344 }
2345
2346 pub async fn handle_get_permalink_to_line(
2347 this: Entity<Self>,
2348 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2349 mut cx: AsyncApp,
2350 ) -> Result<proto::GetPermalinkToLineResponse> {
2351 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2352 // let version = deserialize_version(&envelope.payload.version);
2353 let selection = {
2354 let proto_selection = envelope
2355 .payload
2356 .selection
2357 .context("no selection to get permalink for defined")?;
2358 proto_selection.start as u32..proto_selection.end as u32
2359 };
2360 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2361 let permalink = this
2362 .update(&mut cx, |this, cx| {
2363 this.get_permalink_to_line(&buffer, selection, cx)
2364 })?
2365 .await?;
2366 Ok(proto::GetPermalinkToLineResponse {
2367 permalink: permalink.to_string(),
2368 })
2369 }
2370
2371 pub async fn handle_open_unstaged_diff(
2372 this: Entity<Self>,
2373 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2374 mut cx: AsyncApp,
2375 ) -> Result<proto::OpenUnstagedDiffResponse> {
2376 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2377 let diff = this
2378 .update(&mut cx, |this, cx| {
2379 let buffer = this.get(buffer_id)?;
2380 Some(this.open_unstaged_diff(buffer, cx))
2381 })?
2382 .ok_or_else(|| anyhow!("no such buffer"))?
2383 .await?;
2384 this.update(&mut cx, |this, _| {
2385 let shared_buffers = this
2386 .shared_buffers
2387 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2388 .or_default();
2389 debug_assert!(shared_buffers.contains_key(&buffer_id));
2390 if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2391 shared.diff = Some(diff.clone());
2392 }
2393 })?;
2394 let staged_text =
2395 diff.read_with(&cx, |diff, _| diff.base_text().map(|buffer| buffer.text()))?;
2396 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2397 }
2398
2399 pub async fn handle_open_uncommitted_diff(
2400 this: Entity<Self>,
2401 request: TypedEnvelope<proto::OpenUncommittedDiff>,
2402 mut cx: AsyncApp,
2403 ) -> Result<proto::OpenUncommittedDiffResponse> {
2404 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2405 let diff = this
2406 .update(&mut cx, |this, cx| {
2407 let buffer = this.get(buffer_id)?;
2408 Some(this.open_uncommitted_diff(buffer, cx))
2409 })?
2410 .ok_or_else(|| anyhow!("no such buffer"))?
2411 .await?;
2412 this.update(&mut cx, |this, _| {
2413 let shared_buffers = this
2414 .shared_buffers
2415 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2416 .or_default();
2417 debug_assert!(shared_buffers.contains_key(&buffer_id));
2418 if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2419 shared.diff = Some(diff.clone());
2420 }
2421 })?;
2422 diff.read_with(&cx, |diff, cx| {
2423 use proto::open_uncommitted_diff_response::Mode;
2424
2425 let staged_buffer = diff
2426 .secondary_diff()
2427 .and_then(|diff| diff.read(cx).base_text());
2428
2429 let mode;
2430 let staged_text;
2431 let committed_text;
2432 if let Some(committed_buffer) = diff.base_text() {
2433 committed_text = Some(committed_buffer.text());
2434 if let Some(staged_buffer) = staged_buffer {
2435 if staged_buffer.remote_id() == committed_buffer.remote_id() {
2436 mode = Mode::IndexMatchesHead;
2437 staged_text = None;
2438 } else {
2439 mode = Mode::IndexAndHead;
2440 staged_text = Some(staged_buffer.text());
2441 }
2442 } else {
2443 mode = Mode::IndexAndHead;
2444 staged_text = None;
2445 }
2446 } else {
2447 mode = Mode::IndexAndHead;
2448 committed_text = None;
2449 staged_text = staged_buffer.as_ref().map(|buffer| buffer.text());
2450 }
2451
2452 proto::OpenUncommittedDiffResponse {
2453 committed_text,
2454 staged_text,
2455 mode: mode.into(),
2456 }
2457 })
2458 }
2459
2460 pub async fn handle_update_diff_bases(
2461 this: Entity<Self>,
2462 request: TypedEnvelope<proto::UpdateDiffBases>,
2463 mut cx: AsyncApp,
2464 ) -> Result<()> {
2465 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2466 this.update(&mut cx, |this, cx| {
2467 if let Some(OpenBuffer::Complete { diff_state, buffer }) =
2468 this.opened_buffers.get_mut(&buffer_id)
2469 {
2470 if let Some(buffer) = buffer.upgrade() {
2471 let buffer = buffer.read(cx).text_snapshot();
2472 diff_state.update(cx, |diff_state, cx| {
2473 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2474 })
2475 }
2476 }
2477 })
2478 }
2479
2480 pub fn reload_buffers(
2481 &self,
2482 buffers: HashSet<Entity<Buffer>>,
2483 push_to_history: bool,
2484 cx: &mut Context<Self>,
2485 ) -> Task<Result<ProjectTransaction>> {
2486 if buffers.is_empty() {
2487 return Task::ready(Ok(ProjectTransaction::default()));
2488 }
2489 match &self.state {
2490 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2491 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2492 }
2493 }
2494
2495 async fn handle_reload_buffers(
2496 this: Entity<Self>,
2497 envelope: TypedEnvelope<proto::ReloadBuffers>,
2498 mut cx: AsyncApp,
2499 ) -> Result<proto::ReloadBuffersResponse> {
2500 let sender_id = envelope.original_sender_id().unwrap_or_default();
2501 let reload = this.update(&mut cx, |this, cx| {
2502 let mut buffers = HashSet::default();
2503 for buffer_id in &envelope.payload.buffer_ids {
2504 let buffer_id = BufferId::new(*buffer_id)?;
2505 buffers.insert(this.get_existing(buffer_id)?);
2506 }
2507 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2508 })??;
2509
2510 let project_transaction = reload.await?;
2511 let project_transaction = this.update(&mut cx, |this, cx| {
2512 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2513 })?;
2514 Ok(proto::ReloadBuffersResponse {
2515 transaction: Some(project_transaction),
2516 })
2517 }
2518
    /// Replicates a buffer to `peer_id` if it hasn't been shared with that
    /// peer already.
    ///
    /// Registers the buffer in `shared_buffers` synchronously (so repeated
    /// calls are idempotent), then asynchronously sends the buffer's state
    /// followed by its operation history, split into chunks, via
    /// `CreateBufferForPeer` messages.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        // Already shared with this peer: nothing to do.
        if shared_buffers.contains_key(&buffer_id) {
            return Task::ready(Ok(()));
        }
        // Record the share eagerly, before any async work, so concurrent
        // calls for the same (peer, buffer) pair short-circuit above.
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                diff: None,
                lsp_handle: None,
            },
        );

        // No downstream client means we aren't actually sharing; the local
        // bookkeeping above is still useful.
        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before the task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            // Send the buffer's current state first; operations follow in
            // chunks, with `is_last` marking the final one.
            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2585
    /// Drops all shared-buffer bookkeeping for every peer (e.g. when the
    /// project stops being shared).
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2589
    /// Drops all shared-buffer bookkeeping for a single peer (e.g. when that
    /// peer disconnects).
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2593
2594 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2595 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2596 self.shared_buffers.insert(new_peer_id, buffers);
2597 }
2598 }
2599
    /// Whether any buffer is currently shared with any peer.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2603
    /// Creates a new local buffer containing `text`, registers it with this
    /// store, and indexes it by path and worktree entry id if it has a file.
    ///
    /// `language` defaults to plain text when `None`.
    ///
    /// # Panics
    /// Panics if called on a non-local (remote) buffer store.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut Context<Self>,
    ) -> Entity<Buffer> {
        let buffer = cx.new(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        // A freshly created buffer usually has no file; when it does (e.g.
        // created over an existing path), keep the path/entry-id indices in
        // sync so later lookups find it.
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }
2637
2638 pub fn deserialize_project_transaction(
2639 &mut self,
2640 message: proto::ProjectTransaction,
2641 push_to_history: bool,
2642 cx: &mut Context<Self>,
2643 ) -> Task<Result<ProjectTransaction>> {
2644 if let Some(this) = self.as_remote_mut() {
2645 this.deserialize_project_transaction(message, push_to_history, cx)
2646 } else {
2647 debug_panic!("not a remote buffer store");
2648 Task::ready(Err(anyhow!("not a remote buffer store")))
2649 }
2650 }
2651
2652 pub fn wait_for_remote_buffer(
2653 &mut self,
2654 id: BufferId,
2655 cx: &mut Context<BufferStore>,
2656 ) -> Task<Result<Entity<Buffer>>> {
2657 if let Some(this) = self.as_remote_mut() {
2658 this.wait_for_remote_buffer(id, cx)
2659 } else {
2660 debug_panic!("not a remote buffer store");
2661 Task::ready(Err(anyhow!("not a remote buffer store")))
2662 }
2663 }
2664
2665 pub fn serialize_project_transaction_for_peer(
2666 &mut self,
2667 project_transaction: ProjectTransaction,
2668 peer_id: proto::PeerId,
2669 cx: &mut Context<Self>,
2670 ) -> proto::ProjectTransaction {
2671 let mut serialized_transaction = proto::ProjectTransaction {
2672 buffer_ids: Default::default(),
2673 transactions: Default::default(),
2674 };
2675 for (buffer, transaction) in project_transaction.0 {
2676 self.create_buffer_for_peer(&buffer, peer_id, cx)
2677 .detach_and_log_err(cx);
2678 serialized_transaction
2679 .buffer_ids
2680 .push(buffer.read(cx).remote_id().into());
2681 serialized_transaction
2682 .transactions
2683 .push(language::proto::serialize_transaction(&transaction));
2684 }
2685 serialized_transaction
2686 }
2687}
2688
2689impl OpenBuffer {
2690 fn upgrade(&self) -> Option<Entity<Buffer>> {
2691 match self {
2692 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2693 OpenBuffer::Operations(_) => None,
2694 }
2695 }
2696}
2697
2698fn is_not_found_error(error: &anyhow::Error) -> bool {
2699 error
2700 .root_cause()
2701 .downcast_ref::<io::Error>()
2702 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2703}
2704
2705fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2706 let Some(blame) = blame else {
2707 return proto::BlameBufferResponse {
2708 blame_response: None,
2709 };
2710 };
2711
2712 let entries = blame
2713 .entries
2714 .into_iter()
2715 .map(|entry| proto::BlameEntry {
2716 sha: entry.sha.as_bytes().into(),
2717 start_line: entry.range.start,
2718 end_line: entry.range.end,
2719 original_line_number: entry.original_line_number,
2720 author: entry.author.clone(),
2721 author_mail: entry.author_mail.clone(),
2722 author_time: entry.author_time,
2723 author_tz: entry.author_tz.clone(),
2724 committer: entry.committer.clone(),
2725 committer_mail: entry.committer_mail.clone(),
2726 committer_time: entry.committer_time,
2727 committer_tz: entry.committer_tz.clone(),
2728 summary: entry.summary.clone(),
2729 previous: entry.previous.clone(),
2730 filename: entry.filename.clone(),
2731 })
2732 .collect::<Vec<_>>();
2733
2734 let messages = blame
2735 .messages
2736 .into_iter()
2737 .map(|(oid, message)| proto::CommitMessage {
2738 oid: oid.as_bytes().into(),
2739 message,
2740 })
2741 .collect::<Vec<_>>();
2742
2743 let permalinks = blame
2744 .permalinks
2745 .into_iter()
2746 .map(|(oid, url)| proto::CommitPermalink {
2747 oid: oid.as_bytes().into(),
2748 permalink: url.to_string(),
2749 })
2750 .collect::<Vec<_>>();
2751
2752 proto::BlameBufferResponse {
2753 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2754 entries,
2755 messages,
2756 permalinks,
2757 remote_url: blame.remote_url,
2758 }),
2759 }
2760}
2761
2762fn deserialize_blame_buffer_response(
2763 response: proto::BlameBufferResponse,
2764) -> Option<git::blame::Blame> {
2765 let response = response.blame_response?;
2766 let entries = response
2767 .entries
2768 .into_iter()
2769 .filter_map(|entry| {
2770 Some(git::blame::BlameEntry {
2771 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2772 range: entry.start_line..entry.end_line,
2773 original_line_number: entry.original_line_number,
2774 committer: entry.committer,
2775 committer_time: entry.committer_time,
2776 committer_tz: entry.committer_tz,
2777 committer_mail: entry.committer_mail,
2778 author: entry.author,
2779 author_mail: entry.author_mail,
2780 author_time: entry.author_time,
2781 author_tz: entry.author_tz,
2782 summary: entry.summary,
2783 previous: entry.previous,
2784 filename: entry.filename,
2785 })
2786 })
2787 .collect::<Vec<_>>();
2788
2789 let messages = response
2790 .messages
2791 .into_iter()
2792 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2793 .collect::<HashMap<_, _>>();
2794
2795 let permalinks = response
2796 .permalinks
2797 .into_iter()
2798 .filter_map(|permalink| {
2799 Some((
2800 git::Oid::from_bytes(&permalink.oid).ok()?,
2801 Url::from_str(&permalink.permalink).ok()?,
2802 ))
2803 })
2804 .collect::<HashMap<_, _>>();
2805
2806 Some(Blame {
2807 entries,
2808 permalinks,
2809 messages,
2810 remote_url: response.remote_url,
2811 })
2812}
2813
/// Builds a web permalink for a file inside a Rust registry source checkout
/// (e.g. `~/.cargo/registry/src/...`), using the crate's published VCS
/// metadata.
///
/// Walks up from `path` looking for a `.cargo_vcs_info.json` (written by
/// `cargo publish`), reads the sibling `Cargo.toml` for the repository URL,
/// and asks the matching git hosting provider to build a permalink at the
/// published commit for the given line `selection`.
///
/// # Errors
/// Fails if no `.cargo_vcs_info.json` is found in any parent directory, if
/// either metadata file cannot be read or parsed, or if the repository URL
/// is not recognized by any registered hosting provider.
fn get_permalink_in_rust_registry_src(
    provider_registry: Arc<GitHostingProviderRegistry>,
    path: PathBuf,
    selection: Range<u32>,
) -> Result<url::Url> {
    // Minimal shapes of the metadata files; only the fields we need.
    #[derive(Deserialize)]
    struct CargoVcsGit {
        sha1: String,
    }

    #[derive(Deserialize)]
    struct CargoVcsInfo {
        git: CargoVcsGit,
        path_in_vcs: String,
    }

    #[derive(Deserialize)]
    struct CargoPackage {
        repository: String,
    }

    #[derive(Deserialize)]
    struct CargoToml {
        package: CargoPackage,
    }

    // `skip(1)` starts from the file's parent directory, not the file itself.
    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
        Some((dir, json))
    }) else {
        bail!("No .cargo_vcs_info.json found in parent directories")
    };
    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
        .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
    // Re-root the path at the crate's location within the upstream repo.
    // `unwrap` is safe: `dir` is an ancestor of `path` by construction.
    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
    let permalink = provider.build_permalink(
        remote,
        BuildPermalinkParams {
            sha: &cargo_vcs_info.git.sha1,
            path: &path.to_string_lossy(),
            selection: Some(selection),
        },
    );
    Ok(permalink)
}