1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use buffer_diff::{BufferDiff, BufferDiffEvent};
10use client::Client;
11use collections::{hash_map, HashMap, HashSet};
12use fs::Fs;
13use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
14use git::{blame::Blame, repository::RepoPath};
15use gpui::{
16 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
17};
18use http_client::Url;
19use language::{
20 proto::{
21 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
22 split_operations,
23 },
24 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
25};
26use rpc::{
27 proto::{self, ToProto},
28 AnyProtoClient, ErrorExt as _, TypedEnvelope,
29};
30use serde::Deserialize;
31use smol::channel::Receiver;
32use std::{
33 io,
34 ops::Range,
35 path::{Path, PathBuf},
36 pin::pin,
37 str::FromStr as _,
38 sync::Arc,
39 time::Instant,
40};
41use text::BufferId;
42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
44
/// Identifies which git diff a buffer's diff entity tracks: the unstaged
/// diff (working copy vs. the index) or the uncommitted diff (working copy
/// vs. HEAD).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    /// Working-copy changes relative to the git index (staged text).
    Unstaged,
    /// Working-copy changes relative to the HEAD commit (committed text).
    Uncommitted,
}
50
/// A set of open buffers.
pub struct BufferStore {
    // Local- or remote-specific behavior; see `BufferStoreState`.
    state: BufferStoreState,
    // In-flight buffer loads keyed by project path, shared so that
    // concurrent open requests for the same path reuse one task.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    // In-flight diff loads keyed by buffer id and diff kind, shared for
    // the same reason as `loading_buffers`.
    #[allow(clippy::type_complexity)]
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
    worktree_store: Entity<WorktreeStore>,
    // Every buffer that has been opened, keyed by its remote id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    // Client and project id used to forward updates downstream when this
    // project is shared; `None` when not shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // What has been shared with each remote peer, per buffer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
64
/// Everything retained for a buffer shared with a single remote peer: the
/// buffer itself plus, when applicable, its diff and LSP buffer handle.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    diff: Option<Entity<BufferDiff>>,
    lsp_handle: Option<OpenLspBufferHandle>,
}
71
/// Per-buffer state backing the unstaged and uncommitted git diffs,
/// including the cached base texts the diffs are computed against.
#[derive(Default)]
struct BufferDiffState {
    // Weak handles: either diff entity may be dropped independently of
    // this state; `unstaged_diff()`/`uncommitted_diff()` upgrade them.
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    // The in-flight recalculation, replaced wholesale when a new one starts.
    recalculate_diff_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    // Senders resolved when the next recalculation completes.
    diff_updated_futures: Vec<oneshot::Sender<()>>,

