1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use buffer_diff::{BufferDiff, BufferDiffEvent};
10use client::Client;
11use collections::{hash_map, HashMap, HashSet};
12use fs::Fs;
13use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
14use git::{blame::Blame, repository::RepoPath};
15use gpui::{
16 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
17};
18use http_client::Url;
19use language::{
20 proto::{
21 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
22 split_operations,
23 },
24 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
25};
26use rpc::{
27 proto::{self, ToProto},
28 AnyProtoClient, ErrorExt as _, TypedEnvelope,
29};
30use serde::Deserialize;
31use smol::channel::Receiver;
32use std::{
33 io,
34 ops::Range,
35 path::{Path, PathBuf},
36 pin::pin,
37 str::FromStr as _,
38 sync::Arc,
39 time::Instant,
40};
41use text::BufferId;
42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
44
/// Identifies which git diff is being loaded for a buffer.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    /// Working copy vs. the index (staged text).
    Unstaged,
    /// Working copy vs. HEAD (committed text).
    Uncommitted,
}
50
/// A set of open buffers.
pub struct BufferStore {
    /// Local or remote backing implementation.
    state: BufferStoreState,
    /// In-flight buffer loads keyed by project path; `Shared` so concurrent
    /// callers await the same task instead of loading twice.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    /// In-flight diff loads keyed by buffer id and diff kind.
    #[allow(clippy::type_complexity)]
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
    worktree_store: Entity<WorktreeStore>,
    /// All buffers known to the store, keyed by their remote id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Client and project id used to mirror updates downstream when the
    /// project is shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// Per-peer record of the buffers that have been sent to that peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
64
/// What is retained for a single buffer shared with a peer: the buffer plus
/// the diff and LSP handles associated with it.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    diff: Option<Entity<BufferDiff>>,
    lsp_handle: Option<OpenLspBufferHandle>,
}
71
/// Per-buffer state backing the unstaged and uncommitted diffs, plus the
/// dirty flags consumed by `recalculate_diffs`.
#[derive(Default)]
struct BufferDiffState {
    /// Weak handle to the diff against the index; dead once all readers drop it.
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    /// Weak handle to the diff against HEAD.
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    /// The in-flight recalculation; replaced whenever a new one starts.
    recalculate_diff_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    /// Senders notified when the current recalculation finishes.
    diff_updated_futures: Vec<oneshot::Sender<()>>,

