1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use buffer_diff::{BufferDiff, BufferDiffEvent};
10use client::Client;
11use collections::{hash_map, HashMap, HashSet};
12use fs::Fs;
13use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
14use git::{blame::Blame, repository::RepoPath};
15use gpui::{
16 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
17};
18use http_client::Url;
19use language::{
20 proto::{
21 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
22 split_operations,
23 },
24 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
25};
26use rpc::{
27 proto::{self, ToProto},
28 AnyProtoClient, ErrorExt as _, TypedEnvelope,
29};
30use serde::Deserialize;
31use smol::channel::Receiver;
32use std::{
33 io,
34 ops::Range,
35 path::{Path, PathBuf},
36 pin::pin,
37 str::FromStr as _,
38 sync::Arc,
39 time::Instant,
40};
41use text::BufferId;
42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
44
/// Which git base a buffer diff is computed against.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    /// Diff against the index (staged) text.
    Unstaged,
    /// Diff against the HEAD (committed) text.
    Uncommitted,
}
50
/// A set of open buffers.
pub struct BufferStore {
    /// Whether this store is backed by the local filesystem or a remote peer.
    state: BufferStoreState,
    /// In-flight buffer loads, keyed by path, so concurrent opens share one task.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    /// In-flight diff loads, keyed by buffer and diff kind, deduplicated the same way.
    #[allow(clippy::type_complexity)]
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
    worktree_store: Entity<WorktreeStore>,
    /// All buffers known to this store, open or buffered-operations-only.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Client + project id used to forward updates downstream when sharing.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// Buffers (and related handles) that have been sent to each peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
64
/// Handles retained for a buffer that has been shared with a remote peer,
/// keeping the buffer (and optionally its diff and LSP registration) alive.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    diff: Option<Entity<BufferDiff>>,
    lsp_handle: Option<OpenLspBufferHandle>,
}
71
/// Per-buffer git-diff state: weak handles to the diff entities plus the base
/// texts they are computed against, and dirty flags describing what changed
/// since the last recalculation.
#[derive(Default)]
struct BufferDiffState {
    // Weak so that dropping the last strong diff handle tears the diff down.
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    // Holding the task keeps at most one recalculation running; replacing it
    // cancels the previous one.
    recalculate_diff_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    // Senders resolved when the next recalculation completes.
    diff_updated_futures: Vec<oneshot::Sender<()>>,

    // Base texts: `head_text` is the committed (HEAD) version, `index_text`
    // the staged version. `None` means no base (e.g. untracked file).
    head_text: Option<Arc<String>>,
    index_text: Option<Arc<String>>,
    head_changed: bool,
    index_changed: bool,
    language_changed: bool,
}
87
/// Describes which git base texts changed for a buffer.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Only the index (staged) text changed.
    SetIndex(Option<String>),
    /// Only the HEAD (committed) text changed.
    SetHead(Option<String>),
    /// Both changed, to potentially different contents.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Both changed and the index matches HEAD; one text serves as both bases.
    SetBoth(Option<String>),
}
98
impl BufferDiffState {
    /// Records the buffer's new language, marks it dirty, and schedules a diff
    /// recalculation so hunks get re-highlighted with the new grammar.
    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
        self.language = buffer.read(cx).language().cloned();
        self.language_changed = true;
        // Fire-and-forget: this caller doesn't need to await completion.
        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
    }

    /// Upgrades the weak handle to the unstaged diff, if it is still alive.
    fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
        self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
    }

    /// Upgrades the weak handle to the uncommitted diff, if it is still alive.
    fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
        self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
    }

    /// Applies a remote `UpdateDiffBases` message by translating its mode into
    /// a [`DiffBasesChange`] and recalculating the affected diffs.
    fn handle_base_texts_updated(
        &mut self,
        buffer: text::BufferSnapshot,
        message: proto::UpdateDiffBases,
        cx: &mut Context<Self>,
    ) {
        use proto::update_diff_bases::Mode;

        // Ignore messages with an unrecognized mode (e.g. from a newer peer).
        let Some(mode) = Mode::from_i32(message.mode) else {
            return;
        };

        let diff_bases_change = match mode {
            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
            Mode::IndexAndHead => DiffBasesChange::SetEach {
                index: message.staged_text,
                head: message.committed_text,
            },
        };

        // Completion receiver intentionally dropped; nothing awaits this update.
        let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
    }

    /// Stores the new base text(s), normalizing line endings, marks the
    /// corresponding dirty flags, and kicks off a recalculation. The returned
    /// receiver resolves when that recalculation finishes.
    fn diff_bases_changed(
        &mut self,
        buffer: text::BufferSnapshot,
        diff_bases_change: DiffBasesChange,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        match diff_bases_change {
            DiffBasesChange::SetIndex(index) => {
                self.index_text = index.map(|mut index| {
                    text::LineEnding::normalize(&mut index);
                    Arc::new(index)
                });
                self.index_changed = true;
            }
            DiffBasesChange::SetHead(head) => {
                self.head_text = head.map(|mut head| {
                    text::LineEnding::normalize(&mut head);
                    Arc::new(head)
                });
                self.head_changed = true;
            }
            DiffBasesChange::SetBoth(text) => {
                // Both bases share one Arc; `recalculate_diffs` later detects
                // this via `Arc::ptr_eq` to skip redundant work.
                let text = text.map(|mut text| {
                    text::LineEnding::normalize(&mut text);
                    Arc::new(text)
                });
                self.head_text = text.clone();
                self.index_text = text;
                self.head_changed = true;
                self.index_changed = true;
            }
            DiffBasesChange::SetEach { index, head } => {
                self.index_text = index.map(|mut index| {
                    text::LineEnding::normalize(&mut index);
                    Arc::new(index)
                });
                self.index_changed = true;
                self.head_text = head.map(|mut head| {
                    text::LineEnding::normalize(&mut head);
                    Arc::new(head)
                });
                self.head_changed = true;
            }
        }

        self.recalculate_diffs(buffer, cx)
    }

    /// Recomputes the unstaged and/or uncommitted diffs against the current
    /// base texts on a background task, emitting `DiffChanged` /
    /// `LanguageChanged` events as appropriate. Replacing
    /// `recalculate_diff_task` cancels any previous recalculation. The
    /// returned receiver (and any queued earlier ones) resolves when the new
    /// task completes.
    fn recalculate_diffs(
        &mut self,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);

        let language = self.language.clone();
        let language_registry = self.language_registry.clone();
        let unstaged_diff = self.unstaged_diff();
        let uncommitted_diff = self.uncommitted_diff();
        let head = self.head_text.clone();
        let index = self.index_text.clone();
        let index_changed = self.index_changed;
        let head_changed = self.head_changed;
        let language_changed = self.language_changed;
        // Pointer equality: only true when `SetBoth` stored the same Arc for
        // both bases (or both are absent), which lets the uncommitted diff be
        // derived from the unstaged one instead of re-diffing against HEAD.
        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
            (None, None) => true,
            _ => false,
        };
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            let mut unstaged_changed_range = None;
            if let Some(unstaged_diff) = &unstaged_diff {
                unstaged_changed_range = BufferDiff::update_diff(
                    unstaged_diff.clone(),
                    buffer.clone(),
                    index,
                    index_changed,
                    language_changed,
                    language.clone(),
                    language_registry.clone(),
                    &mut cx,
                )
                .await?;

                unstaged_diff.update(&mut cx, |_, cx| {
                    if language_changed {
                        cx.emit(BufferDiffEvent::LanguageChanged);
                    }
                    if let Some(changed_range) = unstaged_changed_range.clone() {
                        cx.emit(BufferDiffEvent::DiffChanged {
                            changed_range: Some(changed_range),
                        })
                    }
                })?;
            }

            if let Some(uncommitted_diff) = &uncommitted_diff {
                // When index == HEAD, copy the freshly-computed unstaged diff
                // rather than diffing against HEAD a second time.
                let uncommitted_changed_range =
                    if let (Some(unstaged_diff), true) = (&unstaged_diff, index_matches_head) {
                        uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
                            uncommitted_diff.update_diff_from(&buffer, unstaged_diff, cx)
                        })?
                    } else {
                        BufferDiff::update_diff(
                            uncommitted_diff.clone(),
                            buffer.clone(),
                            head,
                            head_changed,
                            language_changed,
                            language.clone(),
                            language_registry.clone(),
                            &mut cx,
                        )
                        .await?
                    };

                uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
                    if language_changed {
                        cx.emit(BufferDiffEvent::LanguageChanged);
                    }
                    // Merge the two changed ranges into a single event range.
                    let changed_range = match (unstaged_changed_range, uncommitted_changed_range) {
                        (None, None) => None,
                        (Some(unstaged_range), None) => {
                            uncommitted_diff.range_to_hunk_range(unstaged_range, &buffer, cx)
                        }
                        (None, Some(uncommitted_range)) => Some(uncommitted_range),
                        (Some(unstaged_range), Some(uncommitted_range)) => maybe!({
                            let expanded_range = uncommitted_diff.range_to_hunk_range(
                                unstaged_range,
                                &buffer,
                                cx,
                            )?;
                            let start = expanded_range.start.min(&uncommitted_range.start, &buffer);
                            let end = expanded_range.end.max(&uncommitted_range.end, &buffer);
                            Some(start..end)
                        }),
                    };
                    cx.emit(BufferDiffEvent::DiffChanged { changed_range });
                })?;
            }

            // Clear the dirty flags and wake every queued waiter.
            if let Some(this) = this.upgrade() {
                this.update(&mut cx, |this, _| {
                    this.index_changed = false;
                    this.head_changed = false;
                    this.language_changed = false;
                    for tx in this.diff_updated_futures.drain(..) {
                        tx.send(()).ok();
                    }
                })?;
            }

            Ok(())
        }));

        rx
    }
}
298
/// Backend of a [`BufferStore`]: either the local filesystem or a remote host.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
303
/// Buffer-store backend for projects hosted by a remote peer; buffers are
/// streamed over RPC rather than loaded from disk.
struct RemoteBufferStore {
    /// Buffers sent to us by collab peers, retained to avoid drop/re-send races.
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    /// Buffers whose initial state arrived but whose operation chunks are
    /// still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    /// Waiters resolved (or failed) once the corresponding buffer finishes
    /// streaming.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
313
/// Buffer-store backend for local projects; indexes open buffers by path and
/// worktree entry so filesystem events can be routed back to them.
struct LocalBufferStore {
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    // Keeps the WorktreeStore event subscription alive for this store's lifetime.
    _subscription: Subscription,
}
320
/// State of a buffer slot in the store.
enum OpenBuffer {
    /// A fully open buffer (held weakly) together with its git-diff state.
    Complete {
        buffer: WeakEntity<Buffer>,
        diff_state: Entity<BufferDiffState>,
    },
    /// Operations received for a buffer that isn't open yet, buffered until it is.
    Operations(Vec<Operation>),
}
328
/// Events emitted by [`BufferStore`] for observers such as the project.
pub enum BufferStoreEvent {
    BufferAdded(Entity<Buffer>),
    BufferDropped(BufferId),
    /// A buffer's on-disk path changed (e.g. rename); carries the previous file.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
337
/// A set of edits spanning multiple buffers, applied (and undone) as one unit.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
340
// `BufferStore` notifies observers via `BufferStoreEvent`s.
impl EventEmitter<BufferStoreEvent> for BufferStore {}
342
impl RemoteBufferStore {
    /// Requests the staged (index) text for a buffer from the upstream host.
    /// `None` in the response means the file has no staged version.
    fn open_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.background_executor().spawn(async move {
            let response = client
                .request(proto::OpenUnstagedDiff {
                    project_id,
                    buffer_id: buffer_id.to_proto(),
                })
                .await?;
            Ok(response.staged_text)
        })
    }