    // Cached base texts (head = committed contents, index = staged
    // contents) and dirty flags recording what changed since the last
    // recalculation. When index matches head, both slots hold the same
    // `Arc` (checked via pointer equality in `recalculate_diffs`).
    head_text: Option<Arc<String>>,
    index_text: Option<Arc<String>>,
    head_changed: bool,
    index_changed: bool,
    language_changed: bool,
}
87
/// Describes which diff base texts changed and their new contents. `None`
/// payloads indicate the corresponding base text is absent.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Only the index (staged) text changed.
    SetIndex(Option<String>),
    /// Only the head (committed) text changed.
    SetHead(Option<String>),
    /// Both changed, to distinct values.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Both changed and the index matches the head exactly.
    SetBoth(Option<String>),
}
98
99impl BufferDiffState {
100 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
101 self.language = buffer.read(cx).language().cloned();
102 self.language_changed = true;
103 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
104 }
105
106 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
107 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
108 }
109
110 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
111 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
112 }
113
114 fn handle_base_texts_updated(
115 &mut self,
116 buffer: text::BufferSnapshot,
117 message: proto::UpdateDiffBases,
118 cx: &mut Context<Self>,
119 ) {
120 use proto::update_diff_bases::Mode;
121
122 let Some(mode) = Mode::from_i32(message.mode) else {
123 return;
124 };
125
126 let diff_bases_change = match mode {
127 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
128 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
129 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
130 Mode::IndexAndHead => DiffBasesChange::SetEach {
131 index: message.staged_text,
132 head: message.committed_text,
133 },
134 };
135
136 let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
137 }
138
139 fn diff_bases_changed(
140 &mut self,
141 buffer: text::BufferSnapshot,
142 diff_bases_change: DiffBasesChange,
143 cx: &mut Context<Self>,
144 ) -> oneshot::Receiver<()> {
145 match diff_bases_change {
146 DiffBasesChange::SetIndex(index) => {
147 self.index_text = index.map(|mut index| {
148 text::LineEnding::normalize(&mut index);
149 Arc::new(index)
150 });
151 self.index_changed = true;
152 }
153 DiffBasesChange::SetHead(head) => {
154 self.head_text = head.map(|mut head| {
155 text::LineEnding::normalize(&mut head);
156 Arc::new(head)
157 });
158 self.head_changed = true;
159 }
160 DiffBasesChange::SetBoth(text) => {
161 let text = text.map(|mut text| {
162 text::LineEnding::normalize(&mut text);
163 Arc::new(text)
164 });
165 self.head_text = text.clone();
166 self.index_text = text;
167 self.head_changed = true;
168 self.index_changed = true;
169 }
170 DiffBasesChange::SetEach { index, head } => {
171 self.index_text = index.map(|mut index| {
172 text::LineEnding::normalize(&mut index);
173 Arc::new(index)
174 });
175 self.index_changed = true;
176 self.head_text = head.map(|mut head| {
177 text::LineEnding::normalize(&mut head);
178 Arc::new(head)
179 });
180 self.head_changed = true;
181 }
182 }
183
184 self.recalculate_diffs(buffer, cx)
185 }
186
187 fn recalculate_diffs(
188 &mut self,
189 buffer: text::BufferSnapshot,
190 cx: &mut Context<Self>,
191 ) -> oneshot::Receiver<()> {
192 log::debug!("recalculate diffs");
193 let (tx, rx) = oneshot::channel();
194 self.diff_updated_futures.push(tx);
195
196 let language = self.language.clone();
197 let language_registry = self.language_registry.clone();
198 let unstaged_diff = self.unstaged_diff();
199 let uncommitted_diff = self.uncommitted_diff();
200 let head = self.head_text.clone();
201 let index = self.index_text.clone();
202 let index_changed = self.index_changed;
203 let head_changed = self.head_changed;
204 let language_changed = self.language_changed;
205 let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
206 (Some(index), Some(head)) => Arc::ptr_eq(index, head),
207 (None, None) => true,
208 _ => false,
209 };
210 self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
211 let mut unstaged_changed_range = None;
212 if let Some(unstaged_diff) = &unstaged_diff {
213 unstaged_changed_range = BufferDiff::update_diff(
214 unstaged_diff.clone(),
215 buffer.clone(),
216 index,
217 index_changed,
218 language_changed,
219 language.clone(),
220 language_registry.clone(),
221 &mut cx,
222 )
223 .await?;
224
225 unstaged_diff.update(&mut cx, |_, cx| {
226 if language_changed {
227 cx.emit(BufferDiffEvent::LanguageChanged);
228 }
229 if let Some(changed_range) = unstaged_changed_range.clone() {
230 cx.emit(BufferDiffEvent::DiffChanged {
231 changed_range: Some(changed_range),
232 })
233 }
234 })?;
235 }
236
237 if let Some(uncommitted_diff) = &uncommitted_diff {
238 let uncommitted_changed_range =
239 if let (Some(unstaged_diff), true) = (&unstaged_diff, index_matches_head) {
240 uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
241 uncommitted_diff.update_diff_from(&buffer, unstaged_diff, cx)
242 })?
243 } else {
244 BufferDiff::update_diff(
245 uncommitted_diff.clone(),
246 buffer.clone(),
247 head,
248 head_changed,
249 language_changed,
250 language.clone(),
251 language_registry.clone(),
252 &mut cx,
253 )
254 .await?
255 };
256
257 uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
258 if language_changed {
259 cx.emit(BufferDiffEvent::LanguageChanged);
260 }
261 let changed_range = match (unstaged_changed_range, uncommitted_changed_range) {
262 (None, None) => None,
263 (Some(unstaged_range), None) => {
264 uncommitted_diff.range_to_hunk_range(unstaged_range, &buffer, cx)
265 }
266 (None, Some(uncommitted_range)) => Some(uncommitted_range),
267 (Some(unstaged_range), Some(uncommitted_range)) => {
268 let mut start = uncommitted_range.start;
269 let mut end = uncommitted_range.end;
270 if let Some(unstaged_range) =
271 uncommitted_diff.range_to_hunk_range(unstaged_range, &buffer, cx)
272 {
273 start = unstaged_range.start.min(&uncommitted_range.start, &buffer);
274 end = unstaged_range.end.max(&uncommitted_range.end, &buffer);
275 }
276 Some(start..end)
277 }
278 };
279 cx.emit(BufferDiffEvent::DiffChanged { changed_range });
280 })?;
281 }
282
283 if let Some(this) = this.upgrade() {
284 this.update(&mut cx, |this, _| {
285 this.index_changed = false;
286 this.head_changed = false;
287 this.language_changed = false;
288 for tx in this.diff_updated_futures.drain(..) {
289 tx.send(()).ok();
290 }
291 })?;
292 }
293
294 Ok(())
295 }));
296
297 rx
298 }
299}
300
/// Whether this store manages buffers for a local project or proxies a
/// remote one over RPC.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
305
/// Buffer-store state for a remote project: buffers are opened, saved, and
/// reloaded by sending requests to the upstream client.
struct RemoteBufferStore {
    // Buffers sent by peers that we retain to avoid races (see
    // `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    // Buffers whose initial state has arrived but whose operation chunks
    // are still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    // Channels to notify once a given remote buffer finishes loading
    // (or fails to).
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
315
/// Buffer-store state for a local project: buffers are loaded from and
/// saved to worktrees on disk.
struct LocalBufferStore {
    // Index from project path to buffer id, kept in sync as files move.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    // Index from worktree entry id to buffer id.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    // Keeps the worktree-store subscription alive for this store's lifetime.
    _subscription: Subscription,
}
322
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully-loaded buffer together with its git diff state.
    Complete {
        buffer: WeakEntity<Buffer>,
        diff_state: Entity<BufferDiffState>,
    },
    /// Operations received for a buffer that isn't complete yet —
    /// presumably applied once the buffer arrives (the handling code is
    /// elsewhere in this file; verify there).
    Operations(Vec<Operation>),
}
330
/// Events emitted by the [`BufferStore`].
pub enum BufferStoreEvent {
    BufferAdded(Entity<Buffer>),
    BufferDropped(BufferId),
    /// A buffer's underlying file changed paths; `old_file` is the file it
    /// previously pointed at.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
339
/// The buffer transactions produced by a single project-wide operation,
/// keyed by the buffer each transaction applies to.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}
344
345impl RemoteBufferStore {
346 fn open_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
347 let project_id = self.project_id;
348 let client = self.upstream_client.clone();
349 cx.background_spawn(async move {
350 let response = client
351 .request(proto::OpenUnstagedDiff {
352 project_id,
353 buffer_id: buffer_id.to_proto(),
354 })
355 .await?;
356 Ok(response.staged_text)
357 })
358 }
359
360 fn open_uncommitted_diff(
361 &self,
362 buffer_id: BufferId,
363 cx: &App,
364 ) -> Task<Result<DiffBasesChange>> {
365 use proto::open_uncommitted_diff_response::Mode;
366
367 let project_id = self.project_id;
368 let client = self.upstream_client.clone();
369 cx.background_spawn(async move {
370 let response = client
371 .request(proto::OpenUncommittedDiff {
372 project_id,
373 buffer_id: buffer_id.to_proto(),
374 })
375 .await?;
376 let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
377 let bases = match mode {
378 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
379 Mode::IndexAndHead => DiffBasesChange::SetEach {
380 head: response.committed_text,
381 index: response.staged_text,
382 },
383 };
384 Ok(bases)
385 })
386 }
387
388 pub fn wait_for_remote_buffer(
389 &mut self,
390 id: BufferId,
391 cx: &mut Context<BufferStore>,
392 ) -> Task<Result<Entity<Buffer>>> {
393 let (tx, rx) = oneshot::channel();
394 self.remote_buffer_listeners.entry(id).or_default().push(tx);
395
396 cx.spawn(|this, cx| async move {
397 if let Some(buffer) = this
398 .read_with(&cx, |buffer_store, _| buffer_store.get(id))
399 .ok()
400 .flatten()
401 {
402 return Ok(buffer);
403 }
404
405 cx.background_spawn(async move { rx.await? }).await
406 })
407 }
408
409 fn save_remote_buffer(
410 &self,
411 buffer_handle: Entity<Buffer>,
412 new_path: Option<proto::ProjectPath>,
413 cx: &Context<BufferStore>,
414 ) -> Task<Result<()>> {
415 let buffer = buffer_handle.read(cx);
416 let buffer_id = buffer.remote_id().into();
417 let version = buffer.version();
418 let rpc = self.upstream_client.clone();
419 let project_id = self.project_id;
420 cx.spawn(move |_, mut cx| async move {
421 let response = rpc
422 .request(proto::SaveBuffer {
423 project_id,
424 buffer_id,
425 new_path,
426 version: serialize_version(&version),
427 })
428 .await?;
429 let version = deserialize_version(&response.version);
430 let mtime = response.mtime.map(|mtime| mtime.into());
431
432 buffer_handle.update(&mut cx, |buffer, cx| {
433 buffer.did_save(version.clone(), mtime, cx);
434 })?;
435
436 Ok(())
437 })
438 }
439
440 pub fn handle_create_buffer_for_peer(
441 &mut self,
442 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
443 replica_id: u16,
444 capability: Capability,
445 cx: &mut Context<BufferStore>,
446 ) -> Result<Option<Entity<Buffer>>> {
447 match envelope
448 .payload
449 .variant
450 .ok_or_else(|| anyhow!("missing variant"))?
451 {
452 proto::create_buffer_for_peer::Variant::State(mut state) => {
453 let buffer_id = BufferId::new(state.id)?;
454
455 let buffer_result = maybe!({
456 let mut buffer_file = None;
457 if let Some(file) = state.file.take() {
458 let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
459 let worktree = self
460 .worktree_store
461 .read(cx)
462 .worktree_for_id(worktree_id, cx)
463 .ok_or_else(|| {
464 anyhow!("no worktree found for id {}", file.worktree_id)
465 })?;
466 buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
467 as Arc<dyn language::File>);
468 }
469 Buffer::from_proto(replica_id, capability, state, buffer_file)
470 });
471
472 match buffer_result {
473 Ok(buffer) => {
474 let buffer = cx.new(|_| buffer);
475 self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
476 }
477 Err(error) => {
478 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
479 for listener in listeners {
480 listener.send(Err(anyhow!(error.cloned()))).ok();
481 }
482 }
483 }
484 }
485 }
486 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
487 let buffer_id = BufferId::new(chunk.buffer_id)?;
488 let buffer = self
489 .loading_remote_buffers_by_id
490 .get(&buffer_id)
491 .cloned()
492 .ok_or_else(|| {
493 anyhow!(
494 "received chunk for buffer {} without initial state",
495 chunk.buffer_id
496 )
497 })?;
498
499 let result = maybe!({
500 let operations = chunk
501 .operations
502 .into_iter()
503 .map(language::proto::deserialize_operation)
504 .collect::<Result<Vec<_>>>()?;
505 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
506 anyhow::Ok(())
507 });
508
509 if let Err(error) = result {
510 self.loading_remote_buffers_by_id.remove(&buffer_id);
511 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
512 for listener in listeners {
513 listener.send(Err(error.cloned())).ok();
514 }
515 }
516 } else if chunk.is_last {
517 self.loading_remote_buffers_by_id.remove(&buffer_id);
518 if self.upstream_client.is_via_collab() {
519 // retain buffers sent by peers to avoid races.
520 self.shared_with_me.insert(buffer.clone());
521 }
522
523 if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
524 for sender in senders {
525 sender.send(Ok(buffer.clone())).ok();
526 }
527 }
528 return Ok(Some(buffer));
529 }
530 }
531 }
532 return Ok(None);
533 }
534
535 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
536 self.loading_remote_buffers_by_id
537 .keys()
538 .copied()
539 .collect::<Vec<_>>()
540 }
541
542 pub fn deserialize_project_transaction(
543 &self,
544 message: proto::ProjectTransaction,
545 push_to_history: bool,
546 cx: &mut Context<BufferStore>,
547 ) -> Task<Result<ProjectTransaction>> {
548 cx.spawn(|this, mut cx| async move {
549 let mut project_transaction = ProjectTransaction::default();
550 for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
551 {
552 let buffer_id = BufferId::new(buffer_id)?;
553 let buffer = this
554 .update(&mut cx, |this, cx| {
555 this.wait_for_remote_buffer(buffer_id, cx)
556 })?
557 .await?;
558 let transaction = language::proto::deserialize_transaction(transaction)?;
559 project_transaction.0.insert(buffer, transaction);
560 }
561
562 for (buffer, transaction) in &project_transaction.0 {
563 buffer
564 .update(&mut cx, |buffer, _| {
565 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
566 })?
567 .await?;
568
569 if push_to_history {
570 buffer.update(&mut cx, |buffer, _| {
571 buffer.push_transaction(transaction.clone(), Instant::now());
572 })?;
573 }
574 }
575
576 Ok(project_transaction)
577 })
578 }
579
580 fn open_buffer(
581 &self,
582 path: Arc<Path>,
583 worktree: Entity<Worktree>,
584 cx: &mut Context<BufferStore>,
585 ) -> Task<Result<Entity<Buffer>>> {
586 let worktree_id = worktree.read(cx).id().to_proto();
587 let project_id = self.project_id;
588 let client = self.upstream_client.clone();
589 cx.spawn(move |this, mut cx| async move {
590 let response = client
591 .request(proto::OpenBufferByPath {
592 project_id,
593 worktree_id,
594 path: path.to_proto(),
595 })
596 .await?;
597 let buffer_id = BufferId::new(response.buffer_id)?;
598
599 let buffer = this
600 .update(&mut cx, {
601 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
602 })?
603 .await?;
604
605 Ok(buffer)
606 })
607 }
608
609 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
610 let create = self.upstream_client.request(proto::OpenNewBuffer {
611 project_id: self.project_id,
612 });
613 cx.spawn(|this, mut cx| async move {
614 let response = create.await?;
615 let buffer_id = BufferId::new(response.buffer_id)?;
616
617 this.update(&mut cx, |this, cx| {
618 this.wait_for_remote_buffer(buffer_id, cx)
619 })?
620 .await
621 })
622 }
623
624 fn reload_buffers(
625 &self,
626 buffers: HashSet<Entity<Buffer>>,
627 push_to_history: bool,
628 cx: &mut Context<BufferStore>,
629 ) -> Task<Result<ProjectTransaction>> {
630 let request = self.upstream_client.request(proto::ReloadBuffers {
631 project_id: self.project_id,
632 buffer_ids: buffers
633 .iter()
634 .map(|buffer| buffer.read(cx).remote_id().to_proto())
635 .collect(),
636 });
637
638 cx.spawn(|this, mut cx| async move {
639 let response = request
640 .await?
641 .transaction
642 .ok_or_else(|| anyhow!("missing transaction"))?;
643 this.update(&mut cx, |this, cx| {
644 this.deserialize_project_transaction(response, push_to_history, cx)
645 })?
646 .await
647 })
648 }
649}
650
651impl LocalBufferStore {
652 fn worktree_for_buffer(
653 &self,
654 buffer: &Entity<Buffer>,
655 cx: &App,
656 ) -> Option<(Entity<Worktree>, Arc<Path>)> {
657 let file = buffer.read(cx).file()?;
658 let worktree_id = file.worktree_id(cx);
659 let path = file.path().clone();
660 let worktree = self
661 .worktree_store
662 .read(cx)
663 .worktree_for_id(worktree_id, cx)?;
664 Some((worktree, path))
665 }
666
667 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
668 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
669 worktree.read(cx).load_staged_file(path.as_ref(), cx)
670 } else {
671 return Task::ready(Err(anyhow!("no such worktree")));
672 }
673 }
674
675 fn load_committed_text(
676 &self,
677 buffer: &Entity<Buffer>,
678 cx: &App,
679 ) -> Task<Result<Option<String>>> {
680 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
681 worktree.read(cx).load_committed_file(path.as_ref(), cx)
682 } else {
683 Task::ready(Err(anyhow!("no such worktree")))
684 }
685 }
686
    /// Writes the buffer's contents to disk at `path` within `worktree`,
    /// then notifies downstream collaborators (if any) and finally the
    /// buffer itself.
    ///
    /// `has_changed_file` is forced to true when the file is new on disk,
    /// so its metadata gets broadcast after the save.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Capture everything the async save needs up front.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Forward the save (and any file metadata change) downstream
            // before updating the local buffer.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