    /// File text at HEAD, line endings normalized; `None` when absent there.
    head_text: Option<Arc<String>>,
    /// File text in the index, line endings normalized; `None` when absent.
    index_text: Option<Arc<String>>,
    // Dirty flags, set by the mutators above and cleared by `recalculate_diffs`.
    head_changed: bool,
    index_changed: bool,
    language_changed: bool,
}
87
/// Describes which diff base texts changed and their new contents.
/// A `None` payload means the file is absent from that base.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Only the index (staged) text changed.
    SetIndex(Option<String>),
    /// Only the HEAD (committed) text changed.
    SetHead(Option<String>),
    /// Index and HEAD both changed, to different contents.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Index and HEAD both changed and are identical.
    SetBoth(Option<String>),
}
98
impl BufferDiffState {
    /// Records the buffer's new language and kicks off a diff recalculation
    /// so syntax-aware diff rendering stays current.
    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
        self.language = buffer.read(cx).language().cloned();
        self.language_changed = true;
        // Receiver dropped on purpose: the caller does not await completion.
        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
    }

    /// The unstaged diff entity, if it hasn't been dropped.
    fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
        self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
    }

    /// The uncommitted diff entity, if it hasn't been dropped.
    fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
        self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
    }

    /// Applies an `UpdateDiffBases` message from the host by translating its
    /// mode into a `DiffBasesChange` and recalculating the affected diffs.
    fn handle_base_texts_updated(
        &mut self,
        buffer: text::BufferSnapshot,
        message: proto::UpdateDiffBases,
        cx: &mut Context<Self>,
    ) {
        use proto::update_diff_bases::Mode;

        // Messages with an unrecognized mode are silently ignored.
        let Some(mode) = Mode::from_i32(message.mode) else {
            return;
        };

        let diff_bases_change = match mode {
            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
            Mode::IndexAndHead => DiffBasesChange::SetEach {
                index: message.staged_text,
                head: message.committed_text,
            },
        };

        // Receiver dropped on purpose: completion isn't awaited here.
        let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
    }

    /// Stores new index and/or head base texts (normalizing line endings) and
    /// schedules a recalculation. The returned receiver resolves once the
    /// recalculation completes.
    fn diff_bases_changed(
        &mut self,
        buffer: text::BufferSnapshot,
        diff_bases_change: DiffBasesChange,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        match diff_bases_change {
            DiffBasesChange::SetIndex(index) => {
                self.index_text = index.map(|mut index| {
                    text::LineEnding::normalize(&mut index);
                    Arc::new(index)
                });
                self.index_changed = true;
            }
            DiffBasesChange::SetHead(head) => {
                self.head_text = head.map(|mut head| {
                    text::LineEnding::normalize(&mut head);
                    Arc::new(head)
                });
                self.head_changed = true;
            }
            DiffBasesChange::SetBoth(text) => {
                // One shared Arc is stored in both fields; `recalculate_diffs`
                // later uses pointer equality of the two to detect that the
                // index matches head.
                let text = text.map(|mut text| {
                    text::LineEnding::normalize(&mut text);
                    Arc::new(text)
                });
                self.head_text = text.clone();
                self.index_text = text;
                self.head_changed = true;
                self.index_changed = true;
            }
            DiffBasesChange::SetEach { index, head } => {
                self.index_text = index.map(|mut index| {
                    text::LineEnding::normalize(&mut index);
                    Arc::new(index)
                });
                self.index_changed = true;
                self.head_text = head.map(|mut head| {
                    text::LineEnding::normalize(&mut head);
                    Arc::new(head)
                });
                self.head_changed = true;
            }
        }

        self.recalculate_diffs(buffer, cx)
    }

    /// Spawns a task that recomputes the unstaged and uncommitted diffs
    /// against the current base texts, emitting `BufferDiffEvent`s for any
    /// changes. Returns a receiver that fires when the task finishes.
    fn recalculate_diffs(
        &mut self,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);

        let language = self.language.clone();
        let language_registry = self.language_registry.clone();
        let unstaged_diff = self.unstaged_diff();
        let uncommitted_diff = self.uncommitted_diff();
        let head = self.head_text.clone();
        let index = self.index_text.clone();
        let index_changed = self.index_changed;
        let head_changed = self.head_changed;
        let language_changed = self.language_changed;
        // Pointer equality suffices because `SetBoth` stores one shared Arc
        // in both fields; `(None, None)` means the file is absent from both.
        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
            (None, None) => true,
            _ => false,
        };
        // Overwriting the previous task drops it (presumably cancelling the
        // stale recalculation — TODO confirm GPUI task-drop semantics).
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            let mut unstaged_changed_range = None;
            if let Some(unstaged_diff) = &unstaged_diff {
                unstaged_changed_range = BufferDiff::update_diff(
                    unstaged_diff.clone(),
                    buffer.clone(),
                    index,
                    index_changed,
                    language_changed,
                    language.clone(),
                    language_registry.clone(),
                    &mut cx,
                )
                .await?;

                unstaged_diff.update(&mut cx, |_, cx| {
                    if language_changed {
                        cx.emit(BufferDiffEvent::LanguageChanged);
                    }
                    if let Some(changed_range) = unstaged_changed_range.clone() {
                        cx.emit(BufferDiffEvent::DiffChanged {
                            changed_range: Some(changed_range),
                        })
                    }
                })?;
            }

            if let Some(uncommitted_diff) = &uncommitted_diff {
                // When the index matches head, the uncommitted diff is
                // derived from the just-updated unstaged diff instead of
                // being recomputed from scratch.
                let uncommitted_changed_range =
                    if let (Some(unstaged_diff), true) = (&unstaged_diff, index_matches_head) {
                        uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
                            uncommitted_diff.update_diff_from(&buffer, unstaged_diff, cx)
                        })?
                    } else {
                        BufferDiff::update_diff(
                            uncommitted_diff.clone(),
                            buffer.clone(),
                            head,
                            head_changed,
                            language_changed,
                            language.clone(),
                            language_registry.clone(),
                            &mut cx,
                        )
                        .await?
                    };

                uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
                    if language_changed {
                        cx.emit(BufferDiffEvent::LanguageChanged);
                    }
                    // Merge the two changed ranges into one covering both.
                    let changed_range = match (unstaged_changed_range, uncommitted_changed_range) {
                        (None, None) => None,
                        (Some(unstaged_range), None) => {
                            Some(uncommitted_diff.range_to_hunk_range(unstaged_range, &buffer, cx))
                        }
                        (None, Some(uncommitted_range)) => Some(uncommitted_range),
                        (Some(unstaged_range), Some(uncommitted_range)) => maybe!({
                            let expanded_range =
                                uncommitted_diff.range_to_hunk_range(unstaged_range, &buffer, cx);
                            let start = expanded_range.start.min(&uncommitted_range.start, &buffer);
                            let end = expanded_range.end.max(&uncommitted_range.end, &buffer);
                            Some(start..end)
                        }),
                    };
                    cx.emit(BufferDiffEvent::DiffChanged { changed_range });
                })?;
            }

            if let Some(this) = this.upgrade() {
                this.update(&mut cx, |this, _| {
                    // Clear the dirty flags and wake everyone waiting on
                    // this recalculation.
                    this.index_changed = false;
                    this.head_changed = false;
                    this.language_changed = false;
                    for tx in this.diff_updated_futures.drain(..) {
                        tx.send(()).ok();
                    }
                })?;
            }

            Ok(())
        }));

        rx
    }
}
295
/// Backing implementation of a `BufferStore`: buffers either live on this
/// machine (`Local`) or are replicated from an upstream host (`Remote`).
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
300
/// Buffer-store backend for remote projects; buffers are replicated over
/// `upstream_client`.
struct RemoteBufferStore {
    /// Buffers sent by collab peers, retained to avoid races
    /// (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    /// Buffers whose initial state arrived but whose final chunk hasn't.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    /// Waiters to notify once a given remote buffer finishes loading.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
310
/// Buffer-store backend for local projects, with indexes for resolving open
/// buffers from worktree paths and entry ids.
struct LocalBufferStore {
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    /// Keeps the worktree-store event subscription alive.
    _subscription: Subscription,
}
317
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully loaded buffer together with its diff state.
    Complete {
        buffer: WeakEntity<Buffer>,
        diff_state: Entity<BufferDiffState>,
    },
    /// Operations accumulated for a buffer that hasn't finished loading.
    Operations(Vec<Operation>),
}
325
/// Events emitted by `BufferStore`.
pub enum BufferStoreEvent {
    /// A buffer was added to the store.
    BufferAdded(Entity<Buffer>),
    /// The buffer with this id was dropped.
    BufferDropped(BufferId),
    /// A buffer's underlying file changed path.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        /// The buffer's file prior to the change, if any.
        old_file: Option<Arc<dyn language::File>>,
    },
}
334
/// A set of buffer transactions, one per affected buffer.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
337
// Lets `BufferStore` emit `BufferStoreEvent`s via `cx.emit`.
impl EventEmitter<BufferStoreEvent> for BufferStore {}
339
impl RemoteBufferStore {
    /// Requests the staged (index) text for a buffer from the upstream host.
    fn open_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.background_executor().spawn(async move {
            let response = client
                .request(proto::OpenUnstagedDiff {
                    project_id,
                    buffer_id: buffer_id.to_proto(),
                })
                .await?;
            Ok(response.staged_text)
        })
    }

    /// Requests the committed and staged texts for a buffer from the
    /// upstream host, expressed as a `DiffBasesChange`.
    fn open_uncommitted_diff(
        &self,
        buffer_id: BufferId,
        cx: &App,
    ) -> Task<Result<DiffBasesChange>> {
        use proto::open_uncommitted_diff_response::Mode;

        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.background_executor().spawn(async move {
            let response = client
                .request(proto::OpenUncommittedDiff {
                    project_id,
                    buffer_id: buffer_id.to_proto(),
                })
                .await?;
            let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
            let bases = match mode {
                Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
                Mode::IndexAndHead => DiffBasesChange::SetEach {
                    head: response.committed_text,
                    index: response.staged_text,
                },
            };
            Ok(bases)
        })
    }

    /// Waits for a buffer with the given id to arrive from the host,
    /// resolving immediately if it is already open.
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            // The buffer may have arrived before this listener registered.
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }

    /// Asks the upstream host to save the buffer (optionally to a new path),
    /// then applies the resulting version and mtime to the local replica.
    fn save_remote_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

    /// Handles a `CreateBufferForPeer` message. Buffers arrive as an initial
    /// `State` variant followed by one or more operation `Chunk`s; the
    /// completed buffer is returned once the last chunk has been applied.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut Context<BufferStore>,
    ) -> Result<Option<Entity<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Deserialization failed: fail every waiter for this id.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // A bad chunk poisons the whole load: discard the
                    // partial buffer and fail every waiter.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }

    /// Ids of buffers whose initial state arrived but whose final chunk
    /// hasn't been applied yet.
    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
        self.loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>()
    }

    /// Reconstructs a `ProjectTransaction` from its protobuf form, waiting
    /// for each referenced buffer and its edits to arrive first. When
    /// `push_to_history` is set, each transaction is also pushed onto its
    /// buffer's history.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            // Second pass: make sure every edit referenced by a transaction
            // has actually been applied before handing the result back.
            for (buffer, transaction) in &project_transaction.0 {
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }

    /// Opens a buffer by path via the upstream host, then waits for the
    /// buffer's replicated state to arrive locally.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let worktree_id = worktree.read(cx).id().to_proto();
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path.to_proto(),
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            let buffer = this
                .update(&mut cx, {
                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await?;

            Ok(buffer)
        })
    }

    /// Creates a new, empty buffer on the upstream host and waits for it to
    /// be replicated locally.
    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
        let create = self.upstream_client.request(proto::OpenNewBuffer {
            project_id: self.project_id,
        });
        cx.spawn(|this, mut cx| async move {
            let response = create.await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

    /// Asks the upstream host to reload the given buffers, returning the
    /// resulting transaction once it has been replicated locally.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        let request = self.upstream_client.request(proto::ReloadBuffers {
            project_id: self.project_id,
            buffer_ids: buffers
                .iter()
                .map(|buffer| buffer.read(cx).remote_id().to_proto())
                .collect(),
        });

        cx.spawn(|this, mut cx| async move {
            let response = request
                .await?
                .transaction
                .ok_or_else(|| anyhow!("missing transaction"))?;
            this.update(&mut cx, |this, cx| {
                this.deserialize_project_transaction(response, push_to_history, cx)
            })?
            .await
        })
    }
}
647
648impl LocalBufferStore {
649 fn worktree_for_buffer(
650 &self,
651 buffer: &Entity<Buffer>,
652 cx: &App,
653 ) -> Option<(Entity<Worktree>, Arc<Path>)> {
654 let file = buffer.read(cx).file()?;
655 let worktree_id = file.worktree_id(cx);
656 let path = file.path().clone();
657 let worktree = self
658 .worktree_store
659 .read(cx)
660 .worktree_for_id(worktree_id, cx)?;
661 Some((worktree, path))
662 }
663
664 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
665 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
666 worktree.read(cx).load_staged_file(path.as_ref(), cx)
667 } else {
668 return Task::ready(Err(anyhow!("no such worktree")));
669 }
670 }
671
672 fn load_committed_text(
673 &self,
674 buffer: &Entity<Buffer>,
675 cx: &App,
676 ) -> Task<Result<Option<String>>> {
677 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
678 worktree.read(cx).load_committed_file(path.as_ref(), cx)
679 } else {
680 Task::ready(Err(anyhow!("no such worktree")))
681 }
682 }
683
    /// Writes the buffer's contents to `path` within `worktree`, then
    /// notifies the downstream client (if any) and records the save on the
    /// buffer itself.
    ///
    /// `has_changed_file` indicates the buffer's file metadata changed; it
    /// is forced to true for buffers whose file is still `DiskState::New`,
    /// since saving creates the file on disk.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Capture everything the async portion needs before spawning.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            this.update(&mut cx, |this, cx| {
                // Mirror the save (and any file-metadata change) to the
                // downstream client; send failures are logged, not fatal.
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
741
    /// Subscribes to a worktree's events, routing entry and git-repository
    /// updates (for local worktrees only) into the buffer store. The
    /// subscription is detached, so it lasts as long as both entities do.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            // Only local worktrees are handled here.
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        Self::local_worktree_git_repos_changed(
                            this,
                            worktree.clone(),
                            updated_repos,
                            cx,
                        )
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }
767
768 fn local_worktree_entries_changed(
769 this: &mut BufferStore,
770 worktree_handle: &Entity<Worktree>,
771 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
772 cx: &mut Context<BufferStore>,
773 ) {
774 let snapshot = worktree_handle.read(cx).snapshot();
775 for (path, entry_id, _) in changes {
776 Self::local_worktree_entry_changed(
777 this,
778 *entry_id,
779 path,
780 worktree_handle,
781 &snapshot,
782 cx,
783 );
784 }
785 }
786
    /// Reacts to git repository changes in a local worktree: reloads the
    /// staged/committed base texts for every open buffer inside an affected
    /// repository, mirrors the new bases downstream, and triggers local diff
    /// recalculation.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // On the foreground, collect a snapshot plus "which diffs are alive"
        // flags for every open buffer under a changed repository.
        let mut diff_state_updates = Vec::new();
        for buffer in this.opened_buffers.values() {
            let OpenBuffer::Complete { buffer, diff_state } = buffer else {
                continue;
            };
            let Some(buffer) = buffer.upgrade() else {
                continue;
            };
            let buffer = buffer.read(cx);
            let Some(file) = File::from_dyn(buffer.file()) else {
                continue;
            };
            if file.worktree != worktree_handle {
                continue;
            }
            let diff_state = diff_state.read(cx);
            if changed_repos
                .iter()
                .any(|(work_dir, _)| file.path.starts_with(work_dir))
            {
                let snapshot = buffer.text_snapshot();
                // Only load the base texts that a live diff actually needs.
                diff_state_updates.push((
                    snapshot.clone(),
                    file.path.clone(),
                    diff_state
                        .unstaged_diff
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                    diff_state
                        .uncommitted_diff
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                ))
            }
        }

        if diff_state_updates.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Load the git base texts on the background executor.
            let diff_bases_changes_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    diff_state_updates
                        .into_iter()
                        .filter_map(
                            |(buffer_snapshot, path, needs_staged_text, needs_committed_text)| {
                                let local_repo = snapshot.local_repo_for_path(&path)?;
                                let relative_path = local_repo.relativize(&path).ok()?;
                                let staged_text = if needs_staged_text {
                                    local_repo.repo().load_index_text(&relative_path)
                                } else {
                                    None
                                };
                                let committed_text = if needs_committed_text {
                                    local_repo.repo().load_committed_text(&relative_path)
                                } else {
                                    None
                                };
                                // Collapse identical staged/committed text
                                // into `SetBoth` so one string can be shared.
                                let diff_bases_change =
                                    match (needs_staged_text, needs_committed_text) {
                                        (true, true) => Some(if staged_text == committed_text {
                                            DiffBasesChange::SetBoth(committed_text)
                                        } else {
                                            DiffBasesChange::SetEach {
                                                index: staged_text,
                                                head: committed_text,
                                            }
                                        }),
                                        (true, false) => {
                                            Some(DiffBasesChange::SetIndex(staged_text))
                                        }
                                        (false, true) => {
                                            Some(DiffBasesChange::SetHead(committed_text))
                                        }
                                        (false, false) => None,
                                    };
                                Some((buffer_snapshot, diff_bases_change))
                            },
                        )
                        .collect::<Vec<_>>()
                })
                .await;

            // Back on the foreground: apply each change and mirror it
            // downstream when the project is shared.
            this.update(&mut cx, |this, cx| {
                for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
                    let Some(OpenBuffer::Complete { diff_state, .. }) =
                        this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
                    else {
                        continue;
                    };
                    let Some(diff_bases_change) = diff_bases_change else {
                        continue;
                    };

                    diff_state.update(cx, |diff_state, cx| {
                        use proto::update_diff_bases::Mode;

                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
                            let buffer_id = buffer_snapshot.remote_id().to_proto();
                            let (staged_text, committed_text, mode) = match diff_bases_change
                                .clone()
                            {
                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                DiffBasesChange::SetEach { index, head } => {
                                    (index, head, Mode::IndexAndHead)
                                }
                                DiffBasesChange::SetBoth(text) => {
                                    (None, text, Mode::IndexMatchesHead)
                                }
                            };
                            let message = proto::UpdateDiffBases {
                                project_id: *project_id,
                                buffer_id,
                                staged_text,
                                committed_text,
                                mode: mode as i32,
                            };

                            client.send(message).log_err();
                        }

                        // Receiver dropped: completion isn't awaited here.
                        let _ =
                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
                    });
                }
            })
        })
        .detach_and_log_err(cx);
    }