    /// Requests the committed (HEAD) and staged (index) texts for a buffer
    /// from the upstream host, folded into a [`DiffBasesChange`].
    fn open_uncommitted_diff(
        &self,
        buffer_id: BufferId,
        cx: &App,
    ) -> Task<Result<DiffBasesChange>> {
        use proto::open_uncommitted_diff_response::Mode;

        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.background_executor().spawn(async move {
            let response = client
                .request(proto::OpenUncommittedDiff {
                    project_id,
                    buffer_id: buffer_id.to_proto(),
                })
                .await?;
            let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
            let bases = match mode {
                Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
                Mode::IndexAndHead => DiffBasesChange::SetEach {
                    head: response.committed_text,
                    index: response.staged_text,
                },
            };
            Ok(bases)
        })
    }

    /// Resolves once the buffer with the given id has finished streaming from
    /// the remote peer (or immediately, if it is already open).
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            // Fast path: the buffer may already be open.
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            // Otherwise wait for `handle_create_buffer_for_peer` to notify us.
            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }

    /// Asks the upstream host to save the buffer (optionally to a new path),
    /// then applies the host's reported version/mtime locally via `did_save`.
    fn save_remote_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

    /// Handles one message of a buffer being streamed to this client: either
    /// the initial `State` (buffer contents + file metadata) or a `Chunk` of
    /// operations. Returns the buffer once the final chunk has been applied;
    /// on failure, notifies and removes any pending listeners for that buffer.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut Context<BufferStore>,
    ) -> Result<Option<Entity<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    // Resolve the file's worktree so the buffer gets a real
                    // `File`; a missing worktree fails the whole buffer.
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Propagate the failure to everyone awaiting this buffer.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // A bad chunk poisons the whole load: drop the partial
                    // buffer and fail its listeners.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }

    /// Ids of buffers whose initial state arrived but that are still streaming.
    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
        self.loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>()
    }

    /// Reconstructs a [`ProjectTransaction`] from its wire form: waits for
    /// each referenced buffer and its edits to arrive, then (optionally)
    /// pushes each transaction onto the buffer's undo history.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            // Ensure every edit in each transaction has actually been applied
            // before exposing the transaction to callers.
            for (buffer, transaction) in &project_transaction.0 {
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, cx| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }

    /// Opens a buffer by path: asks the host for it, then waits for the
    /// buffer's contents to finish streaming to this client.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let worktree_id = worktree.read(cx).id().to_proto();
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path.to_proto(),
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            let buffer = this
                .update(&mut cx, {
                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await?;

            Ok(buffer)
        })
    }

    /// Asks the host to create a new, empty buffer and waits for it to stream
    /// to this client.
    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
        let create = self.upstream_client.request(proto::OpenNewBuffer {
            project_id: self.project_id,
        });
        cx.spawn(|this, mut cx| async move {
            let response = create.await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

    /// Asks the host to reload the given buffers from disk and returns the
    /// resulting multi-buffer transaction, deserialized locally.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        let request = self.upstream_client.request(proto::ReloadBuffers {
            project_id: self.project_id,
            buffer_ids: buffers
                .iter()
                .map(|buffer| buffer.read(cx).remote_id().to_proto())
                .collect(),
        });

        cx.spawn(|this, mut cx| async move {
            let response = request
                .await?
                .transaction
                .ok_or_else(|| anyhow!("missing transaction"))?;
            this.update(&mut cx, |this, cx| {
                this.deserialize_project_transaction(response, push_to_history, cx)
            })?
            .await
        })
    }
}
650
651impl LocalBufferStore {
652 fn worktree_for_buffer(
653 &self,
654 buffer: &Entity<Buffer>,
655 cx: &App,
656 ) -> Option<(Entity<Worktree>, Arc<Path>)> {
657 let file = buffer.read(cx).file()?;
658 let worktree_id = file.worktree_id(cx);
659 let path = file.path().clone();
660 let worktree = self
661 .worktree_store
662 .read(cx)
663 .worktree_for_id(worktree_id, cx)?;
664 Some((worktree, path))
665 }
666
667 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
668 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
669 worktree.read(cx).load_staged_file(path.as_ref(), cx)
670 } else {
671 return Task::ready(Err(anyhow!("no such worktree")));
672 }
673 }
674
675 fn load_committed_text(
676 &self,
677 buffer: &Entity<Buffer>,
678 cx: &App,
679 ) -> Task<Result<Option<String>>> {
680 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
681 worktree.read(cx).load_committed_file(path.as_ref(), cx)
682 } else {
683 Task::ready(Err(anyhow!("no such worktree")))
684 }
685 }
686
    /// Writes the buffer's contents to `path` inside `worktree`, then
    /// broadcasts the save (and, if the file changed, the new file metadata)
    /// to any downstream client before updating the buffer itself.
    ///
    /// `has_changed_file` is forced on for brand-new files so remotes receive
    /// the initial file metadata.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Capture everything needed by the async portion up front.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Notify downstream (remote) clients first; failures are logged,
            // not propagated, so a broken connection can't fail the save.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