744
745 fn subscribe_to_worktree(
746 &mut self,
747 worktree: &Entity<Worktree>,
748 cx: &mut Context<BufferStore>,
749 ) {
750 cx.subscribe(worktree, |this, worktree, event, cx| {
751 if worktree.read(cx).is_local() {
752 match event {
753 worktree::Event::UpdatedEntries(changes) => {
754 Self::local_worktree_entries_changed(this, &worktree, changes, cx);
755 }
756 worktree::Event::UpdatedGitRepositories(updated_repos) => {
757 Self::local_worktree_git_repos_changed(
758 this,
759 worktree.clone(),
760 updated_repos,
761 cx,
762 )
763 }
764 _ => {}
765 }
766 }
767 })
768 .detach();
769 }
770
771 fn local_worktree_entries_changed(
772 this: &mut BufferStore,
773 worktree_handle: &Entity<Worktree>,
774 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
775 cx: &mut Context<BufferStore>,
776 ) {
777 let snapshot = worktree_handle.read(cx).snapshot();
778 for (path, entry_id, _) in changes {
779 Self::local_worktree_entry_changed(
780 this,
781 *entry_id,
782 path,
783 worktree_handle,
784 &snapshot,
785 cx,
786 );
787 }
788 }
789
    /// Responds to git repository changes in a local worktree: reloads the
    /// staged/committed base texts for every open buffer under an affected
    /// repository, feeds the new bases into each buffer's diff state, and
    /// broadcasts them downstream when the project is shared.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Collect the buffers needing new diff bases: live, complete
        // buffers in this worktree whose path lies inside a changed repo.
        let mut diff_state_updates = Vec::new();
        for buffer in this.opened_buffers.values() {
            let OpenBuffer::Complete { buffer, diff_state } = buffer else {
                continue;
            };
            let Some(buffer) = buffer.upgrade() else {
                continue;
            };
            let buffer = buffer.read(cx);
            let Some(file) = File::from_dyn(buffer.file()) else {
                continue;
            };
            if file.worktree != worktree_handle {
                continue;
            }
            let diff_state = diff_state.read(cx);
            if changed_repos
                .iter()
                .any(|(work_dir, _)| file.path.starts_with(work_dir))
            {
                let snapshot = buffer.text_snapshot();
                // Record which base texts are actually needed, so we only
                // read index/HEAD contents for diffs that are still alive.
                diff_state_updates.push((
                    snapshot.clone(),
                    file.path.clone(),
                    diff_state
                        .unstaged_diff
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                    diff_state
                        .uncommitted_diff
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                ))
            }
        }

        if diff_state_updates.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Load the new base texts off the main thread.
            let diff_bases_changes_by_buffer = cx
                .background_spawn(async move {
                    diff_state_updates
                        .into_iter()
                        .filter_map(
                            |(buffer_snapshot, path, needs_staged_text, needs_committed_text)| {
                                let local_repo = snapshot.local_repo_for_path(&path)?;
                                let relative_path = local_repo.relativize(&path).ok()?;
                                let staged_text = if needs_staged_text {
                                    local_repo.repo().load_index_text(&relative_path)
                                } else {
                                    None
                                };
                                let committed_text = if needs_committed_text {
                                    local_repo.repo().load_committed_text(&relative_path)
                                } else {
                                    None
                                };
                                // Collapse into the most precise change
                                // variant, noting when index == head.
                                let diff_bases_change =
                                    match (needs_staged_text, needs_committed_text) {
                                        (true, true) => Some(if staged_text == committed_text {
                                            DiffBasesChange::SetBoth(committed_text)
                                        } else {
                                            DiffBasesChange::SetEach {
                                                index: staged_text,
                                                head: committed_text,
                                            }
                                        }),
                                        (true, false) => {
                                            Some(DiffBasesChange::SetIndex(staged_text))
                                        }
                                        (false, true) => {
                                            Some(DiffBasesChange::SetHead(committed_text))
                                        }
                                        (false, false) => None,
                                    };
                                Some((buffer_snapshot, diff_bases_change))
                            },
                        )
                        .collect::<Vec<_>>()
                })
                .await;

            // Back on the main thread: apply each change to the buffer's
            // diff state, mirroring it downstream first when shared.
            this.update(&mut cx, |this, cx| {
                for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
                    let Some(OpenBuffer::Complete { diff_state, .. }) =
                        this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
                    else {
                        continue;
                    };
                    let Some(diff_bases_change) = diff_bases_change else {
                        continue;
                    };

                    diff_state.update(cx, |diff_state, cx| {
                        use proto::update_diff_bases::Mode;

                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
                            let buffer_id = buffer_snapshot.remote_id().to_proto();
                            let (staged_text, committed_text, mode) = match diff_bases_change
                                .clone()
                            {
                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                DiffBasesChange::SetEach { index, head } => {
                                    (index, head, Mode::IndexAndHead)
                                }
                                DiffBasesChange::SetBoth(text) => {
                                    (None, text, Mode::IndexMatchesHead)
                                }
                            };
                            let message = proto::UpdateDiffBases {
                                project_id: *project_id,
                                buffer_id,
                                staged_text,
                                committed_text,
                                mode: mode as i32,
                            };

                            client.send(message).log_err();
                        }

                        let _ =
                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
                    });
                }
            })
        })
        .detach_and_log_err(cx);
    }