931
    /// Reconciles a single buffer with a changed worktree entry: updates the
    /// buffer's `File` (path, entry id, disk state), maintains the path and
    /// entry-id indexes, notifies the downstream client, and emits
    /// `BufferChangedFilePath` when the path moved.
    ///
    /// Returns `Option<()>` purely so `?` can be used for early exits; the
    /// value itself carries no meaning.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Resolve the buffer by entry id first, falling back to its path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer entity is gone; purge its store entry.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // Dead buffer: also drop it from both indexes and bail out.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Locate the entry in the new snapshot, by id or by path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // Entry missing from the snapshot: the file was deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index from the old path to the new one.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index as well.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }
1059
1060 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1061 let file = File::from_dyn(buffer.read(cx).file())?;
1062
1063 let remote_id = buffer.read(cx).remote_id();
1064 if let Some(entry_id) = file.entry_id {
1065 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1066 Some(_) => {
1067 return None;
1068 }
1069 None => {
1070 self.local_buffer_ids_by_entry_id
1071 .insert(entry_id, remote_id);
1072 }
1073 }
1074 };
1075 self.local_buffer_ids_by_path.insert(
1076 ProjectPath {
1077 worktree_id: file.worktree_id(cx),
1078 path: file.path.clone(),
1079 },
1080 remote_id,
1081 );
1082
1083 Some(())
1084 }
1085
1086 fn save_buffer(
1087 &self,
1088 buffer: Entity<Buffer>,
1089 cx: &mut Context<BufferStore>,
1090 ) -> Task<Result<()>> {
1091 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1092 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1093 };
1094 let worktree = file.worktree.clone();
1095 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1096 }
1097
1098 fn save_buffer_as(
1099 &self,
1100 buffer: Entity<Buffer>,
1101 path: ProjectPath,
1102 cx: &mut Context<BufferStore>,
1103 ) -> Task<Result<()>> {
1104 let Some(worktree) = self
1105 .worktree_store
1106 .read(cx)
1107 .worktree_for_id(path.worktree_id, cx)
1108 else {
1109 return Task::ready(Err(anyhow!("no such worktree")));
1110 };
1111 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1112 }
1113
    /// Opens the file at `path` in `worktree` as a buffer. If the file does
    /// not exist on disk, an empty buffer with `DiskState::New` is created
    /// instead. The buffer is registered in the store and in the path and
    /// entry-id indexes before being returned.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity slot up front so the buffer id (derived
            // from the entity id) is fixed before the file finishes loading.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Construct the text buffer off the main thread.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // A missing file is not an error: open an empty in-memory
                // buffer whose file will be created on first save.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                // Index the new buffer by path and, when present, entry id.
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
1182
1183 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1184 cx.spawn(|buffer_store, mut cx| async move {
1185 let buffer =
1186 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1187 buffer_store.update(&mut cx, |buffer_store, cx| {
1188 buffer_store.add_buffer(buffer.clone(), cx).log_err();
1189 })?;
1190 Ok(buffer)
1191 })
1192 }
1193
    /// Reloads each buffer from disk, one at a time, collecting the resulting
    /// edit transactions into a single `ProjectTransaction`.
    ///
    /// When `push_to_history` is false, each reload transaction is forgotten
    /// so it cannot be undone from the buffer's history; it is still recorded
    /// in the returned `ProjectTransaction`.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    // `reload` returns `None` when the buffer was unchanged.
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.entity(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
1219}
1220
1221impl BufferStore {
    /// Registers the RPC message and request handlers that a `BufferStore`
    /// services on the given client.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
    }