744
745 fn subscribe_to_worktree(
746 &mut self,
747 worktree: &Entity<Worktree>,
748 cx: &mut Context<BufferStore>,
749 ) {
750 cx.subscribe(worktree, |this, worktree, event, cx| {
751 if worktree.read(cx).is_local() {
752 match event {
753 worktree::Event::UpdatedEntries(changes) => {
754 Self::local_worktree_entries_changed(this, &worktree, changes, cx);
755 }
756 worktree::Event::UpdatedGitRepositories(updated_repos) => {
757 Self::local_worktree_git_repos_changed(
758 this,
759 worktree.clone(),
760 updated_repos,
761 cx,
762 )
763 }
764 _ => {}
765 }
766 }
767 })
768 .detach();
769 }
770
771 fn local_worktree_entries_changed(
772 this: &mut BufferStore,
773 worktree_handle: &Entity<Worktree>,
774 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
775 cx: &mut Context<BufferStore>,
776 ) {
777 let snapshot = worktree_handle.read(cx).snapshot();
778 for (path, entry_id, _) in changes {
779 Self::local_worktree_entry_changed(
780 this,
781 *entry_id,
782 path,
783 worktree_handle,
784 &snapshot,
785 cx,
786 );
787 }
788 }
789
    /// Reacts to git repository changes in a local worktree: for every open
    /// buffer under a changed repository, reloads whichever base texts its
    /// diffs need (index and/or HEAD) on a background thread, forwards the
    /// new bases to any downstream client, and recalculates the diffs.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Collect, synchronously, the set of buffers affected by the changed
        // repositories along with which base texts each one actually needs
        // (only diffs that are still alive are worth refreshing).
        let mut diff_state_updates = Vec::new();
        for buffer in this.opened_buffers.values() {
            let OpenBuffer::Complete { buffer, diff_state } = buffer else {
                continue;
            };
            let Some(buffer) = buffer.upgrade() else {
                continue;
            };
            let buffer = buffer.read(cx);
            let Some(file) = File::from_dyn(buffer.file()) else {
                continue;
            };
            if file.worktree != worktree_handle {
                continue;
            }
            let diff_state = diff_state.read(cx);
            if changed_repos
                .iter()
                .any(|(work_dir, _)| file.path.starts_with(work_dir))
            {
                let snapshot = buffer.text_snapshot();
                diff_state_updates.push((
                    snapshot.clone(),
                    file.path.clone(),
                    diff_state
                        .unstaged_diff
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                    diff_state
                        .uncommitted_diff
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                ))
            }
        }

        if diff_state_updates.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Load the base texts off the main thread; git reads can be slow.
            let diff_bases_changes_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    diff_state_updates
                        .into_iter()
                        .filter_map(
                            |(buffer_snapshot, path, needs_staged_text, needs_committed_text)| {
                                let local_repo = snapshot.local_repo_for_path(&path)?;
                                let relative_path = local_repo.relativize(&path).ok()?;
                                let staged_text = if needs_staged_text {
                                    local_repo.repo().load_index_text(&relative_path)
                                } else {
                                    None
                                };
                                let committed_text = if needs_committed_text {
                                    local_repo.repo().load_committed_text(&relative_path)
                                } else {
                                    None
                                };
                                // Collapse into the smallest DiffBasesChange
                                // describing what was actually loaded.
                                let diff_bases_change =
                                    match (needs_staged_text, needs_committed_text) {
                                        (true, true) => Some(if staged_text == committed_text {
                                            DiffBasesChange::SetBoth(committed_text)
                                        } else {
                                            DiffBasesChange::SetEach {
                                                index: staged_text,
                                                head: committed_text,
                                            }
                                        }),
                                        (true, false) => {
                                            Some(DiffBasesChange::SetIndex(staged_text))
                                        }
                                        (false, true) => {
                                            Some(DiffBasesChange::SetHead(committed_text))
                                        }
                                        (false, false) => None,
                                    };
                                Some((buffer_snapshot, diff_bases_change))
                            },
                        )
                        .collect::<Vec<_>>()
                })
                .await;

            // Back on the main thread: forward to downstream clients and
            // trigger local diff recalculation for each affected buffer.
            this.update(&mut cx, |this, cx| {
                for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
                    let Some(OpenBuffer::Complete { diff_state, .. }) =
                        this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
                    else {
                        continue;
                    };
                    let Some(diff_bases_change) = diff_bases_change else {
                        continue;
                    };

                    diff_state.update(cx, |diff_state, cx| {
                        use proto::update_diff_bases::Mode;

                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
                            let buffer_id = buffer_snapshot.remote_id().to_proto();
                            let (staged_text, committed_text, mode) = match diff_bases_change
                                .clone()
                            {
                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                DiffBasesChange::SetEach { index, head } => {
                                    (index, head, Mode::IndexAndHead)
                                }
                                DiffBasesChange::SetBoth(text) => {
                                    (None, text, Mode::IndexMatchesHead)
                                }
                            };
                            let message = proto::UpdateDiffBases {
                                project_id: *project_id,
                                buffer_id,
                                staged_text,
                                committed_text,
                                mode: mode as i32,
                            };

                            client.send(message).log_err();
                        }

                        // Completion receiver dropped: nothing awaits this.
                        let _ =
                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
                    });
                }
            })
        })
        .detach_and_log_err(cx);
    }