933
    /// Reconciles one buffer with a changed worktree entry: rebuilds the
    /// buffer's `File` (path, entry id, mtime, deleted state), keeps the
    /// store's path/entry-id indices in sync, and notifies downstream
    /// clients of the new file metadata.
    ///
    /// Always yields `None`; the `Option` return only enables early exits
    /// via `?`.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Locate the affected buffer by entry id, falling back to path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer entity was dropped; discard its stale entry.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer: clean both indices and bail out.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Find the entry in the new snapshot: by id first, then path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // Entry vanished from the snapshot: file deleted on disk.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index and queue a path-change event.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Mirror the new file metadata to downstream collaborators.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }
1061
1062 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1063 let file = File::from_dyn(buffer.read(cx).file())?;
1064
1065 let remote_id = buffer.read(cx).remote_id();
1066 if let Some(entry_id) = file.entry_id {
1067 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1068 Some(_) => {
1069 return None;
1070 }
1071 None => {
1072 self.local_buffer_ids_by_entry_id
1073 .insert(entry_id, remote_id);
1074 }
1075 }
1076 };
1077 self.local_buffer_ids_by_path.insert(
1078 ProjectPath {
1079 worktree_id: file.worktree_id(cx),
1080 path: file.path.clone(),
1081 },
1082 remote_id,
1083 );
1084
1085 Some(())
1086 }
1087
1088 fn save_buffer(
1089 &self,
1090 buffer: Entity<Buffer>,
1091 cx: &mut Context<BufferStore>,
1092 ) -> Task<Result<()>> {
1093 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1094 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1095 };
1096 let worktree = file.worktree.clone();
1097 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1098 }
1099
1100 fn save_buffer_as(
1101 &self,
1102 buffer: Entity<Buffer>,
1103 path: ProjectPath,
1104 cx: &mut Context<BufferStore>,
1105 ) -> Task<Result<()>> {
1106 let Some(worktree) = self
1107 .worktree_store
1108 .read(cx)
1109 .worktree_for_id(path.worktree_id, cx)
1110 else {
1111 return Task::ready(Err(anyhow!("no such worktree")));
1112 };
1113 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1114 }
1115
    /// Opens (or creates) a buffer for `path` within the given worktree.
    ///
    /// The entity is reserved up front so its id can double as the
    /// `BufferId`. When the file does not exist on disk, an empty buffer is
    /// created with `DiskState::New` instead of failing.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity now so the buffer id matches the entity id.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Build the text buffer off the main thread.
                let text_buffer = cx
                    .background_spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // Missing file: create an empty buffer marked new on disk.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            // Register the buffer and index it by path and entry id.
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
1183
1184 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1185 cx.spawn(|buffer_store, mut cx| async move {
1186 let buffer =
1187 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1188 buffer_store.update(&mut cx, |buffer_store, cx| {
1189 buffer_store.add_buffer(buffer.clone(), cx).log_err();
1190 })?;
1191 Ok(buffer)
1192 })
1193 }
1194
    /// Reloads each buffer from disk sequentially, collecting the resulting
    /// edits into a single `ProjectTransaction`.
    ///
    /// When `push_to_history` is false, each reload transaction is forgotten
    /// so it cannot be undone from the buffer's undo history.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    // A reload that produced no edits yields no transaction.
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.entity(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
1220}
1221
1222impl BufferStore {
    /// Registers the RPC message and request handlers that `BufferStore`
    /// entities respond to on the given proto client.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
    }