1234
    /// Creates a buffer store for a local project, subscribing to the
    /// worktree store so that newly added worktrees are tracked automatically.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                // Watch for new worktrees so their file events reach this
                // store's local state.
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_diffs: Default::default(),
            worktree_store,
        }
    }
1257
    /// Creates a buffer store that mirrors buffers owned by a remote host,
    /// communicating with it over `upstream_client` using `remote_id` as the
    /// project id.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_diffs: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
1281
1282 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1283 match &mut self.state {
1284 BufferStoreState::Local(state) => Some(state),
1285 _ => None,
1286 }
1287 }
1288
1289 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1290 match &mut self.state {
1291 BufferStoreState::Remote(state) => Some(state),
1292 _ => None,
1293 }
1294 }
1295
1296 fn as_remote(&self) -> Option<&RemoteBufferStore> {
1297 match &self.state {
1298 BufferStoreState::Remote(state) => Some(state),
1299 _ => None,
1300 }
1301 }
1302
    /// Opens (or returns the already-open) buffer for `project_path`.
    ///
    /// Concurrent calls for the same path share a single load through the
    /// `loading_buffers` map; the entry is removed once the load settles so a
    /// failed load can be retried.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        // Fast path: the buffer is already open.
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // Another caller is already loading this path; share its task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // The shared future yields `Arc<anyhow::Error>`; flatten it back into
        // a plain error for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1348
    /// Opens (or returns the already-open) diff between the buffer's contents
    /// and its staged (index) text.
    ///
    /// Concurrent requests for the same buffer share one load through the
    /// `loading_diffs` map.
    pub fn open_unstaged_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        // Fast path: the diff already exists and is still alive.
        if let Some(diff) = self.get_unstaged_diff(buffer_id, cx) {
            return Task::ready(Ok(diff));
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Unstaged)) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                // Locally, read the staged text from the index; remotely, ask
                // the host for the whole diff.
                let staged_text = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.open_unstaged_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Unstaged,
                                staged_text.await.map(DiffBasesChange::SetIndex),
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Flatten the shared future's `Arc<anyhow::Error>` for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1389
    /// Opens (or returns the already-open) diff between the buffer's contents
    /// and its committed (HEAD) text.
    ///
    /// Concurrent requests for the same buffer share one load through the
    /// `loading_diffs` map.
    pub fn open_uncommitted_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        // Fast path: the diff already exists and is still alive.
        if let Some(diff) = self.get_uncommitted_diff(buffer_id, cx) {
            return Task::ready(Ok(diff));
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Uncommitted)) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let changes = match &self.state {
                    BufferStoreState::Local(this) => {
                        let committed_text = this.load_committed_text(&buffer, cx);
                        let staged_text = this.load_staged_text(&buffer, cx);
                        cx.background_executor().spawn(async move {
                            let committed_text = committed_text.await?;
                            let staged_text = staged_text.await?;
                            // When index and HEAD agree, both diffs can share
                            // a single base text.
                            let diff_bases_change = if committed_text == staged_text {
                                DiffBasesChange::SetBoth(committed_text)
                            } else {
                                DiffBasesChange::SetEach {
                                    index: staged_text,
                                    head: committed_text,
                                }
                            };
                            Ok(diff_bases_change)
                        })
                    }
                    BufferStoreState::Remote(this) => this.open_uncommitted_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Uncommitted,
                                changes.await,
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Flatten the shared future's `Arc<anyhow::Error>` for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1446
    /// Finishes opening a diff of the given `kind`: clears the corresponding
    /// `loading_diffs` entry, creates the diff entity, wires it into the
    /// buffer's diff state, and waits for the base texts to be applied.
    async fn open_diff_internal(
        this: WeakEntity<Self>,
        kind: DiffKind,
        texts: Result<DiffBasesChange>,
        buffer_entity: Entity<Buffer>,
        mut cx: AsyncApp,
    ) -> Result<Entity<BufferDiff>> {
        let diff_bases_change = match texts {
            Err(e) => {
                // On failure, remove the loading entry so a later call can
                // retry, then surface the error.
                this.update(&mut cx, |this, cx| {
                    let buffer = buffer_entity.read(cx);
                    let buffer_id = buffer.remote_id();
                    this.loading_diffs.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(&mut cx, |this, cx| {
            let buffer = buffer_entity.read(cx);
            let buffer_id = buffer.remote_id();
            let language = buffer.language().cloned();
            let language_registry = buffer.language_registry();
            let text_snapshot = buffer.text_snapshot();
            this.loading_diffs.remove(&(buffer_id, kind));

            if let Some(OpenBuffer::Complete { diff_state, .. }) =
                this.opened_buffers.get_mut(&buffer_id)
            {
                diff_state.update(cx, |diff_state, cx| {
                    diff_state.language = language;
                    diff_state.language_registry = language_registry;

                    // Only weak handles are stored in the diff state; the
                    // returned `diff` entity keeps the diff alive.
                    let diff = cx.new(|_| BufferDiff::new(&text_snapshot));
                    match kind {
                        DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
                        DiffKind::Uncommitted => {
                            // An uncommitted diff needs an unstaged diff as its
                            // secondary; reuse an existing one if still alive.
                            let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
                                diff
                            } else {
                                let unstaged_diff = cx.new(|_| BufferDiff::new(&text_snapshot));
                                diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
                                unstaged_diff
                            };

                            diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
                            diff_state.uncommitted_diff = Some(diff.downgrade())
                        }
                    };

                    let rx = diff_state.diff_bases_changed(text_snapshot, diff_bases_change, cx);

                    // Wait for the base-text update to be processed before
                    // handing the diff to the caller.
                    Ok(async move {
                        rx.await.ok();
                        Ok(diff)
                    })
                })
            } else {
                Err(anyhow!("buffer was closed"))
            }
        })??
        .await
    }
1511
1512 pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
1513 match &self.state {
1514 BufferStoreState::Local(this) => this.create_buffer(cx),
1515 BufferStoreState::Remote(this) => this.create_buffer(cx),
1516 }
1517 }
1518
1519 pub fn save_buffer(
1520 &mut self,
1521 buffer: Entity<Buffer>,
1522 cx: &mut Context<Self>,
1523 ) -> Task<Result<()>> {
1524 match &mut self.state {
1525 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1526 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1527 }
1528 }
1529
    /// Saves `buffer` under a new `path`, then emits
    /// `BufferStoreEvent::BufferChangedFilePath` carrying the file the buffer
    /// had before the save.
    pub fn save_buffer_as(
        &mut self,
        buffer: Entity<Buffer>,
        path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        // Capture the pre-save file so event listeners can see what changed.
        let old_file = buffer.read(cx).file().cloned();
        let task = match &self.state {
            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
            BufferStoreState::Remote(this) => {
                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }
1550
    /// Computes Git blame information for the buffer, optionally at an older
    /// `version` of its contents.
    ///
    /// Returns `Ok(None)` when the file is not inside a Git repository. For
    /// remote worktrees the request is forwarded to the host over RPC.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything the blame needs here, so the blame itself
                // can run on the background executor.
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1612
    /// Builds a permalink URL to the given line `selection` of the buffer on
    /// its Git hosting provider (using the "origin" remote and current HEAD).
    ///
    /// For remote worktrees the request is forwarded to the host over RPC.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // The permalink path must be relative to the repository root,
                // not the worktree root.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1709
    /// Registers a newly-created or newly-received buffer with the store.
    ///
    /// Creates the buffer's diff state, arranges for a `BufferDropped` event
    /// when the buffer entity is released, applies any operations that were
    /// queued before the buffer existed, and emits `BufferAdded`.
    ///
    /// # Errors
    /// Fails when a local (replica 0) buffer with the same id is already
    /// registered and still alive.
    fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
        let buffer = buffer_entity.read(cx);
        let language = buffer.language().cloned();
        let language_registry = buffer.language_registry();
        let remote_id = buffer.remote_id();
        // Replica 0 is the authoritative copy; anything else mirrors a host.
        let is_remote = buffer.replica_id() != 0;
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer_entity.downgrade(),
            diff_state: cx.new(|_| BufferDiffState {
                language,
                language_registry,
                ..Default::default()
            }),
        };

        // Notify the store when the buffer entity is dropped.
        let handle = cx.entity().downgrade();
        buffer_entity.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                // Operations may have arrived before the buffer was created;
                // drain them into the new buffer now.
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        // The host may legitimately re-send a buffer.
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
        Ok(())
    }
1760
1761 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1762 self.opened_buffers
1763 .values()
1764 .filter_map(|buffer| buffer.upgrade())
1765 }
1766
1767 pub fn loading_buffers(
1768 &self,
1769 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1770 self.loading_buffers.iter().map(|(path, task)| {
1771 let task = task.clone();
1772 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1773 })
1774 }
1775
1776 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1777 self.buffers().find_map(|buffer| {
1778 let file = File::from_dyn(buffer.read(cx).file())?;
1779 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1780 Some(buffer)
1781 } else {
1782 None
1783 }
1784 })
1785 }
1786
1787 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1788 self.opened_buffers.get(&buffer_id)?.upgrade()
1789 }
1790
1791 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1792 self.get(buffer_id)
1793 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1794 }
1795
1796 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1797 self.get(buffer_id).or_else(|| {
1798 self.as_remote()
1799 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1800 })
1801 }
1802
1803 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
1804 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1805 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
1806 } else {
1807 None
1808 }
1809 }
1810
1811 pub fn get_uncommitted_diff(
1812 &self,
1813 buffer_id: BufferId,
1814 cx: &App,
1815 ) -> Option<Entity<BufferDiff>> {
1816 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1817 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
1818 } else {
1819 None
1820 }
1821 }
1822
1823 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1824 let buffers = self
1825 .buffers()
1826 .map(|buffer| {
1827 let buffer = buffer.read(cx);
1828 proto::BufferVersion {
1829 id: buffer.remote_id().into(),
1830 version: language::proto::serialize_version(&buffer.version),
1831 }
1832 })
1833 .collect();
1834 let incomplete_buffer_ids = self
1835 .as_remote()
1836 .map(|remote| remote.incomplete_buffer_ids())
1837 .unwrap_or_default();
1838 (buffers, incomplete_buffer_ids)
1839 }
1840
1841 pub fn disconnected_from_host(&mut self, cx: &mut App) {
1842 for open_buffer in self.opened_buffers.values_mut() {
1843 if let Some(buffer) = open_buffer.upgrade() {
1844 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1845 }
1846 }
1847
1848 for buffer in self.buffers() {
1849 buffer.update(cx, |buffer, cx| {
1850 buffer.set_capability(Capability::ReadOnly, cx)
1851 });
1852 }
1853
1854 if let Some(remote) = self.as_remote_mut() {
1855 // Wake up all futures currently waiting on a buffer to get opened,
1856 // to give them a chance to fail now that we've disconnected.
1857 remote.remote_buffer_listeners.clear()
1858 }
1859 }
1860
1861 pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
1862 self.downstream_client = Some((downstream_client, remote_id));
1863 }
1864
1865 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1866 self.downstream_client.take();
1867 self.forget_shared_buffers();
1868 }
1869
1870 pub fn discard_incomplete(&mut self) {
1871 self.opened_buffers
1872 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1873 }
1874
    /// Streams buffers that may match `query`, up to `limit` results.
    ///
    /// Dirty/unnamed buffers are sent first (and counted against the limit);
    /// the worktree store then yields candidate paths, which are opened in
    /// batches of `MAX_CONCURRENT_BUFFER_OPENS` and sent over the returned
    /// channel. Dropping the receiver stops the search.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                // Already-open buffers are searched via their entry ids so the
                // worktree scan can skip re-reading them from disk.
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                // Kick off a whole batch of opens before awaiting any of them.
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            // Receiver dropped; stop searching.
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1929
1930 pub fn recalculate_buffer_diffs(
1931 &mut self,
1932 buffers: Vec<Entity<Buffer>>,
1933 cx: &mut Context<Self>,
1934 ) -> impl Future<Output = ()> {
1935 let mut futures = Vec::new();
1936 for buffer in buffers {
1937 if let Some(OpenBuffer::Complete { diff_state, .. }) =
1938 self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
1939 {
1940 let buffer = buffer.read(cx).text_snapshot();
1941 futures.push(diff_state.update(cx, |diff_state, cx| {
1942 diff_state.recalculate_diffs(buffer, cx)
1943 }));
1944 }
1945 }
1946 async move {
1947 futures::future::join_all(futures).await;
1948 }
1949 }
1950
    /// Reacts to events emitted by an open buffer: keeps the local path/entry
    /// indices in sync, forwards reloads downstream, and refreshes diff state
    /// when the buffer's language changes.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                // Only local stores maintain the path/entry-id indices.
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Only relevant when this project is shared with guests.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            BufferEvent::LanguageChanged => {
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(OpenBuffer::Complete { diff_state, .. }) =
                    self.opened_buffers.get(&buffer_id)
                {
                    diff_state.update(cx, |diff_state, cx| {
                        diff_state.buffer_language_changed(buffer, cx);
                    });
                }
            }
            _ => {}
        }
    }