934
    /// Reconciles a single changed worktree entry with the buffer (if any)
    /// that is open for it: updates the buffer's `File` (path, entry id,
    /// mtime, or deleted state), fixes up the store's path/entry-id indices,
    /// notifies downstream clients, and emits `BufferChangedFilePath` events.
    ///
    /// Returns `Option<()>` purely so `?` can be used for early exits; the
    /// value itself carries no meaning.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Look the buffer up by entry id first, falling back to its path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // Buffer was dropped; discard its stale slot.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer: clean both indices up and stop.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Find the entry's current state, by id first, then by path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // Entry vanished from the snapshot: the file was deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index under the file's new location.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index likewise.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Best-effort notification of downstream clients.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }
1062
1063 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1064 let file = File::from_dyn(buffer.read(cx).file())?;
1065
1066 let remote_id = buffer.read(cx).remote_id();
1067 if let Some(entry_id) = file.entry_id {
1068 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1069 Some(_) => {
1070 return None;
1071 }
1072 None => {
1073 self.local_buffer_ids_by_entry_id
1074 .insert(entry_id, remote_id);
1075 }
1076 }
1077 };
1078 self.local_buffer_ids_by_path.insert(
1079 ProjectPath {
1080 worktree_id: file.worktree_id(cx),
1081 path: file.path.clone(),
1082 },
1083 remote_id,
1084 );
1085
1086 Some(())
1087 }
1088
1089 fn save_buffer(
1090 &self,
1091 buffer: Entity<Buffer>,
1092 cx: &mut Context<BufferStore>,
1093 ) -> Task<Result<()>> {
1094 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1095 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1096 };
1097 let worktree = file.worktree.clone();
1098 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1099 }
1100
1101 fn save_buffer_as(
1102 &self,
1103 buffer: Entity<Buffer>,
1104 path: ProjectPath,
1105 cx: &mut Context<BufferStore>,
1106 ) -> Task<Result<()>> {
1107 let Some(worktree) = self
1108 .worktree_store
1109 .read(cx)
1110 .worktree_for_id(path.worktree_id, cx)
1111 else {
1112 return Task::ready(Err(anyhow!("no such worktree")));
1113 };
1114 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1115 }
1116
    /// Opens a buffer for `path` inside `worktree`, loading the file's
    /// contents on a background thread. A missing file is not an error: an
    /// empty buffer with `DiskState::New` is created instead. The buffer is
    /// then registered in the store's indices.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity slot up front so its id can double as the
            // BufferId before the file has finished loading.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Build the (potentially large) text buffer off the main thread.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // File-not-found becomes a fresh, empty, not-yet-on-disk buffer.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            // Register the buffer and index it by path and (if any) entry id.
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
1185
1186 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1187 cx.spawn(|buffer_store, mut cx| async move {
1188 let buffer =
1189 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1190 buffer_store.update(&mut cx, |buffer_store, cx| {
1191 buffer_store.add_buffer(buffer.clone(), cx).log_err();
1192 })?;
1193 Ok(buffer)
1194 })
1195 }
1196
    /// Reloads each buffer from disk, collecting the resulting undo
    /// transactions into a single [`ProjectTransaction`].
    ///
    /// When `push_to_history` is false, each reload transaction is removed
    /// from the buffer's undo history after being recorded.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    // A reload that found no changes yields no transaction.
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.entity(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
1222}
1223
1224impl BufferStore {
    /// Registers this store's RPC message and request handlers on `client`.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
    }
1237
    /// Creates a buffer store that opens and saves buffers through local
    /// worktrees, subscribing to `worktree_store` so that worktrees added
    /// later are tracked as well.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_diffs: Default::default(),
            worktree_store,
        }
    }
1260
    /// Creates a buffer store that opens and saves buffers by issuing RPC
    /// requests for project `remote_id` via `upstream_client`.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_diffs: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