1235
1236 /// Creates a buffer store, optionally retaining its buffers.
1237 pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
1238 Self {
1239 state: BufferStoreState::Local(LocalBufferStore {
1240 local_buffer_ids_by_path: Default::default(),
1241 local_buffer_ids_by_entry_id: Default::default(),
1242 worktree_store: worktree_store.clone(),
1243 _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
1244 if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
1245 let this = this.as_local_mut().unwrap();
1246 this.subscribe_to_worktree(worktree, cx);
1247 }
1248 }),
1249 }),
1250 downstream_client: None,
1251 opened_buffers: Default::default(),
1252 shared_buffers: Default::default(),
1253 loading_buffers: Default::default(),
1254 loading_diffs: Default::default(),
1255 worktree_store,
1256 }
1257 }
1258
1259 pub fn remote(
1260 worktree_store: Entity<WorktreeStore>,
1261 upstream_client: AnyProtoClient,
1262 remote_id: u64,
1263 _cx: &mut Context<Self>,
1264 ) -> Self {
1265 Self {
1266 state: BufferStoreState::Remote(RemoteBufferStore {
1267 shared_with_me: Default::default(),
1268 loading_remote_buffers_by_id: Default::default(),
1269 remote_buffer_listeners: Default::default(),
1270 project_id: remote_id,
1271 upstream_client,
1272 worktree_store: worktree_store.clone(),
1273 }),
1274 downstream_client: None,
1275 opened_buffers: Default::default(),
1276 loading_buffers: Default::default(),
1277 loading_diffs: Default::default(),
1278 shared_buffers: Default::default(),
1279 worktree_store,
1280 }
1281 }
1282
1283 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1284 match &mut self.state {
1285 BufferStoreState::Local(state) => Some(state),
1286 _ => None,
1287 }
1288 }
1289
1290 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1291 match &mut self.state {
1292 BufferStoreState::Remote(state) => Some(state),
1293 _ => None,
1294 }
1295 }
1296
1297 fn as_remote(&self) -> Option<&RemoteBufferStore> {
1298 match &self.state {
1299 BufferStoreState::Remote(state) => Some(state),
1300 _ => None,
1301 }
1302 }
1303
    /// Opens the buffer at `project_path`, returning an existing open buffer
    /// when possible.
    ///
    /// Concurrent requests for the same path are deduplicated through
    /// `loading_buffers`: the first caller spawns the load and later callers
    /// await a shared clone of the same task.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        // Fast path: the buffer is already open.
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // Another caller already started loading this path; share its task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            // Shared futures require a cloneable error type.
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1348
    /// Opens (or returns an existing) diff of `buffer` against its staged
    /// (index) text.
    ///
    /// Concurrent requests for the same buffer are deduplicated through
    /// `loading_diffs`, keyed by `(buffer_id, DiffKind::Unstaged)`.
    pub fn open_unstaged_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        // Fast path: the diff already exists and is still alive.
        if let Some(diff) = self.get_unstaged_diff(buffer_id, cx) {
            return Task::ready(Ok(diff));
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Unstaged)) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                // Locally, load the staged text from the git index; remotely,
                // request the diff from the upstream host.
                let staged_text = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.open_unstaged_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Unstaged,
                                staged_text.await.map(DiffBasesChange::SetIndex),
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1388
    /// Opens (or returns an existing) diff of `buffer` against its committed
    /// (HEAD) text, with the staged text as the secondary base.
    ///
    /// Concurrent requests are deduplicated through `loading_diffs`, keyed by
    /// `(buffer_id, DiffKind::Uncommitted)`.
    pub fn open_uncommitted_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(diff) = self.get_uncommitted_diff(buffer_id, cx) {
            return Task::ready(Ok(diff));
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Uncommitted)) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let changes = match &self.state {
                    BufferStoreState::Local(this) => {
                        let committed_text = this.load_committed_text(&buffer, cx);
                        let staged_text = this.load_staged_text(&buffer, cx);
                        cx.background_spawn(async move {
                            let committed_text = committed_text.await?;
                            let staged_text = staged_text.await?;
                            // When index and HEAD agree, a single shared base
                            // suffices for both diffs.
                            let diff_bases_change = if committed_text == staged_text {
                                DiffBasesChange::SetBoth(committed_text)
                            } else {
                                DiffBasesChange::SetEach {
                                    index: staged_text,
                                    head: committed_text,
                                }
                            };
                            Ok(diff_bases_change)
                        })
                    }
                    BufferStoreState::Remote(this) => this.open_uncommitted_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Uncommitted,
                                changes.await,
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1444
    /// Shared tail of `open_unstaged_diff` / `open_uncommitted_diff`.
    ///
    /// Removes the buffer's entry from `loading_diffs` (on success or
    /// failure), creates the requested `BufferDiff`, wires it into the
    /// buffer's diff state, and waits for the diff bases to be applied
    /// before returning the diff entity.
    async fn open_diff_internal(
        this: WeakEntity<Self>,
        kind: DiffKind,
        texts: Result<DiffBasesChange>,
        buffer_entity: Entity<Buffer>,
        mut cx: AsyncApp,
    ) -> Result<Entity<BufferDiff>> {
        let diff_bases_change = match texts {
            Err(e) => {
                // Clear the loading entry so a later retry is possible.
                this.update(&mut cx, |this, cx| {
                    let buffer = buffer_entity.read(cx);
                    let buffer_id = buffer.remote_id();
                    this.loading_diffs.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(&mut cx, |this, cx| {
            let buffer = buffer_entity.read(cx);
            let buffer_id = buffer.remote_id();
            let language = buffer.language().cloned();
            let language_registry = buffer.language_registry();
            let text_snapshot = buffer.text_snapshot();
            this.loading_diffs.remove(&(buffer_id, kind));

            if let Some(OpenBuffer::Complete { diff_state, .. }) =
                this.opened_buffers.get_mut(&buffer_id)
            {
                diff_state.update(cx, |diff_state, cx| {
                    diff_state.language = language;
                    diff_state.language_registry = language_registry;

                    let diff = cx.new(|_| BufferDiff::new(&text_snapshot));
                    match kind {
                        DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
                        DiffKind::Uncommitted => {
                            // An uncommitted diff needs an unstaged diff as its
                            // secondary; create one if none is alive.
                            let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
                                diff
                            } else {
                                let unstaged_diff = cx.new(|_| BufferDiff::new(&text_snapshot));
                                diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
                                unstaged_diff
                            };

                            diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
                            diff_state.uncommitted_diff = Some(diff.downgrade())
                        }
                    };

                    // Returns a receiver resolved once the new bases are applied.
                    let rx = diff_state.diff_bases_changed(text_snapshot, diff_bases_change, cx);

                    Ok(async move {
                        rx.await.ok();
                        Ok(diff)
                    })
                })
            } else {
                Err(anyhow!("buffer was closed"))
            }
        })??
        .await
    }
1509
1510 pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
1511 match &self.state {
1512 BufferStoreState::Local(this) => this.create_buffer(cx),
1513 BufferStoreState::Remote(this) => this.create_buffer(cx),
1514 }
1515 }
1516
1517 pub fn save_buffer(
1518 &mut self,
1519 buffer: Entity<Buffer>,
1520 cx: &mut Context<Self>,
1521 ) -> Task<Result<()>> {
1522 match &mut self.state {
1523 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1524 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1525 }
1526 }
1527
1528 pub fn save_buffer_as(
1529 &mut self,
1530 buffer: Entity<Buffer>,
1531 path: ProjectPath,
1532 cx: &mut Context<Self>,
1533 ) -> Task<Result<()>> {
1534 let old_file = buffer.read(cx).file().cloned();
1535 let task = match &self.state {
1536 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1537 BufferStoreState::Remote(this) => {
1538 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1539 }
1540 };
1541 cx.spawn(|this, mut cx| async move {
1542 task.await?;
1543 this.update(&mut cx, |_, cx| {
1544 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1545 })
1546 })
1547 }
1548
    /// Computes a git blame for `buffer`, optionally at a specific buffer
    /// `version`.
    ///
    /// Locally, the blame runs on a background thread against the buffer's
    /// repository; `Ok(None)` means the file is not inside a git repository.
    /// Remotely, the request is forwarded to the host over RPC.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything that needs `cx` before moving to the
                // background thread.
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1610
    /// Builds a permalink URL to the selected line range of `buffer` on its
    /// git hosting provider.
    ///
    /// Locally, this resolves the repository's `origin` remote and HEAD SHA;
    /// if the file is outside a git repository, it falls back to building a
    /// permalink from Cargo registry metadata for Rust sources. Remotely, the
    /// request is forwarded to the host over RPC.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // Path of the file relative to the repository root.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1707
    /// Registers an opened buffer with the store.
    ///
    /// Creates the buffer's diff state, emits `BufferDropped` when the buffer
    /// entity is released, replays any operations queued before the buffer
    /// arrived, and subscribes to the buffer's events. Errors if a local
    /// buffer with the same id is still alive.
    fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
        let buffer = buffer_entity.read(cx);
        let language = buffer.language().cloned();
        let language_registry = buffer.language_registry();
        let remote_id = buffer.remote_id();
        // Replica 0 is the local host; any other replica is remote.
        let is_remote = buffer.replica_id() != 0;
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer_entity.downgrade(),
            diff_state: cx.new(|_| BufferDiffState {
                language,
                language_registry,
                ..Default::default()
            }),
        };

        // Notify the store when the buffer entity is dropped.
        let handle = cx.entity().downgrade();
        buffer_entity.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                // Operations that arrived before the buffer are applied now.
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
        Ok(())
    }
1758
1759 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1760 self.opened_buffers
1761 .values()
1762 .filter_map(|buffer| buffer.upgrade())
1763 }
1764
1765 pub fn loading_buffers(
1766 &self,
1767 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1768 self.loading_buffers.iter().map(|(path, task)| {
1769 let task = task.clone();
1770 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1771 })
1772 }
1773
1774 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1775 self.buffers().find_map(|buffer| {
1776 let file = File::from_dyn(buffer.read(cx).file())?;
1777 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1778 Some(buffer)
1779 } else {
1780 None
1781 }
1782 })
1783 }
1784
1785 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1786 self.opened_buffers.get(&buffer_id)?.upgrade()
1787 }
1788
1789 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1790 self.get(buffer_id)
1791 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1792 }
1793
1794 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1795 self.get(buffer_id).or_else(|| {
1796 self.as_remote()
1797 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1798 })
1799 }
1800
1801 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
1802 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1803 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
1804 } else {
1805 None
1806 }
1807 }
1808
1809 pub fn get_uncommitted_diff(
1810 &self,
1811 buffer_id: BufferId,
1812 cx: &App,
1813 ) -> Option<Entity<BufferDiff>> {
1814 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1815 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
1816 } else {
1817 None
1818 }
1819 }
1820
1821 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1822 let buffers = self
1823 .buffers()
1824 .map(|buffer| {
1825 let buffer = buffer.read(cx);
1826 proto::BufferVersion {
1827 id: buffer.remote_id().into(),
1828 version: language::proto::serialize_version(&buffer.version),
1829 }
1830 })
1831 .collect();
1832 let incomplete_buffer_ids = self
1833 .as_remote()
1834 .map(|remote| remote.incomplete_buffer_ids())
1835 .unwrap_or_default();
1836 (buffers, incomplete_buffer_ids)
1837 }
1838
    /// Transitions all buffers into a read-only, disconnected state after the
    /// connection to the host is lost.
    pub fn disconnected_from_host(&mut self, cx: &mut App) {
        // First cancel any pending waits (e.g. for missing operations)...
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }

        // ...then revoke write access, since edits can no longer be synced.
        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        if let Some(remote) = self.as_remote_mut() {
            // Wake up all futures currently waiting on a buffer to get opened,
            // to give them a chance to fail now that we've disconnected.
            remote.remote_buffer_listeners.clear()
        }
    }