1991
1992 pub async fn handle_update_buffer(
1993 this: Entity<Self>,
1994 envelope: TypedEnvelope<proto::UpdateBuffer>,
1995 mut cx: AsyncApp,
1996 ) -> Result<proto::Ack> {
1997 let payload = envelope.payload.clone();
1998 let buffer_id = BufferId::new(payload.buffer_id)?;
1999 let ops = payload
2000 .operations
2001 .into_iter()
2002 .map(language::proto::deserialize_operation)
2003 .collect::<Result<Vec<_>, _>>()?;
2004 this.update(&mut cx, |this, cx| {
2005 match this.opened_buffers.entry(buffer_id) {
2006 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
2007 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
2008 OpenBuffer::Complete { buffer, .. } => {
2009 if let Some(buffer) = buffer.upgrade() {
2010 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
2011 }
2012 }
2013 },
2014 hash_map::Entry::Vacant(e) => {
2015 e.insert(OpenBuffer::Operations(ops));
2016 }
2017 }
2018 Ok(proto::Ack {})
2019 })?
2020 }
2021
2022 pub fn register_shared_lsp_handle(
2023 &mut self,
2024 peer_id: proto::PeerId,
2025 buffer_id: BufferId,
2026 handle: OpenLspBufferHandle,
2027 ) {
2028 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2029 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2030 buffer.lsp_handle = Some(handle);
2031 return;
2032 }
2033 }
2034 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2035 }
2036
    /// Handles a guest's request to re-synchronize its open buffers after a
    /// reconnect.
    ///
    /// Re-registers each buffer as shared with the guest, responds with the
    /// host-side versions, re-sends file and reload metadata, and spawns
    /// background tasks that stream any operations the guest is missing.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate: only buffers named in this request count
        // as shared with the guest from now on.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        diff: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest hasn't seen yet.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations in the background, chunked to
                // respect message-size limits.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