1284
1285 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1286 match &mut self.state {
1287 BufferStoreState::Local(state) => Some(state),
1288 _ => None,
1289 }
1290 }
1291
1292 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1293 match &mut self.state {
1294 BufferStoreState::Remote(state) => Some(state),
1295 _ => None,
1296 }
1297 }
1298
1299 fn as_remote(&self) -> Option<&RemoteBufferStore> {
1300 match &self.state {
1301 BufferStoreState::Remote(state) => Some(state),
1302 _ => None,
1303 }
1304 }
1305
    /// Returns the buffer at `project_path`, opening it if it isn't already
    /// open.
    ///
    /// Concurrent calls for the same path share a single loading task, which
    /// removes itself from `loading_buffers` when it completes.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // The shared task yields `Arc<anyhow::Error>`; flatten it back into a
        // plain `anyhow::Error` for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1351
    /// Returns a diff of the buffer against its staged (index) text, loading
    /// it if it isn't already open.
    ///
    /// Concurrent calls for the same buffer share a single loading task.
    pub fn open_unstaged_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(diff) = self.get_unstaged_diff(buffer_id, cx) {
            return Task::ready(Ok(diff));
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Unstaged)) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                // Locally the staged text comes from git; remotely it is
                // requested from the host.
                let staged_text = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.open_unstaged_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Unstaged,
                                staged_text.await.map(DiffBasesChange::SetIndex),
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // The shared task yields `Arc<anyhow::Error>`; flatten it back into a
        // plain `anyhow::Error` for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1392
    /// Returns a diff of the buffer against its committed (HEAD) text,
    /// loading it if it isn't already open.
    ///
    /// Concurrent calls for the same buffer share a single loading task.
    pub fn open_uncommitted_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(diff) = self.get_uncommitted_diff(buffer_id, cx) {
            return Task::ready(Ok(diff));
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Uncommitted)) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let changes = match &self.state {
                    BufferStoreState::Local(this) => {
                        let committed_text = this.load_committed_text(&buffer, cx);
                        let staged_text = this.load_staged_text(&buffer, cx);
                        cx.background_executor().spawn(async move {
                            let committed_text = committed_text.await?;
                            let staged_text = staged_text.await?;
                            // When HEAD and index agree, a single shared base
                            // text serves both diffs.
                            let diff_bases_change = if committed_text == staged_text {
                                DiffBasesChange::SetBoth(committed_text)
                            } else {
                                DiffBasesChange::SetEach {
                                    index: staged_text,
                                    head: committed_text,
                                }
                            };
                            Ok(diff_bases_change)
                        })
                    }
                    BufferStoreState::Remote(this) => this.open_uncommitted_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Uncommitted,
                                changes.await,
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // The shared task yields `Arc<anyhow::Error>`; flatten it back into a
        // plain `anyhow::Error` for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1449
    /// Completes a diff open started by [`Self::open_unstaged_diff`] or
    /// [`Self::open_uncommitted_diff`]: records the new base texts on the
    /// buffer's diff state and resolves once the initial diff is computed.
    async fn open_diff_internal(
        this: WeakEntity<Self>,
        kind: DiffKind,
        texts: Result<DiffBasesChange>,
        buffer_entity: Entity<Buffer>,
        mut cx: AsyncApp,
    ) -> Result<Entity<BufferDiff>> {
        let diff_bases_change = match texts {
            Err(e) => {
                // Loading the base text failed; clear the in-flight marker so
                // a later call can retry.
                this.update(&mut cx, |this, cx| {
                    let buffer = buffer_entity.read(cx);
                    let buffer_id = buffer.remote_id();
                    this.loading_diffs.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(&mut cx, |this, cx| {
            let buffer = buffer_entity.read(cx);
            let buffer_id = buffer.remote_id();
            let language = buffer.language().cloned();
            let language_registry = buffer.language_registry();
            let text_snapshot = buffer.text_snapshot();
            this.loading_diffs.remove(&(buffer_id, kind));

            if let Some(OpenBuffer::Complete { diff_state, .. }) =
                this.opened_buffers.get_mut(&buffer_id)
            {
                diff_state.update(cx, |diff_state, cx| {
                    diff_state.language = language;
                    diff_state.language_registry = language_registry;

                    let diff = cx.new(|_| BufferDiff::new(&text_snapshot));
                    match kind {
                        DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
                        DiffKind::Uncommitted => {
                            // The uncommitted diff tracks the unstaged diff as
                            // its secondary, creating it when absent.
                            let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
                                diff
                            } else {
                                let unstaged_diff = cx.new(|_| BufferDiff::new(&text_snapshot));
                                diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
                                unstaged_diff
                            };

                            diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
                            diff_state.uncommitted_diff = Some(diff.downgrade())
                        }
                    };

                    let rx = diff_state.diff_bases_changed(text_snapshot, diff_bases_change, cx);

                    // Wait for the initial diff computation to finish before
                    // handing the diff to the caller.
                    Ok(async move {
                        rx.await.ok();
                        Ok(diff)
                    })
                })
            } else {
                Err(anyhow!("buffer was closed"))
            }
        })??
        .await
    }
1514
1515 pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
1516 match &self.state {
1517 BufferStoreState::Local(this) => this.create_buffer(cx),
1518 BufferStoreState::Remote(this) => this.create_buffer(cx),
1519 }
1520 }
1521
1522 pub fn save_buffer(
1523 &mut self,
1524 buffer: Entity<Buffer>,
1525 cx: &mut Context<Self>,
1526 ) -> Task<Result<()>> {
1527 match &mut self.state {
1528 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1529 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1530 }
1531 }
1532
1533 pub fn save_buffer_as(
1534 &mut self,
1535 buffer: Entity<Buffer>,
1536 path: ProjectPath,
1537 cx: &mut Context<Self>,
1538 ) -> Task<Result<()>> {
1539 let old_file = buffer.read(cx).file().cloned();
1540 let task = match &self.state {
1541 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1542 BufferStoreState::Remote(this) => {
1543 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1544 }
1545 };
1546 cx.spawn(|this, mut cx| async move {
1547 task.await?;
1548 this.update(&mut cx, |_, cx| {
1549 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1550 })
1551 })
1552 }
1553
    /// Computes git blame for the buffer, at `version` if given, otherwise at
    /// its current contents.
    ///
    /// Returns `Ok(None)` when the file is not inside a git repository. For
    /// remote worktrees, the request is forwarded to the host.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything the blame needs on the main thread…
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                // …then run the blame itself on the background executor.
                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                // Guests don't have the repository locally; ask the host.
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1615
    /// Builds a permalink URL to `selection` in the buffer's file on its git
    /// hosting provider, based on the `origin` remote and the current HEAD.
    ///
    /// For remote worktrees the request is forwarded to the host.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // Permalinks use repository-relative paths.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                // Guests forward the request to the host, which has the repo.
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1712
1713 fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
1714 let buffer = buffer_entity.read(cx);
1715 let language = buffer.language().cloned();
1716 let language_registry = buffer.language_registry();
1717 let remote_id = buffer.remote_id();
1718 let is_remote = buffer.replica_id() != 0;
1719 let open_buffer = OpenBuffer::Complete {
1720 buffer: buffer_entity.downgrade(),
1721 diff_state: cx.new(|_| BufferDiffState {
1722 language,
1723 language_registry,
1724 ..Default::default()
1725 }),
1726 };
1727
1728 let handle = cx.entity().downgrade();
1729 buffer_entity.update(cx, move |_, cx| {
1730 cx.on_release(move |buffer, cx| {
1731 handle
1732 .update(cx, |_, cx| {
1733 cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1734 })
1735 .ok();
1736 })
1737 .detach()
1738 });
1739
1740 match self.opened_buffers.entry(remote_id) {
1741 hash_map::Entry::Vacant(entry) => {
1742 entry.insert(open_buffer);
1743 }
1744 hash_map::Entry::Occupied(mut entry) => {
1745 if let OpenBuffer::Operations(operations) = entry.get_mut() {
1746 buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1747 } else if entry.get().upgrade().is_some() {
1748 if is_remote {
1749 return Ok(());
1750 } else {
1751 debug_panic!("buffer {} was already registered", remote_id);
1752 Err(anyhow!("buffer {} was already registered", remote_id))?;
1753 }
1754 }
1755 entry.insert(open_buffer);
1756 }
1757 }
1758
1759 cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
1760 cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
1761 Ok(())
1762 }
1763
1764 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1765 self.opened_buffers
1766 .values()
1767 .filter_map(|buffer| buffer.upgrade())
1768 }
1769
1770 pub fn loading_buffers(
1771 &self,
1772 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1773 self.loading_buffers.iter().map(|(path, task)| {
1774 let task = task.clone();
1775 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1776 })
1777 }
1778
1779 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1780 self.buffers().find_map(|buffer| {
1781 let file = File::from_dyn(buffer.read(cx).file())?;
1782 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1783 Some(buffer)
1784 } else {
1785 None
1786 }
1787 })
1788 }
1789
1790 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1791 self.opened_buffers.get(&buffer_id)?.upgrade()
1792 }
1793
1794 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1795 self.get(buffer_id)
1796 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1797 }
1798
1799 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1800 self.get(buffer_id).or_else(|| {
1801 self.as_remote()
1802 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1803 })
1804 }
1805
1806 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
1807 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1808 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
1809 } else {
1810 None
1811 }
1812 }
1813
1814 pub fn get_uncommitted_diff(
1815 &self,
1816 buffer_id: BufferId,
1817 cx: &App,
1818 ) -> Option<Entity<BufferDiff>> {
1819 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1820 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
1821 } else {
1822 None
1823 }
1824 }
1825
1826 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1827 let buffers = self
1828 .buffers()
1829 .map(|buffer| {
1830 let buffer = buffer.read(cx);
1831 proto::BufferVersion {
1832 id: buffer.remote_id().into(),
1833 version: language::proto::serialize_version(&buffer.version),
1834 }
1835 })
1836 .collect();
1837 let incomplete_buffer_ids = self
1838 .as_remote()
1839 .map(|remote| remote.incomplete_buffer_ids())
1840 .unwrap_or_default();
1841 (buffers, incomplete_buffer_ids)
1842 }
1843
    /// Transitions all buffers into a disconnected state after losing the
    /// connection to the remote host: pending waits are abandoned and every
    /// buffer becomes read-only.
    pub fn disconnected_from_host(&mut self, cx: &mut App) {
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }

        // Without a host, edits can no longer be synchronized.
        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        if let Some(remote) = self.as_remote_mut() {
            // Wake up all futures currently waiting on a buffer to get opened,
            // to give them a chance to fail now that we've disconnected.
            remote.remote_buffer_listeners.clear()
        }
    }
1863
    /// Starts relaying buffer updates to a downstream client under the given
    /// project id.
    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1867
    /// Stops sharing: drops the downstream client and forgets which buffers
    /// were shared with guests.
    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }
1872
    /// Drops entries that only hold queued operations for buffers whose
    /// contents never arrived.
    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }
1877
    /// Streams buffers whose contents may match `query`.
    ///
    /// Already-open unnamed buffers are yielded first; candidate paths
    /// reported by the worktrees are then opened in bounded batches and
    /// yielded as they load. Dropping the receiver cancels the search.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // Unnamed buffers can't be found by the worktree scan, so
                // include them directly and count them against the limit.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Bound how many buffers are opened concurrently.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A closed receiver means the consumer stopped
                        // searching; end early.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1932