1858
    /// Starts sharing buffers downstream to `downstream_client` under the
    /// given project `remote_id`.
    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1862
    /// Stops sharing: drops the downstream client and forgets which buffers
    /// had been shared with peers.
    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }
1867
1868 pub fn discard_incomplete(&mut self) {
1869 self.opened_buffers
1870 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1871 }
1872
    /// Streams buffers that may contain matches for `query`, up to `limit`.
    ///
    /// Already-open unnamed buffers are sent first (each consuming one slot of
    /// the limit); the worktree store then yields candidate paths, which are
    /// opened in batches of `MAX_CONCURRENT_BUFFER_OPENS` and forwarded on the
    /// returned channel. The search ends early if the receiver is dropped.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                // Open buffers are excluded from the on-disk scan; they will
                // be searched via their in-memory contents.
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                // Kick off all opens in this chunk before awaiting any of them.
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // Receiver dropped: the consumer stopped searching.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1927
1928 pub fn recalculate_buffer_diffs(
1929 &mut self,
1930 buffers: Vec<Entity<Buffer>>,
1931 cx: &mut Context<Self>,
1932 ) -> impl Future<Output = ()> {
1933 let mut futures = Vec::new();
1934 for buffer in buffers {
1935 if let Some(OpenBuffer::Complete { diff_state, .. }) =
1936 self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
1937 {
1938 let buffer = buffer.read(cx).text_snapshot();
1939 futures.push(diff_state.update(cx, |diff_state, cx| {
1940 diff_state.recalculate_diffs(buffer, cx)
1941 }));
1942 }
1943 }
1944 async move {
1945 futures::future::join_all(futures).await;
1946 }
1947 }
1948
    /// Reacts to events emitted by an individual buffer.
    ///
    /// - `FileHandleChanged`: lets the local store update its path/entry
    ///   indices.
    /// - `Reloaded`: forwards the new version/mtime/line ending downstream
    ///   when the project is shared.
    /// - `LanguageChanged`: keeps the buffer's diff state in sync with its
    ///   new language.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Only relevant when sharing buffers with downstream peers.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            BufferEvent::LanguageChanged => {
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(OpenBuffer::Complete { diff_state, .. }) =
                    self.opened_buffers.get(&buffer_id)
                {
                    diff_state.update(cx, |diff_state, cx| {
                        diff_state.buffer_language_changed(buffer, cx);
                    });
                }
            }
            _ => {}
        }
    }
1989
1990 pub async fn handle_update_buffer(
1991 this: Entity<Self>,
1992 envelope: TypedEnvelope<proto::UpdateBuffer>,
1993 mut cx: AsyncApp,
1994 ) -> Result<proto::Ack> {
1995 let payload = envelope.payload.clone();
1996 let buffer_id = BufferId::new(payload.buffer_id)?;
1997 let ops = payload
1998 .operations
1999 .into_iter()
2000 .map(language::proto::deserialize_operation)
2001 .collect::<Result<Vec<_>, _>>()?;
2002 this.update(&mut cx, |this, cx| {
2003 match this.opened_buffers.entry(buffer_id) {
2004 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
2005 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
2006 OpenBuffer::Complete { buffer, .. } => {
2007 if let Some(buffer) = buffer.upgrade() {
2008 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
2009 }
2010 }
2011 },
2012 hash_map::Entry::Vacant(e) => {
2013 e.insert(OpenBuffer::Operations(ops));
2014 }
2015 }
2016 Ok(proto::Ack {})
2017 })?
2018 }
2019
2020 pub fn register_shared_lsp_handle(
2021 &mut self,
2022 peer_id: proto::PeerId,
2023 buffer_id: BufferId,
2024 handle: OpenLspBufferHandle,
2025 ) {
2026 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2027 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2028 buffer.lsp_handle = Some(handle);
2029 return;
2030 }
2031 }
2032 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2033 }
2034
    /// Resynchronizes a reconnecting guest's buffers.
    ///
    /// Resets the guest's shared-buffer set, then for each buffer the guest
    /// reports: re-shares it, records its current version in the response,
    /// resends its file and saved state, and streams any operations the guest
    /// is missing (relative to the version the guest sent) in chunks.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate for this guest; only the buffers it
        // reports below are considered shared after a reconnect.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        diff: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest hasn't seen yet.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Send the missing operations in bounded chunks in the
                // background, so large histories don't block this handler.
                cx.background_spawn(
                    async move {
                        let operations = operations.await;
                        for chunk in split_operations(operations) {
                            client
                                .request(proto::UpdateBuffer {
                                    project_id,
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                })
                                .await?;
                        }
                        anyhow::Ok(())
                    }
                    .log_err(),
                )
                .detach();
            }
        }
        Ok(response)
    }
2123
2124 pub fn handle_create_buffer_for_peer(
2125 &mut self,
2126 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2127 replica_id: u16,
2128 capability: Capability,
2129 cx: &mut Context<Self>,
2130 ) -> Result<()> {
2131 let Some(remote) = self.as_remote_mut() else {
2132 return Err(anyhow!("buffer store is not a remote"));
2133 };
2134
2135 if let Some(buffer) =
2136 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2137 {
2138 self.add_buffer(buffer, cx)?;
2139 }
2140
2141 Ok(())
2142 }
2143
    /// Handles a file-metadata update for a buffer (e.g. after a rename).
    ///
    /// Updates the buffer's `File`, emits `BufferChangedFilePath` when the
    /// path actually changed, and re-forwards the message downstream when
    /// this project is itself shared.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            // Clone the payload: the original `file` is forwarded downstream below.
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    // Only surface the old file when the path changed (or
                    // there was no previous file).
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
2191
    /// Handles a guest's request to save a buffer.
    ///
    /// Waits until the buffer has caught up with the version the guest saw,
    /// performs a save (or save-as when `new_path` is provided), and replies
    /// with the resulting saved version and mtime. Fails if the project is
    /// not shared or the buffer is unknown.
    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until the buffer contains the edits the guest requested
        // to be saved.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