2126
2127 pub fn handle_create_buffer_for_peer(
2128 &mut self,
2129 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2130 replica_id: u16,
2131 capability: Capability,
2132 cx: &mut Context<Self>,
2133 ) -> Result<()> {
2134 let Some(remote) = self.as_remote_mut() else {
2135 return Err(anyhow!("buffer store is not a remote"));
2136 };
2137
2138 if let Some(buffer) =
2139 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2140 {
2141 self.add_buffer(buffer, cx)?;
2142 }
2143
2144 Ok(())
2145 }
2146
    /// Handles a host message updating a buffer's file metadata, and forwards
    /// the raw update to any guests of this (re-shared) project.
    ///
    /// Emits `BufferChangedFilePath` when the buffer's path actually changed.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // Swap in the new file; keep the old one only when the path
                // changed (or there was no file before), so we know whether to
                // emit a path-change event.
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay the update downstream when this project is shared onward.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
2194
    /// Handles a guest's request to save a buffer, optionally under a new
    /// path, replying with the saved version and mtime.
    ///
    /// Waits until the host buffer has caught up to the version the guest
    /// observed before saving, so the guest's edits are included.
    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until this replica has seen everything the guest has.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
2235
2236 pub async fn handle_close_buffer(
2237 this: Entity<Self>,
2238 envelope: TypedEnvelope<proto::CloseBuffer>,
2239 mut cx: AsyncApp,
2240 ) -> Result<()> {
2241 let peer_id = envelope.sender_id;
2242 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2243 this.update(&mut cx, |this, _| {
2244 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2245 if shared.remove(&buffer_id).is_some() {
2246 if shared.is_empty() {
2247 this.shared_buffers.remove(&peer_id);
2248 }
2249 return;
2250 }
2251 }
2252 debug_panic!(
2253 "peer_id {} closed buffer_id {} which was either not open or already closed",
2254 peer_id,
2255 buffer_id
2256 )
2257 })
2258 }
2259
2260 pub async fn handle_buffer_saved(
2261 this: Entity<Self>,
2262 envelope: TypedEnvelope<proto::BufferSaved>,
2263 mut cx: AsyncApp,
2264 ) -> Result<()> {
2265 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2266 let version = deserialize_version(&envelope.payload.version);
2267 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2268 this.update(&mut cx, move |this, cx| {
2269 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2270 buffer.update(cx, |buffer, cx| {
2271 buffer.did_save(version, mtime, cx);
2272 });
2273 }
2274
2275 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2276 downstream_client
2277 .send(proto::BufferSaved {
2278 project_id: *project_id,
2279 buffer_id: buffer_id.into(),
2280 mtime: envelope.payload.mtime,
2281 version: envelope.payload.version,
2282 })
2283 .log_err();
2284 }
2285 })
2286 }
2287
2288 pub async fn handle_buffer_reloaded(
2289 this: Entity<Self>,
2290 envelope: TypedEnvelope<proto::BufferReloaded>,
2291 mut cx: AsyncApp,
2292 ) -> Result<()> {
2293 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2294 let version = deserialize_version(&envelope.payload.version);
2295 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2296 let line_ending = deserialize_line_ending(
2297 proto::LineEnding::from_i32(envelope.payload.line_ending)
2298 .ok_or_else(|| anyhow!("missing line ending"))?,
2299 );
2300 this.update(&mut cx, |this, cx| {
2301 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2302 buffer.update(cx, |buffer, cx| {
2303 buffer.did_reload(version, line_ending, mtime, cx);
2304 });
2305 }
2306
2307 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2308 downstream_client
2309 .send(proto::BufferReloaded {
2310 project_id: *project_id,
2311 buffer_id: buffer_id.into(),
2312 mtime: envelope.payload.mtime,
2313 version: envelope.payload.version,
2314 line_ending: envelope.payload.line_ending,
2315 })
2316 .log_err();
2317 }
2318 })
2319 }
2320
2321 pub async fn handle_blame_buffer(
2322 this: Entity<Self>,
2323 envelope: TypedEnvelope<proto::BlameBuffer>,
2324 mut cx: AsyncApp,
2325 ) -> Result<proto::BlameBufferResponse> {
2326 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2327 let version = deserialize_version(&envelope.payload.version);
2328 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2329 buffer
2330 .update(&mut cx, |buffer, _| {
2331 buffer.wait_for_version(version.clone())
2332 })?
2333 .await?;
2334 let blame = this
2335 .update(&mut cx, |this, cx| {
2336 this.blame_buffer(&buffer, Some(version), cx)
2337 })?
2338 .await?;
2339 Ok(serialize_blame_buffer_response(blame))
2340 }
2341
2342 pub async fn handle_get_permalink_to_line(
2343 this: Entity<Self>,
2344 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2345 mut cx: AsyncApp,
2346 ) -> Result<proto::GetPermalinkToLineResponse> {
2347 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2348 // let version = deserialize_version(&envelope.payload.version);
2349 let selection = {
2350 let proto_selection = envelope
2351 .payload
2352 .selection
2353 .context("no selection to get permalink for defined")?;
2354 proto_selection.start as u32..proto_selection.end as u32
2355 };
2356 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2357 let permalink = this
2358 .update(&mut cx, |this, cx| {
2359 this.get_permalink_to_line(&buffer, selection, cx)
2360 })?
2361 .await?;
2362 Ok(proto::GetPermalinkToLineResponse {
2363 permalink: permalink.to_string(),
2364 })
2365 }
2366
2367 pub async fn handle_open_unstaged_diff(
2368 this: Entity<Self>,
2369 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2370 mut cx: AsyncApp,
2371 ) -> Result<proto::OpenUnstagedDiffResponse> {
2372 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2373 let diff = this
2374 .update(&mut cx, |this, cx| {
2375 let buffer = this.get(buffer_id)?;
2376 Some(this.open_unstaged_diff(buffer, cx))
2377 })?
2378 .ok_or_else(|| anyhow!("no such buffer"))?
2379 .await?;
2380 this.update(&mut cx, |this, _| {
2381 let shared_buffers = this
2382 .shared_buffers
2383 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2384 .or_default();
2385 debug_assert!(shared_buffers.contains_key(&buffer_id));
2386 if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2387 shared.diff = Some(diff.clone());
2388 }
2389 })?;
2390 let staged_text =
2391 diff.read_with(&cx, |diff, _| diff.base_text().map(|buffer| buffer.text()))?;
2392 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2393 }
2394
    /// Handles a request to open the uncommitted diff for a shared buffer.
    ///
    /// Records the opened diff on the requesting peer's shared-buffer entry,
    /// then encodes the committed and staged base texts into the response,
    /// using `Mode::IndexMatchesHead` to avoid re-sending the staged text
    /// when it is the same buffer as the committed text.
    pub async fn handle_open_uncommitted_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUncommittedDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUncommittedDiffResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let diff = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_uncommitted_diff(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        // Keep the diff alive on the peer's shared-buffer entry for as long
        // as the buffer stays shared with that peer.
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.diff = Some(diff.clone());
            }
        })?;
        diff.read_with(&cx, |diff, cx| {
            use proto::open_uncommitted_diff_response::Mode;

            // Base text of the secondary diff (the staged text, per naming).
            let staged_buffer = diff
                .secondary_diff()
                .and_then(|diff| diff.read(cx).base_text());

            let mode;
            let staged_text;
            let committed_text;
            if let Some(committed_buffer) = diff.base_text() {
                committed_text = Some(committed_buffer.text());
                if let Some(staged_buffer) = staged_buffer {
                    if staged_buffer.remote_id() == committed_buffer.remote_id() {
                        // Same underlying buffer: the index matches HEAD, so
                        // the staged text need not be sent separately.
                        mode = Mode::IndexMatchesHead;
                        staged_text = None;
                    } else {
                        mode = Mode::IndexAndHead;
                        staged_text = Some(staged_buffer.text());
                    }
                } else {
                    mode = Mode::IndexAndHead;
                    staged_text = None;
                }
            } else {
                // No committed base text; send whatever staged text exists.
                mode = Mode::IndexAndHead;
                committed_text = None;
                staged_text = staged_buffer.as_ref().map(|buffer| buffer.text());
            }

            proto::OpenUncommittedDiffResponse {
                committed_text,
                staged_text,
                mode: mode.into(),
            }
        })
    }