1933 pub fn recalculate_buffer_diffs(
1934 &mut self,
1935 buffers: Vec<Entity<Buffer>>,
1936 cx: &mut Context<Self>,
1937 ) -> impl Future<Output = ()> {
1938 let mut futures = Vec::new();
1939 for buffer in buffers {
1940 if let Some(OpenBuffer::Complete { diff_state, .. }) =
1941 self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
1942 {
1943 let buffer = buffer.read(cx).text_snapshot();
1944 futures.push(diff_state.update(cx, |diff_state, cx| {
1945 diff_state.recalculate_diffs(buffer, cx)
1946 }));
1947 }
1948 }
1949 async move {
1950 futures::future::join_all(futures).await;
1951 }
1952 }
1953
    /// Reacts to events emitted by an open buffer.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Propagate reloads downstream so guests pick up the new
                // saved state.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            BufferEvent::LanguageChanged => {
                // Keep the diff state's language in sync so diff hunks stay
                // correctly highlighted.
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(OpenBuffer::Complete { diff_state, .. }) =
                    self.opened_buffers.get(&buffer_id)
                {
                    diff_state.update(cx, |diff_state, cx| {
                        diff_state.buffer_language_changed(buffer, cx);
                    });
                }
            }
            _ => {}
        }
    }
1994
1995 pub async fn handle_update_buffer(
1996 this: Entity<Self>,
1997 envelope: TypedEnvelope<proto::UpdateBuffer>,
1998 mut cx: AsyncApp,
1999 ) -> Result<proto::Ack> {
2000 let payload = envelope.payload.clone();
2001 let buffer_id = BufferId::new(payload.buffer_id)?;
2002 let ops = payload
2003 .operations
2004 .into_iter()
2005 .map(language::proto::deserialize_operation)
2006 .collect::<Result<Vec<_>, _>>()?;
2007 this.update(&mut cx, |this, cx| {
2008 match this.opened_buffers.entry(buffer_id) {
2009 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
2010 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
2011 OpenBuffer::Complete { buffer, .. } => {
2012 if let Some(buffer) = buffer.upgrade() {
2013 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
2014 }
2015 }
2016 },
2017 hash_map::Entry::Vacant(e) => {
2018 e.insert(OpenBuffer::Operations(ops));
2019 }
2020 }
2021 Ok(proto::Ack {})
2022 })?
2023 }
2024
2025 pub fn register_shared_lsp_handle(
2026 &mut self,
2027 peer_id: proto::PeerId,
2028 buffer_id: BufferId,
2029 handle: OpenLspBufferHandle,
2030 ) {
2031 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2032 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2033 buffer.lsp_handle = Some(handle);
2034 return;
2035 }
2036 }
2037 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2038 }
2039
    /// Responds to a guest's request to synchronize buffer state, re-sharing
    /// each buffer the guest reports and replying with the host's version of
    /// each so the guest can request any operations it is missing.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Rebuild the guest's shared-buffer set from scratch; only buffers
        // the guest reports below are re-shared.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        diff: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Re-send the buffer's file and saved state, plus any
                // operations the guest hasn't seen yet.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // The missing operations can exceed the maximum message size,
                // so send them in chunks.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
2129
2130 pub fn handle_create_buffer_for_peer(
2131 &mut self,
2132 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2133 replica_id: u16,
2134 capability: Capability,
2135 cx: &mut Context<Self>,
2136 ) -> Result<()> {
2137 let Some(remote) = self.as_remote_mut() else {
2138 return Err(anyhow!("buffer store is not a remote"));
2139 };
2140
2141 if let Some(buffer) =
2142 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2143 {
2144 self.add_buffer(buffer, cx)?;
2145 }
2146
2147 Ok(())
2148 }
2149
    /// Handles an upstream notification that a buffer's underlying file
    /// changed (e.g. after a rename), updating the local buffer and
    /// re-forwarding the message to any downstream client.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // Only report a path change when the path actually differs
                // (or the buffer previously had no file at all).
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay the original message to downstream clients.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
2197
    /// Handles a guest's request to save a buffer, optionally under a new
    /// path, replying with the resulting saved version and mtime.
    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Make sure all edits the guest has seen are applied before saving.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
2238
2239 pub async fn handle_close_buffer(
2240 this: Entity<Self>,
2241 envelope: TypedEnvelope<proto::CloseBuffer>,
2242 mut cx: AsyncApp,
2243 ) -> Result<()> {
2244 let peer_id = envelope.sender_id;
2245 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2246 this.update(&mut cx, |this, _| {
2247 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2248 if shared.remove(&buffer_id).is_some() {
2249 if shared.is_empty() {
2250 this.shared_buffers.remove(&peer_id);
2251 }
2252 return;
2253 }
2254 }
2255 debug_panic!(
2256 "peer_id {} closed buffer_id {} which was either not open or already closed",
2257 peer_id,
2258 buffer_id
2259 )
2260 })
2261 }
2262
2263 pub async fn handle_buffer_saved(
2264 this: Entity<Self>,
2265 envelope: TypedEnvelope<proto::BufferSaved>,
2266 mut cx: AsyncApp,
2267 ) -> Result<()> {
2268 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2269 let version = deserialize_version(&envelope.payload.version);
2270 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2271 this.update(&mut cx, move |this, cx| {
2272 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2273 buffer.update(cx, |buffer, cx| {
2274 buffer.did_save(version, mtime, cx);
2275 });
2276 }
2277
2278 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2279 downstream_client
2280 .send(proto::BufferSaved {
2281 project_id: *project_id,
2282 buffer_id: buffer_id.into(),
2283 mtime: envelope.payload.mtime,
2284 version: envelope.payload.version,
2285 })
2286 .log_err();
2287 }
2288 })
2289 }
2290
2291 pub async fn handle_buffer_reloaded(
2292 this: Entity<Self>,
2293 envelope: TypedEnvelope<proto::BufferReloaded>,
2294 mut cx: AsyncApp,
2295 ) -> Result<()> {
2296 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2297 let version = deserialize_version(&envelope.payload.version);
2298 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2299 let line_ending = deserialize_line_ending(
2300 proto::LineEnding::from_i32(envelope.payload.line_ending)
2301 .ok_or_else(|| anyhow!("missing line ending"))?,
2302 );
2303 this.update(&mut cx, |this, cx| {
2304 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2305 buffer.update(cx, |buffer, cx| {
2306 buffer.did_reload(version, line_ending, mtime, cx);
2307 });
2308 }
2309
2310 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2311 downstream_client
2312 .send(proto::BufferReloaded {
2313 project_id: *project_id,
2314 buffer_id: buffer_id.into(),
2315 mtime: envelope.payload.mtime,
2316 version: envelope.payload.version,
2317 line_ending: envelope.payload.line_ending,
2318 })
2319 .log_err();
2320 }
2321 })
2322 }
2323
    /// Message handler for `proto::BlameBuffer`: computes git blame for the
    /// requested buffer at the version the peer observed, and returns it in
    /// serialized form.
    pub async fn handle_blame_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        // Catch up to the requested version before blaming, so line numbers
        // line up with what the requester sees.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }
2344
2345 pub async fn handle_get_permalink_to_line(
2346 this: Entity<Self>,
2347 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2348 mut cx: AsyncApp,
2349 ) -> Result<proto::GetPermalinkToLineResponse> {
2350 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2351 // let version = deserialize_version(&envelope.payload.version);
2352 let selection = {
2353 let proto_selection = envelope
2354 .payload
2355 .selection
2356 .context("no selection to get permalink for defined")?;
2357 proto_selection.start as u32..proto_selection.end as u32
2358 };
2359 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2360 let permalink = this
2361 .update(&mut cx, |this, cx| {
2362 this.get_permalink_to_line(&buffer, selection, cx)
2363 })?
2364 .await?;
2365 Ok(proto::GetPermalinkToLineResponse {
2366 permalink: permalink.to_string(),
2367 })
2368 }
2369
    /// Message handler for `proto::OpenUnstagedDiff`: opens (or reuses) the
    /// unstaged diff for a shared buffer, retains it on the peer's shared
    /// entry so it lives as long as the peer's handle, and replies with the
    /// staged base text.
    pub async fn handle_open_unstaged_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUnstagedDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUnstagedDiffResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let diff = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_unstaged_diff(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        this.update(&mut cx, |this, _| {
            // The peer is expected to already hold this buffer (it requested
            // a diff for it); stash the diff on that entry to keep it alive.
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.diff = Some(diff.clone());
            }
        })?;
        let staged_text =
            diff.read_with(&cx, |diff, _| diff.base_text().map(|buffer| buffer.text()))?;
        Ok(proto::OpenUnstagedDiffResponse { staged_text })
    }
2397
    /// Message handler for `proto::OpenUncommittedDiff`: opens (or reuses) the
    /// uncommitted diff for a shared buffer, retains it on the peer's shared
    /// entry, and replies with the committed (and possibly staged) base texts.
    pub async fn handle_open_uncommitted_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUncommittedDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUncommittedDiffResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let diff = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_uncommitted_diff(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        this.update(&mut cx, |this, _| {
            // The peer is expected to already hold this buffer (it requested
            // a diff for it); stash the diff on that entry to keep it alive.
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.diff = Some(diff.clone());
            }
        })?;
        diff.read_with(&cx, |diff, cx| {
            use proto::open_uncommitted_diff_response::Mode;

            // The committed text is this diff's base; the staged text is the
            // base of the secondary diff, when one exists.
            let staged_buffer = diff
                .secondary_diff()
                .and_then(|diff| diff.read(cx).base_text());

            let mode;
            let staged_text;
            let committed_text;
            if let Some(committed_buffer) = diff.base_text() {
                committed_text = Some(committed_buffer.text());
                if let Some(staged_buffer) = staged_buffer {
                    if staged_buffer.remote_id() == committed_buffer.remote_id() {
                        // Staged and committed bases are the same buffer, so
                        // the staged text need not be sent separately.
                        mode = Mode::IndexMatchesHead;
                        staged_text = None;
                    } else {
                        mode = Mode::IndexAndHead;
                        staged_text = Some(staged_buffer.text());
                    }
                } else {
                    mode = Mode::IndexAndHead;
                    staged_text = None;
                }
            } else {
                // No committed base text; send whatever staged text exists.
                mode = Mode::IndexAndHead;
                committed_text = None;
                staged_text = staged_buffer.as_ref().map(|buffer| buffer.text());
            }

            proto::OpenUncommittedDiffResponse {
                committed_text,
                staged_text,
                mode: mode.into(),
            }
        })
    }
2458
2459 pub async fn handle_update_diff_bases(
2460 this: Entity<Self>,
2461 request: TypedEnvelope<proto::UpdateDiffBases>,
2462 mut cx: AsyncApp,
2463 ) -> Result<()> {
2464 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2465 this.update(&mut cx, |this, cx| {
2466 if let Some(OpenBuffer::Complete { diff_state, buffer }) =
2467 this.opened_buffers.get_mut(&buffer_id)
2468 {
2469 if let Some(buffer) = buffer.upgrade() {
2470 let buffer = buffer.read(cx).text_snapshot();
2471 diff_state.update(cx, |diff_state, cx| {
2472 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2473 })
2474 }
2475 }
2476 })
2477 }
2478
2479 pub fn reload_buffers(
2480 &self,
2481 buffers: HashSet<Entity<Buffer>>,
2482 push_to_history: bool,
2483 cx: &mut Context<Self>,
2484 ) -> Task<Result<ProjectTransaction>> {
2485 if buffers.is_empty() {
2486 return Task::ready(Ok(ProjectTransaction::default()));
2487 }
2488 match &self.state {
2489 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2490 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2491 }
2492 }
2493
2494 async fn handle_reload_buffers(
2495 this: Entity<Self>,
2496 envelope: TypedEnvelope<proto::ReloadBuffers>,
2497 mut cx: AsyncApp,
2498 ) -> Result<proto::ReloadBuffersResponse> {
2499 let sender_id = envelope.original_sender_id().unwrap_or_default();
2500 let reload = this.update(&mut cx, |this, cx| {
2501 let mut buffers = HashSet::default();
2502 for buffer_id in &envelope.payload.buffer_ids {
2503 let buffer_id = BufferId::new(*buffer_id)?;
2504 buffers.insert(this.get_existing(buffer_id)?);
2505 }
2506 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2507 })??;
2508
2509 let project_transaction = reload.await?;
2510 let project_transaction = this.update(&mut cx, |this, cx| {
2511 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2512 })?;
2513 Ok(proto::ReloadBuffersResponse {
2514 transaction: Some(project_transaction),
2515 })
2516 }
2517
2518 pub fn create_buffer_for_peer(
2519 &mut self,
2520 buffer: &Entity<Buffer>,
2521 peer_id: proto::PeerId,
2522 cx: &mut Context<Self>,
2523 ) -> Task<Result<()>> {
2524 let buffer_id = buffer.read(cx).remote_id();
2525 let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
2526 if shared_buffers.contains_key(&buffer_id) {
2527 return Task::ready(Ok(()));
2528 }
2529 shared_buffers.insert(
2530 buffer_id,
2531 SharedBuffer {
2532 buffer: buffer.clone(),
2533 diff: None,
2534 lsp_handle: None,
2535 },
2536 );
2537
2538 let Some((client, project_id)) = self.downstream_client.clone() else {
2539 return Task::ready(Ok(()));
2540 };
2541
2542 cx.spawn(|this, mut cx| async move {
2543 let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
2544 return anyhow::Ok(());
2545 };
2546
2547 let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
2548 let operations = operations.await;
2549 let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;
2550
2551 let initial_state = proto::CreateBufferForPeer {
2552 project_id,
2553 peer_id: Some(peer_id),
2554 variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
2555 };
2556
2557 if client.send(initial_state).log_err().is_some() {
2558 let client = client.clone();
2559 cx.background_executor()
2560 .spawn(async move {
2561 let mut chunks = split_operations(operations).peekable();
2562 while let Some(chunk) = chunks.next() {
2563 let is_last = chunks.peek().is_none();
2564 client.send(proto::CreateBufferForPeer {
2565 project_id,
2566 peer_id: Some(peer_id),
2567 variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
2568 proto::BufferChunk {
2569 buffer_id: buffer_id.into(),
2570 operations: chunk,
2571 is_last,
2572 },
2573 )),
2574 })?;
2575 }
2576 anyhow::Ok(())
2577 })
2578 .await
2579 .log_err();
2580 }
2581 Ok(())
2582 })
2583 }
2584
    /// Drops all record of buffers shared with any peer.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2588
    /// Drops all record of buffers shared with the given peer.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2592
    /// Re-keys a peer's shared buffers under a new peer id. No-op if the old
    /// peer id has no shared buffers.
    // NOTE(review): presumably used when a peer's id changes across a
    // reconnect — confirm against callers.
    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }
2598
    /// Whether any buffers are currently shared with any peer.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2602
    /// Creates a new local buffer with the given text and language (defaulting
    /// to plain text), registers it with the store, and indexes it by project
    /// path and worktree entry id when it has a file.
    ///
    /// Panics if called on a non-local buffer store.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut Context<Self>,
    ) -> Entity<Buffer> {
        let buffer = cx.new(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        // Registration failure is logged rather than propagated; the buffer
        // entity is still returned to the caller.
        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            // Untitled/deleted files may have no entry id; only index when
            // one exists.
            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }
2636
2637 pub fn deserialize_project_transaction(
2638 &mut self,
2639 message: proto::ProjectTransaction,
2640 push_to_history: bool,
2641 cx: &mut Context<Self>,
2642 ) -> Task<Result<ProjectTransaction>> {
2643 if let Some(this) = self.as_remote_mut() {
2644 this.deserialize_project_transaction(message, push_to_history, cx)
2645 } else {
2646 debug_panic!("not a remote buffer store");
2647 Task::ready(Err(anyhow!("not a remote buffer store")))
2648 }
2649 }
2650
2651 pub fn wait_for_remote_buffer(
2652 &mut self,
2653 id: BufferId,
2654 cx: &mut Context<BufferStore>,
2655 ) -> Task<Result<Entity<Buffer>>> {
2656 if let Some(this) = self.as_remote_mut() {
2657 this.wait_for_remote_buffer(id, cx)
2658 } else {
2659 debug_panic!("not a remote buffer store");
2660 Task::ready(Err(anyhow!("not a remote buffer store")))
2661 }
2662 }
2663
2664 pub fn serialize_project_transaction_for_peer(
2665 &mut self,
2666 project_transaction: ProjectTransaction,
2667 peer_id: proto::PeerId,
2668 cx: &mut Context<Self>,
2669 ) -> proto::ProjectTransaction {
2670 let mut serialized_transaction = proto::ProjectTransaction {
2671 buffer_ids: Default::default(),
2672 transactions: Default::default(),
2673 };
2674 for (buffer, transaction) in project_transaction.0 {
2675 self.create_buffer_for_peer(&buffer, peer_id, cx)
2676 .detach_and_log_err(cx);
2677 serialized_transaction
2678 .buffer_ids
2679 .push(buffer.read(cx).remote_id().into());
2680 serialized_transaction
2681 .transactions
2682 .push(language::proto::serialize_transaction(&transaction));
2683 }
2684 serialized_transaction
2685 }
2686}
2687
2688impl OpenBuffer {
2689 fn upgrade(&self) -> Option<Entity<Buffer>> {
2690 match self {
2691 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2692 OpenBuffer::Operations(_) => None,
2693 }
2694 }
2695}
2696
2697fn is_not_found_error(error: &anyhow::Error) -> bool {
2698 error
2699 .root_cause()
2700 .downcast_ref::<io::Error>()
2701 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2702}
2703
2704fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2705 let Some(blame) = blame else {
2706 return proto::BlameBufferResponse {
2707 blame_response: None,
2708 };
2709 };
2710
2711 let entries = blame
2712 .entries
2713 .into_iter()
2714 .map(|entry| proto::BlameEntry {
2715 sha: entry.sha.as_bytes().into(),
2716 start_line: entry.range.start,
2717 end_line: entry.range.end,
2718 original_line_number: entry.original_line_number,
2719 author: entry.author.clone(),
2720 author_mail: entry.author_mail.clone(),
2721 author_time: entry.author_time,
2722 author_tz: entry.author_tz.clone(),
2723 committer: entry.committer.clone(),
2724 committer_mail: entry.committer_mail.clone(),
2725 committer_time: entry.committer_time,
2726 committer_tz: entry.committer_tz.clone(),
2727 summary: entry.summary.clone(),
2728 previous: entry.previous.clone(),
2729 filename: entry.filename.clone(),
2730 })
2731 .collect::<Vec<_>>();
2732
2733 let messages = blame
2734 .messages
2735 .into_iter()
2736 .map(|(oid, message)| proto::CommitMessage {
2737 oid: oid.as_bytes().into(),
2738 message,
2739 })
2740 .collect::<Vec<_>>();
2741
2742 let permalinks = blame
2743 .permalinks
2744 .into_iter()
2745 .map(|(oid, url)| proto::CommitPermalink {
2746 oid: oid.as_bytes().into(),
2747 permalink: url.to_string(),
2748 })
2749 .collect::<Vec<_>>();
2750
2751 proto::BlameBufferResponse {
2752 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2753 entries,
2754 messages,
2755 permalinks,
2756 remote_url: blame.remote_url,
2757 }),
2758 }
2759}
2760
2761fn deserialize_blame_buffer_response(
2762 response: proto::BlameBufferResponse,
2763) -> Option<git::blame::Blame> {
2764 let response = response.blame_response?;
2765 let entries = response
2766 .entries
2767 .into_iter()
2768 .filter_map(|entry| {
2769 Some(git::blame::BlameEntry {
2770 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2771 range: entry.start_line..entry.end_line,
2772 original_line_number: entry.original_line_number,
2773 committer: entry.committer,
2774 committer_time: entry.committer_time,
2775 committer_tz: entry.committer_tz,
2776 committer_mail: entry.committer_mail,
2777 author: entry.author,
2778 author_mail: entry.author_mail,
2779 author_time: entry.author_time,
2780 author_tz: entry.author_tz,
2781 summary: entry.summary,
2782 previous: entry.previous,
2783 filename: entry.filename,
2784 })
2785 })
2786 .collect::<Vec<_>>();
2787
2788 let messages = response
2789 .messages
2790 .into_iter()
2791 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2792 .collect::<HashMap<_, _>>();
2793
2794 let permalinks = response
2795 .permalinks
2796 .into_iter()
2797 .filter_map(|permalink| {
2798 Some((
2799 git::Oid::from_bytes(&permalink.oid).ok()?,
2800 Url::from_str(&permalink.permalink).ok()?,
2801 ))
2802 })
2803 .collect::<HashMap<_, _>>();
2804
2805 Some(Blame {
2806 entries,
2807 permalinks,
2808 messages,
2809 remote_url: response.remote_url,
2810 })
2811}
2812
2813fn get_permalink_in_rust_registry_src(
2814 provider_registry: Arc<GitHostingProviderRegistry>,
2815 path: PathBuf,
2816 selection: Range<u32>,
2817) -> Result<url::Url> {
2818 #[derive(Deserialize)]
2819 struct CargoVcsGit {
2820 sha1: String,
2821 }
2822
2823 #[derive(Deserialize)]
2824 struct CargoVcsInfo {
2825 git: CargoVcsGit,
2826 path_in_vcs: String,
2827 }
2828
2829 #[derive(Deserialize)]
2830 struct CargoPackage {
2831 repository: String,
2832 }
2833
2834 #[derive(Deserialize)]
2835 struct CargoToml {
2836 package: CargoPackage,
2837 }
2838
2839 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
2840 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
2841 Some((dir, json))
2842 }) else {
2843 bail!("No .cargo_vcs_info.json found in parent directories")
2844 };
2845 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
2846 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
2847 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
2848 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
2849 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
2850 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
2851 let permalink = provider.build_permalink(
2852 remote,
2853 BuildPermalinkParams {
2854 sha: &cargo_vcs_info.git.sha1,
2855 path: &path.to_string_lossy(),
2856 selection: Some(selection),
2857 },
2858 );
2859 Ok(permalink)
2860}