2232
2233 pub async fn handle_close_buffer(
2234 this: Entity<Self>,
2235 envelope: TypedEnvelope<proto::CloseBuffer>,
2236 mut cx: AsyncApp,
2237 ) -> Result<()> {
2238 let peer_id = envelope.sender_id;
2239 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2240 this.update(&mut cx, |this, _| {
2241 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2242 if shared.remove(&buffer_id).is_some() {
2243 if shared.is_empty() {
2244 this.shared_buffers.remove(&peer_id);
2245 }
2246 return;
2247 }
2248 }
2249 debug_panic!(
2250 "peer_id {} closed buffer_id {} which was either not open or already closed",
2251 peer_id,
2252 buffer_id
2253 )
2254 })
2255 }
2256
2257 pub async fn handle_buffer_saved(
2258 this: Entity<Self>,
2259 envelope: TypedEnvelope<proto::BufferSaved>,
2260 mut cx: AsyncApp,
2261 ) -> Result<()> {
2262 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2263 let version = deserialize_version(&envelope.payload.version);
2264 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2265 this.update(&mut cx, move |this, cx| {
2266 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2267 buffer.update(cx, |buffer, cx| {
2268 buffer.did_save(version, mtime, cx);
2269 });
2270 }
2271
2272 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2273 downstream_client
2274 .send(proto::BufferSaved {
2275 project_id: *project_id,
2276 buffer_id: buffer_id.into(),
2277 mtime: envelope.payload.mtime,
2278 version: envelope.payload.version,
2279 })
2280 .log_err();
2281 }
2282 })
2283 }
2284
2285 pub async fn handle_buffer_reloaded(
2286 this: Entity<Self>,
2287 envelope: TypedEnvelope<proto::BufferReloaded>,
2288 mut cx: AsyncApp,
2289 ) -> Result<()> {
2290 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2291 let version = deserialize_version(&envelope.payload.version);
2292 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2293 let line_ending = deserialize_line_ending(
2294 proto::LineEnding::from_i32(envelope.payload.line_ending)
2295 .ok_or_else(|| anyhow!("missing line ending"))?,
2296 );
2297 this.update(&mut cx, |this, cx| {
2298 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2299 buffer.update(cx, |buffer, cx| {
2300 buffer.did_reload(version, line_ending, mtime, cx);
2301 });
2302 }
2303
2304 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2305 downstream_client
2306 .send(proto::BufferReloaded {
2307 project_id: *project_id,
2308 buffer_id: buffer_id.into(),
2309 mtime: envelope.payload.mtime,
2310 version: envelope.payload.version,
2311 line_ending: envelope.payload.line_ending,
2312 })
2313 .log_err();
2314 }
2315 })
2316 }
2317
2318 pub async fn handle_blame_buffer(
2319 this: Entity<Self>,
2320 envelope: TypedEnvelope<proto::BlameBuffer>,
2321 mut cx: AsyncApp,
2322 ) -> Result<proto::BlameBufferResponse> {
2323 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2324 let version = deserialize_version(&envelope.payload.version);
2325 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2326 buffer
2327 .update(&mut cx, |buffer, _| {
2328 buffer.wait_for_version(version.clone())
2329 })?
2330 .await?;
2331 let blame = this
2332 .update(&mut cx, |this, cx| {
2333 this.blame_buffer(&buffer, Some(version), cx)
2334 })?
2335 .await?;
2336 Ok(serialize_blame_buffer_response(blame))
2337 }
2338
2339 pub async fn handle_get_permalink_to_line(
2340 this: Entity<Self>,
2341 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2342 mut cx: AsyncApp,
2343 ) -> Result<proto::GetPermalinkToLineResponse> {
2344 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2345 // let version = deserialize_version(&envelope.payload.version);
2346 let selection = {
2347 let proto_selection = envelope
2348 .payload
2349 .selection
2350 .context("no selection to get permalink for defined")?;
2351 proto_selection.start as u32..proto_selection.end as u32
2352 };
2353 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2354 let permalink = this
2355 .update(&mut cx, |this, cx| {
2356 this.get_permalink_to_line(&buffer, selection, cx)
2357 })?
2358 .await?;
2359 Ok(proto::GetPermalinkToLineResponse {
2360 permalink: permalink.to_string(),
2361 })
2362 }
2363
2364 pub async fn handle_open_unstaged_diff(
2365 this: Entity<Self>,
2366 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2367 mut cx: AsyncApp,
2368 ) -> Result<proto::OpenUnstagedDiffResponse> {
2369 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2370 let diff = this
2371 .update(&mut cx, |this, cx| {
2372 let buffer = this.get(buffer_id)?;
2373 Some(this.open_unstaged_diff(buffer, cx))
2374 })?
2375 .ok_or_else(|| anyhow!("no such buffer"))?
2376 .await?;
2377 this.update(&mut cx, |this, _| {
2378 let shared_buffers = this
2379 .shared_buffers
2380 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2381 .or_default();
2382 debug_assert!(shared_buffers.contains_key(&buffer_id));
2383 if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2384 shared.diff = Some(diff.clone());
2385 }
2386 })?;
2387 let staged_text =
2388 diff.read_with(&cx, |diff, _| diff.base_text().map(|buffer| buffer.text()))?;
2389 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2390 }
2391
    /// Message handler: open (or retrieve) the uncommitted diff for a shared
    /// buffer and reply with its base texts.
    ///
    /// The response avoids sending the staged text redundantly: when the
    /// index's base buffer is the same as HEAD's, only the committed text is
    /// sent, tagged `IndexMatchesHead`.
    pub async fn handle_open_uncommitted_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUncommittedDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUncommittedDiffResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let diff = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_uncommitted_diff(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        // Hold the diff on the peer's shared-buffer entry so it stays alive
        // while the peer has the buffer open.
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.diff = Some(diff.clone());
            }
        })?;
        diff.read_with(&cx, |diff, cx| {
            use proto::open_uncommitted_diff_response::Mode;

            // Base text of the secondary (index vs. working copy) diff.
            let staged_buffer = diff
                .secondary_diff()
                .and_then(|diff| diff.read(cx).base_text());

            let mode;
            let staged_text;
            let committed_text;
            if let Some(committed_buffer) = diff.base_text() {
                committed_text = Some(committed_buffer.text());
                if let Some(staged_buffer) = staged_buffer {
                    if staged_buffer.remote_id() == committed_buffer.remote_id() {
                        // Index and HEAD share one base buffer: the receiver can
                        // reuse the committed text as the staged text.
                        mode = Mode::IndexMatchesHead;
                        staged_text = None;
                    } else {
                        mode = Mode::IndexAndHead;
                        staged_text = Some(staged_buffer.text());
                    }
                } else {
                    mode = Mode::IndexAndHead;
                    staged_text = None;
                }
            } else {
                mode = Mode::IndexAndHead;
                committed_text = None;
                staged_text = staged_buffer.as_ref().map(|buffer| buffer.text());
            }

            proto::OpenUncommittedDiffResponse {
                committed_text,
                staged_text,
                mode: mode.into(),
            }
        })
    }
2452
2453 pub async fn handle_update_diff_bases(
2454 this: Entity<Self>,
2455 request: TypedEnvelope<proto::UpdateDiffBases>,
2456 mut cx: AsyncApp,
2457 ) -> Result<()> {
2458 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2459 this.update(&mut cx, |this, cx| {
2460 if let Some(OpenBuffer::Complete { diff_state, buffer }) =
2461 this.opened_buffers.get_mut(&buffer_id)
2462 {
2463 if let Some(buffer) = buffer.upgrade() {
2464 let buffer = buffer.read(cx).text_snapshot();
2465 diff_state.update(cx, |diff_state, cx| {
2466 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2467 })
2468 }
2469 }
2470 })
2471 }
2472
2473 pub fn reload_buffers(
2474 &self,
2475 buffers: HashSet<Entity<Buffer>>,
2476 push_to_history: bool,
2477 cx: &mut Context<Self>,
2478 ) -> Task<Result<ProjectTransaction>> {
2479 if buffers.is_empty() {
2480 return Task::ready(Ok(ProjectTransaction::default()));
2481 }
2482 match &self.state {
2483 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2484 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2485 }
2486 }
2487
2488 async fn handle_reload_buffers(
2489 this: Entity<Self>,
2490 envelope: TypedEnvelope<proto::ReloadBuffers>,
2491 mut cx: AsyncApp,
2492 ) -> Result<proto::ReloadBuffersResponse> {
2493 let sender_id = envelope.original_sender_id().unwrap_or_default();
2494 let reload = this.update(&mut cx, |this, cx| {
2495 let mut buffers = HashSet::default();
2496 for buffer_id in &envelope.payload.buffer_ids {
2497 let buffer_id = BufferId::new(*buffer_id)?;
2498 buffers.insert(this.get_existing(buffer_id)?);
2499 }
2500 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2501 })??;
2502
2503 let project_transaction = reload.await?;
2504 let project_transaction = this.update(&mut cx, |this, cx| {
2505 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2506 })?;
2507 Ok(proto::ReloadBuffersResponse {
2508 transaction: Some(project_transaction),
2509 })
2510 }
2511
    /// Replicate a buffer to `peer_id` if it hasn't been sent already.
    ///
    /// The buffer is recorded in `shared_buffers` synchronously, so repeated
    /// calls for the same buffer/peer are de-duplicated. The actual data is
    /// sent asynchronously: first the buffer's state, then its serialized
    /// operations split into chunks, all as `CreateBufferForPeer` messages.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        // Already replicated (or replication already in flight).
        if shared_buffers.contains_key(&buffer_id) {
            return Task::ready(Ok(()));
        }
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                diff: None,
                lsp_handle: None,
            },
        );

        // Nothing to send when we have no downstream client.
        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before the task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream the operation history if the initial state was sent
            // successfully; the last chunk is flagged so the receiver knows
            // when the buffer is complete.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_spawn(async move {
                    let mut chunks = split_operations(operations).peekable();
                    while let Some(chunk) = chunks.next() {
                        let is_last = chunks.peek().is_none();
                        client.send(proto::CreateBufferForPeer {
                            project_id,
                            peer_id: Some(peer_id),
                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                proto::BufferChunk {
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                    is_last,
                                },
                            )),
                        })?;
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }
            Ok(())
        })
    }