2455
2456 pub async fn handle_update_diff_bases(
2457 this: Entity<Self>,
2458 request: TypedEnvelope<proto::UpdateDiffBases>,
2459 mut cx: AsyncApp,
2460 ) -> Result<()> {
2461 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2462 this.update(&mut cx, |this, cx| {
2463 if let Some(OpenBuffer::Complete { diff_state, buffer }) =
2464 this.opened_buffers.get_mut(&buffer_id)
2465 {
2466 if let Some(buffer) = buffer.upgrade() {
2467 let buffer = buffer.read(cx).text_snapshot();
2468 diff_state.update(cx, |diff_state, cx| {
2469 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2470 })
2471 }
2472 }
2473 })
2474 }
2475
2476 pub fn reload_buffers(
2477 &self,
2478 buffers: HashSet<Entity<Buffer>>,
2479 push_to_history: bool,
2480 cx: &mut Context<Self>,
2481 ) -> Task<Result<ProjectTransaction>> {
2482 if buffers.is_empty() {
2483 return Task::ready(Ok(ProjectTransaction::default()));
2484 }
2485 match &self.state {
2486 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2487 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2488 }
2489 }
2490
2491 async fn handle_reload_buffers(
2492 this: Entity<Self>,
2493 envelope: TypedEnvelope<proto::ReloadBuffers>,
2494 mut cx: AsyncApp,
2495 ) -> Result<proto::ReloadBuffersResponse> {
2496 let sender_id = envelope.original_sender_id().unwrap_or_default();
2497 let reload = this.update(&mut cx, |this, cx| {
2498 let mut buffers = HashSet::default();
2499 for buffer_id in &envelope.payload.buffer_ids {
2500 let buffer_id = BufferId::new(*buffer_id)?;
2501 buffers.insert(this.get_existing(buffer_id)?);
2502 }
2503 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2504 })??;
2505
2506 let project_transaction = reload.await?;
2507 let project_transaction = this.update(&mut cx, |this, cx| {
2508 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2509 })?;
2510 Ok(proto::ReloadBuffersResponse {
2511 transaction: Some(project_transaction),
2512 })
2513 }
2514
    /// Ensures that `buffer` is replicated to `peer_id`.
    ///
    /// Registers the buffer as shared with the peer, then (if a downstream
    /// client exists) sends the buffer's full state followed by its
    /// serialized operations in chunks. No-op when the buffer is already
    /// shared with that peer.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            // Already shared with this peer; nothing to send.
            return Task::ready(Ok(()));
        }
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                diff: None,
                lsp_handle: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped between registering the share
            // and this task running.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            // Serialize the buffer's operations and a snapshot of its state.
            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Stream operation chunks only if the initial state message was
            // sent successfully.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        // Mark the final chunk so the peer knows when the
                        // buffer is complete.
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2581
2582 pub fn forget_shared_buffers(&mut self) {
2583 self.shared_buffers.clear();
2584 }
2585
2586 pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
2587 self.shared_buffers.remove(peer_id);
2588 }
2589
2590 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2591 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2592 self.shared_buffers.insert(new_peer_id, buffers);
2593 }
2594 }
2595
2596 pub fn has_shared_buffers(&self) -> bool {
2597 !self.shared_buffers.is_empty()
2598 }
2599
2600 pub fn create_local_buffer(
2601 &mut self,
2602 text: &str,
2603 language: Option<Arc<Language>>,
2604 cx: &mut Context<Self>,
2605 ) -> Entity<Buffer> {
2606 let buffer = cx.new(|cx| {
2607 Buffer::local(text, cx)
2608 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
2609 });
2610
2611 self.add_buffer(buffer.clone(), cx).log_err();
2612 let buffer_id = buffer.read(cx).remote_id();
2613
2614 let this = self
2615 .as_local_mut()
2616 .expect("local-only method called in a non-local context");
2617 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2618 this.local_buffer_ids_by_path.insert(
2619 ProjectPath {
2620 worktree_id: file.worktree_id(cx),
2621 path: file.path.clone(),
2622 },
2623 buffer_id,
2624 );
2625
2626 if let Some(entry_id) = file.entry_id {
2627 this.local_buffer_ids_by_entry_id
2628 .insert(entry_id, buffer_id);
2629 }
2630 }
2631 buffer
2632 }
2633
2634 pub fn deserialize_project_transaction(
2635 &mut self,
2636 message: proto::ProjectTransaction,
2637 push_to_history: bool,
2638 cx: &mut Context<Self>,
2639 ) -> Task<Result<ProjectTransaction>> {
2640 if let Some(this) = self.as_remote_mut() {
2641 this.deserialize_project_transaction(message, push_to_history, cx)
2642 } else {
2643 debug_panic!("not a remote buffer store");
2644 Task::ready(Err(anyhow!("not a remote buffer store")))
2645 }
2646 }
2647
2648 pub fn wait_for_remote_buffer(
2649 &mut self,
2650 id: BufferId,
2651 cx: &mut Context<BufferStore>,
2652 ) -> Task<Result<Entity<Buffer>>> {
2653 if let Some(this) = self.as_remote_mut() {
2654 this.wait_for_remote_buffer(id, cx)
2655 } else {
2656 debug_panic!("not a remote buffer store");
2657 Task::ready(Err(anyhow!("not a remote buffer store")))
2658 }
2659 }
2660
2661 pub fn serialize_project_transaction_for_peer(
2662 &mut self,
2663 project_transaction: ProjectTransaction,
2664 peer_id: proto::PeerId,
2665 cx: &mut Context<Self>,
2666 ) -> proto::ProjectTransaction {
2667 let mut serialized_transaction = proto::ProjectTransaction {
2668 buffer_ids: Default::default(),
2669 transactions: Default::default(),
2670 };
2671 for (buffer, transaction) in project_transaction.0 {
2672 self.create_buffer_for_peer(&buffer, peer_id, cx)
2673 .detach_and_log_err(cx);
2674 serialized_transaction
2675 .buffer_ids
2676 .push(buffer.read(cx).remote_id().into());
2677 serialized_transaction
2678 .transactions
2679 .push(language::proto::serialize_transaction(&transaction));
2680 }
2681 serialized_transaction
2682 }
2683}
2684
2685impl OpenBuffer {
2686 fn upgrade(&self) -> Option<Entity<Buffer>> {
2687 match self {
2688 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2689 OpenBuffer::Operations(_) => None,
2690 }
2691 }
2692}
2693
2694fn is_not_found_error(error: &anyhow::Error) -> bool {
2695 error
2696 .root_cause()
2697 .downcast_ref::<io::Error>()
2698 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2699}
2700
2701fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2702 let Some(blame) = blame else {
2703 return proto::BlameBufferResponse {
2704 blame_response: None,
2705 };
2706 };
2707
2708 let entries = blame
2709 .entries
2710 .into_iter()
2711 .map(|entry| proto::BlameEntry {
2712 sha: entry.sha.as_bytes().into(),
2713 start_line: entry.range.start,
2714 end_line: entry.range.end,
2715 original_line_number: entry.original_line_number,
2716 author: entry.author.clone(),
2717 author_mail: entry.author_mail.clone(),
2718 author_time: entry.author_time,
2719 author_tz: entry.author_tz.clone(),
2720 committer: entry.committer.clone(),
2721 committer_mail: entry.committer_mail.clone(),
2722 committer_time: entry.committer_time,
2723 committer_tz: entry.committer_tz.clone(),
2724 summary: entry.summary.clone(),
2725 previous: entry.previous.clone(),
2726 filename: entry.filename.clone(),
2727 })
2728 .collect::<Vec<_>>();
2729
2730 let messages = blame
2731 .messages
2732 .into_iter()
2733 .map(|(oid, message)| proto::CommitMessage {
2734 oid: oid.as_bytes().into(),
2735 message,
2736 })
2737 .collect::<Vec<_>>();
2738
2739 let permalinks = blame
2740 .permalinks
2741 .into_iter()
2742 .map(|(oid, url)| proto::CommitPermalink {
2743 oid: oid.as_bytes().into(),
2744 permalink: url.to_string(),
2745 })
2746 .collect::<Vec<_>>();
2747
2748 proto::BlameBufferResponse {
2749 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2750 entries,
2751 messages,
2752 permalinks,
2753 remote_url: blame.remote_url,
2754 }),
2755 }
2756}
2757
2758fn deserialize_blame_buffer_response(
2759 response: proto::BlameBufferResponse,
2760) -> Option<git::blame::Blame> {
2761 let response = response.blame_response?;
2762 let entries = response
2763 .entries
2764 .into_iter()
2765 .filter_map(|entry| {
2766 Some(git::blame::BlameEntry {
2767 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2768 range: entry.start_line..entry.end_line,
2769 original_line_number: entry.original_line_number,
2770 committer: entry.committer,
2771 committer_time: entry.committer_time,
2772 committer_tz: entry.committer_tz,
2773 committer_mail: entry.committer_mail,
2774 author: entry.author,
2775 author_mail: entry.author_mail,
2776 author_time: entry.author_time,
2777 author_tz: entry.author_tz,
2778 summary: entry.summary,
2779 previous: entry.previous,
2780 filename: entry.filename,
2781 })
2782 })
2783 .collect::<Vec<_>>();
2784
2785 let messages = response
2786 .messages
2787 .into_iter()
2788 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2789 .collect::<HashMap<_, _>>();
2790
2791 let permalinks = response
2792 .permalinks
2793 .into_iter()
2794 .filter_map(|permalink| {
2795 Some((
2796 git::Oid::from_bytes(&permalink.oid).ok()?,
2797 Url::from_str(&permalink.permalink).ok()?,
2798 ))
2799 })
2800 .collect::<HashMap<_, _>>();
2801
2802 Some(Blame {
2803 entries,
2804 permalinks,
2805 messages,
2806 remote_url: response.remote_url,
2807 })
2808}
2809
2810fn get_permalink_in_rust_registry_src(
2811 provider_registry: Arc<GitHostingProviderRegistry>,
2812 path: PathBuf,
2813 selection: Range<u32>,
2814) -> Result<url::Url> {
2815 #[derive(Deserialize)]
2816 struct CargoVcsGit {
2817 sha1: String,
2818 }
2819
2820 #[derive(Deserialize)]
2821 struct CargoVcsInfo {
2822 git: CargoVcsGit,
2823 path_in_vcs: String,
2824 }
2825
2826 #[derive(Deserialize)]
2827 struct CargoPackage {
2828 repository: String,
2829 }
2830
2831 #[derive(Deserialize)]
2832 struct CargoToml {
2833 package: CargoPackage,
2834 }
2835
2836 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
2837 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
2838 Some((dir, json))
2839 }) else {
2840 bail!("No .cargo_vcs_info.json found in parent directories")
2841 };
2842 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
2843 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
2844 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
2845 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
2846 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
2847 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
2848 let permalink = provider.build_permalink(
2849 remote,
2850 BuildPermalinkParams {
2851 sha: &cargo_vcs_info.git.sha1,
2852 path: &path.to_string_lossy(),
2853 selection: Some(selection),
2854 },
2855 );
2856 Ok(permalink)
2857}