2577
    /// Drop all records of buffers shared with peers.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2581
    /// Drop all records of buffers shared with the given peer.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2585
2586 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2587 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2588 self.shared_buffers.insert(new_peer_id, buffers);
2589 }
2590 }
2591
    /// Whether any buffers are currently shared with any peer.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2595
2596 pub fn create_local_buffer(
2597 &mut self,
2598 text: &str,
2599 language: Option<Arc<Language>>,
2600 cx: &mut Context<Self>,
2601 ) -> Entity<Buffer> {
2602 let buffer = cx.new(|cx| {
2603 Buffer::local(text, cx)
2604 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
2605 });
2606
2607 self.add_buffer(buffer.clone(), cx).log_err();
2608 let buffer_id = buffer.read(cx).remote_id();
2609
2610 let this = self
2611 .as_local_mut()
2612 .expect("local-only method called in a non-local context");
2613 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2614 this.local_buffer_ids_by_path.insert(
2615 ProjectPath {
2616 worktree_id: file.worktree_id(cx),
2617 path: file.path.clone(),
2618 },
2619 buffer_id,
2620 );
2621
2622 if let Some(entry_id) = file.entry_id {
2623 this.local_buffer_ids_by_entry_id
2624 .insert(entry_id, buffer_id);
2625 }
2626 }
2627 buffer
2628 }
2629
2630 pub fn deserialize_project_transaction(
2631 &mut self,
2632 message: proto::ProjectTransaction,
2633 push_to_history: bool,
2634 cx: &mut Context<Self>,
2635 ) -> Task<Result<ProjectTransaction>> {
2636 if let Some(this) = self.as_remote_mut() {
2637 this.deserialize_project_transaction(message, push_to_history, cx)
2638 } else {
2639 debug_panic!("not a remote buffer store");
2640 Task::ready(Err(anyhow!("not a remote buffer store")))
2641 }
2642 }
2643
2644 pub fn wait_for_remote_buffer(
2645 &mut self,
2646 id: BufferId,
2647 cx: &mut Context<BufferStore>,
2648 ) -> Task<Result<Entity<Buffer>>> {
2649 if let Some(this) = self.as_remote_mut() {
2650 this.wait_for_remote_buffer(id, cx)
2651 } else {
2652 debug_panic!("not a remote buffer store");
2653 Task::ready(Err(anyhow!("not a remote buffer store")))
2654 }
2655 }
2656
2657 pub fn serialize_project_transaction_for_peer(
2658 &mut self,
2659 project_transaction: ProjectTransaction,
2660 peer_id: proto::PeerId,
2661 cx: &mut Context<Self>,
2662 ) -> proto::ProjectTransaction {
2663 let mut serialized_transaction = proto::ProjectTransaction {
2664 buffer_ids: Default::default(),
2665 transactions: Default::default(),
2666 };
2667 for (buffer, transaction) in project_transaction.0 {
2668 self.create_buffer_for_peer(&buffer, peer_id, cx)
2669 .detach_and_log_err(cx);
2670 serialized_transaction
2671 .buffer_ids
2672 .push(buffer.read(cx).remote_id().into());
2673 serialized_transaction
2674 .transactions
2675 .push(language::proto::serialize_transaction(&transaction));
2676 }
2677 serialized_transaction
2678 }
2679}
2680
2681impl OpenBuffer {
2682 fn upgrade(&self) -> Option<Entity<Buffer>> {
2683 match self {
2684 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2685 OpenBuffer::Operations(_) => None,
2686 }
2687 }
2688}
2689
2690fn is_not_found_error(error: &anyhow::Error) -> bool {
2691 error
2692 .root_cause()
2693 .downcast_ref::<io::Error>()
2694 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2695}
2696
2697fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2698 let Some(blame) = blame else {
2699 return proto::BlameBufferResponse {
2700 blame_response: None,
2701 };
2702 };
2703
2704 let entries = blame
2705 .entries
2706 .into_iter()
2707 .map(|entry| proto::BlameEntry {
2708 sha: entry.sha.as_bytes().into(),
2709 start_line: entry.range.start,
2710 end_line: entry.range.end,
2711 original_line_number: entry.original_line_number,
2712 author: entry.author.clone(),
2713 author_mail: entry.author_mail.clone(),
2714 author_time: entry.author_time,
2715 author_tz: entry.author_tz.clone(),
2716 committer: entry.committer_name.clone(),
2717 committer_mail: entry.committer_email.clone(),
2718 committer_time: entry.committer_time,
2719 committer_tz: entry.committer_tz.clone(),
2720 summary: entry.summary.clone(),
2721 previous: entry.previous.clone(),
2722 filename: entry.filename.clone(),
2723 })
2724 .collect::<Vec<_>>();
2725
2726 let messages = blame
2727 .messages
2728 .into_iter()
2729 .map(|(oid, message)| proto::CommitMessage {
2730 oid: oid.as_bytes().into(),
2731 message,
2732 })
2733 .collect::<Vec<_>>();
2734
2735 let permalinks = blame
2736 .permalinks
2737 .into_iter()
2738 .map(|(oid, url)| proto::CommitPermalink {
2739 oid: oid.as_bytes().into(),
2740 permalink: url.to_string(),
2741 })
2742 .collect::<Vec<_>>();
2743
2744 proto::BlameBufferResponse {
2745 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2746 entries,
2747 messages,
2748 permalinks,
2749 remote_url: blame.remote_url,
2750 }),
2751 }
2752}
2753
2754fn deserialize_blame_buffer_response(
2755 response: proto::BlameBufferResponse,
2756) -> Option<git::blame::Blame> {
2757 let response = response.blame_response?;
2758 let entries = response
2759 .entries
2760 .into_iter()
2761 .filter_map(|entry| {
2762 Some(git::blame::BlameEntry {
2763 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2764 range: entry.start_line..entry.end_line,
2765 original_line_number: entry.original_line_number,
2766 committer_name: entry.committer,
2767 committer_time: entry.committer_time,
2768 committer_tz: entry.committer_tz,
2769 committer_email: entry.committer_mail,
2770 author: entry.author,
2771 author_mail: entry.author_mail,
2772 author_time: entry.author_time,
2773 author_tz: entry.author_tz,
2774 summary: entry.summary,
2775 previous: entry.previous,
2776 filename: entry.filename,
2777 })
2778 })
2779 .collect::<Vec<_>>();
2780
2781 let messages = response
2782 .messages
2783 .into_iter()
2784 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2785 .collect::<HashMap<_, _>>();
2786
2787 let permalinks = response
2788 .permalinks
2789 .into_iter()
2790 .filter_map(|permalink| {
2791 Some((
2792 git::Oid::from_bytes(&permalink.oid).ok()?,
2793 Url::from_str(&permalink.permalink).ok()?,
2794 ))
2795 })
2796 .collect::<HashMap<_, _>>();
2797
2798 Some(Blame {
2799 entries,
2800 permalinks,
2801 messages,
2802 remote_url: response.remote_url,
2803 })
2804}
2805
2806fn get_permalink_in_rust_registry_src(
2807 provider_registry: Arc<GitHostingProviderRegistry>,
2808 path: PathBuf,
2809 selection: Range<u32>,
2810) -> Result<url::Url> {
2811 #[derive(Deserialize)]
2812 struct CargoVcsGit {
2813 sha1: String,
2814 }
2815
2816 #[derive(Deserialize)]
2817 struct CargoVcsInfo {
2818 git: CargoVcsGit,
2819 path_in_vcs: String,
2820 }
2821
2822 #[derive(Deserialize)]
2823 struct CargoPackage {
2824 repository: String,
2825 }
2826
2827 #[derive(Deserialize)]
2828 struct CargoToml {
2829 package: CargoPackage,
2830 }
2831
2832 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
2833 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
2834 Some((dir, json))
2835 }) else {
2836 bail!("No .cargo_vcs_info.json found in parent directories")
2837 };
2838 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
2839 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
2840 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
2841 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
2842 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
2843 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
2844 let permalink = provider.build_permalink(
2845 remote,
2846 BuildPermalinkParams {
2847 sha: &cargo_vcs_info.git.sha1,
2848 path: &path.to_string_lossy(),
2849 selection: Some(selection),
2850 },
2851 );
2852 Ok(permalink)
2853}