1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use buffer_diff::{BufferDiff, BufferDiffEvent};
10use client::Client;
11use collections::{hash_map, HashMap, HashSet};
12use fs::Fs;
13use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
14use git::{blame::Blame, repository::RepoPath};
15use gpui::{
16 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
17};
18use http_client::Url;
19use language::{
20 proto::{
21 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
22 split_operations,
23 },
24 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
25};
26use rpc::{
27 proto::{self, ToProto},
28 AnyProtoClient, ErrorExt as _, TypedEnvelope,
29};
30use serde::Deserialize;
31use smol::channel::Receiver;
32use std::{
33 io,
34 ops::Range,
35 path::{Path, PathBuf},
36 pin::pin,
37 str::FromStr as _,
38 sync::Arc,
39 time::Instant,
40};
41use text::BufferId;
42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
44
/// Which git diff a [`BufferDiff`] tracks for a buffer.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    /// Working copy vs. the git index (staged text).
    Unstaged,
    /// Working copy vs. HEAD (committed text).
    Uncommitted,
}
50
/// A set of open buffers.
pub struct BufferStore {
    // Local- or remote-specific behavior for this store.
    state: BufferStoreState,
    // In-flight buffer loads keyed by project path; the `Shared` future lets
    // concurrent opens of the same path await one task.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    // In-flight diff loads keyed by buffer id and diff kind.
    #[allow(clippy::type_complexity)]
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
    worktree_store: Entity<WorktreeStore>,
    // All buffers this store knows about, by id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    // Client and project id to forward updates to when shared downstream.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // Buffers that have been shared with each remote peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
64
/// Handles retained on behalf of a remote peer for one shared buffer: the
/// buffer itself plus optional diff and LSP handles kept alive for it.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    diff: Option<Entity<BufferDiff>>,
    lsp_handle: Option<OpenLspBufferHandle>,
}
71
/// Per-buffer git diff state: the base texts and the (weakly held) diff
/// entities computed against them.
#[derive(Default)]
struct BufferDiffState {
    // Diff vs. the index; weak so this state doesn't keep the diff alive.
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    // Diff vs. HEAD; weak for the same reason.
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    // Only the most recent recalculation task is retained.
    recalculate_diff_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    // Senders completed when the current recalculation finishes.
    diff_updated_futures: Vec<oneshot::Sender<()>>,

    // File text at HEAD. When the index matches HEAD, this holds the *same*
    // `Arc` as `index_text` (see `DiffBasesChange::SetBoth` handling), which
    // `recalculate_diffs` detects via pointer equality.
    head_text: Option<Arc<String>>,
    // File text in the git index (staged).
    index_text: Option<Arc<String>>,
    // Dirty flags consumed (and cleared) by the recalculation task.
    head_changed: bool,
    index_changed: bool,
    language_changed: bool,
}
87
/// Describes how a buffer's git base texts (index and/or HEAD) changed.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Only the index (staged) text changed.
    SetIndex(Option<String>),
    /// Only the HEAD (committed) text changed.
    SetHead(Option<String>),
    /// Both changed, to potentially different values.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Both changed and the index matches HEAD; one text serves as both.
    SetBoth(Option<String>),
}
98
99impl BufferDiffState {
    /// Records the buffer's new language and schedules a diff recalculation
    /// so the diffs are rebuilt with the new language.
    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
        self.language = buffer.read(cx).language().cloned();
        self.language_changed = true;
        // Receiver intentionally dropped: callers don't await this update.
        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
    }
105
106 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
107 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
108 }
109
110 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
111 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
112 }
113
114 fn handle_base_texts_updated(
115 &mut self,
116 buffer: text::BufferSnapshot,
117 message: proto::UpdateDiffBases,
118 cx: &mut Context<Self>,
119 ) {
120 use proto::update_diff_bases::Mode;
121
122 let Some(mode) = Mode::from_i32(message.mode) else {
123 return;
124 };
125
126 let diff_bases_change = match mode {
127 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
128 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
129 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
130 Mode::IndexAndHead => DiffBasesChange::SetEach {
131 index: message.staged_text,
132 head: message.committed_text,
133 },
134 };
135
136 let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
137 }
138
139 fn diff_bases_changed(
140 &mut self,
141 buffer: text::BufferSnapshot,
142 diff_bases_change: DiffBasesChange,
143 cx: &mut Context<Self>,
144 ) -> oneshot::Receiver<()> {
145 match diff_bases_change {
146 DiffBasesChange::SetIndex(index) => {
147 self.index_text = index.map(|mut index| {
148 text::LineEnding::normalize(&mut index);
149 Arc::new(index)
150 });
151 self.index_changed = true;
152 }
153 DiffBasesChange::SetHead(head) => {
154 self.head_text = head.map(|mut head| {
155 text::LineEnding::normalize(&mut head);
156 Arc::new(head)
157 });
158 self.head_changed = true;
159 }
160 DiffBasesChange::SetBoth(text) => {
161 let text = text.map(|mut text| {
162 text::LineEnding::normalize(&mut text);
163 Arc::new(text)
164 });
165 self.head_text = text.clone();
166 self.index_text = text;
167 self.head_changed = true;
168 self.index_changed = true;
169 }
170 DiffBasesChange::SetEach { index, head } => {
171 self.index_text = index.map(|mut index| {
172 text::LineEnding::normalize(&mut index);
173 Arc::new(index)
174 });
175 self.index_changed = true;
176 self.head_text = head.map(|mut head| {
177 text::LineEnding::normalize(&mut head);
178 Arc::new(head)
179 });
180 self.head_changed = true;
181 }
182 }
183
184 self.recalculate_diffs(buffer, cx)
185 }
186
    /// Recomputes the unstaged and/or uncommitted diffs against the current
    /// base texts on a spawned task, emitting `BufferDiffEvent`s as needed.
    ///
    /// Returns a receiver that resolves when this (and any concurrently
    /// requested) recalculation finishes; dropping the receiver is fine.
    fn recalculate_diffs(
        &mut self,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        log::debug!("recalculate diffs");
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);

        // Snapshot all inputs so the spawned task doesn't borrow `self`.
        let language = self.language.clone();
        let language_registry = self.language_registry.clone();
        let unstaged_diff = self.unstaged_diff();
        let uncommitted_diff = self.uncommitted_diff();
        let head = self.head_text.clone();
        let index = self.index_text.clone();
        let index_changed = self.index_changed;
        let head_changed = self.head_changed;
        let language_changed = self.language_changed;
        // Pointer equality suffices: when index matches HEAD, both fields
        // hold the same `Arc` (see `DiffBasesChange::SetBoth` handling).
        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
            (None, None) => true,
            _ => false,
        };
        // Only one recalculation task is kept; a newer request replaces any
        // previously stored task.
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            // 1. Update the unstaged diff (vs. index), if it's open.
            let mut unstaged_changed_range = None;
            if let Some(unstaged_diff) = &unstaged_diff {
                unstaged_changed_range = BufferDiff::update_diff(
                    unstaged_diff.clone(),
                    buffer.clone(),
                    index,
                    index_changed,
                    language_changed,
                    language.clone(),
                    language_registry.clone(),
                    &mut cx,
                )
                .await?;

                unstaged_diff.update(&mut cx, |_, cx| {
                    if language_changed {
                        cx.emit(BufferDiffEvent::LanguageChanged);
                    }
                    if let Some(changed_range) = unstaged_changed_range.clone() {
                        cx.emit(BufferDiffEvent::DiffChanged {
                            changed_range: Some(changed_range),
                        })
                    }
                })?;
            }

            // 2. Update the uncommitted diff (vs. HEAD), if it's open.
            if let Some(uncommitted_diff) = &uncommitted_diff {
                // When the index matches HEAD, the uncommitted diff can be
                // copied from the freshly computed unstaged diff instead of
                // being recomputed from scratch.
                let uncommitted_changed_range =
                    if let (Some(unstaged_diff), true) = (&unstaged_diff, index_matches_head) {
                        uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
                            uncommitted_diff.update_diff_from(&buffer, unstaged_diff, cx)
                        })?
                    } else {
                        BufferDiff::update_diff(
                            uncommitted_diff.clone(),
                            buffer.clone(),
                            head,
                            head_changed,
                            language_changed,
                            language.clone(),
                            language_registry.clone(),
                            &mut cx,
                        )
                        .await?
                    };

                uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
                    if language_changed {
                        cx.emit(BufferDiffEvent::LanguageChanged);
                    }
                    // Merge the unstaged and uncommitted changed ranges into
                    // one range covering everything that changed.
                    let changed_range = match (unstaged_changed_range, uncommitted_changed_range) {
                        (None, None) => None,
                        (Some(unstaged_range), None) => {
                            uncommitted_diff.range_to_hunk_range(unstaged_range, &buffer, cx)
                        }
                        (None, Some(uncommitted_range)) => Some(uncommitted_range),
                        (Some(unstaged_range), Some(uncommitted_range)) => {
                            let mut start = uncommitted_range.start;
                            let mut end = uncommitted_range.end;
                            if let Some(unstaged_range) =
                                uncommitted_diff.range_to_hunk_range(unstaged_range, &buffer, cx)
                            {
                                start = unstaged_range.start.min(&uncommitted_range.start, &buffer);
                                end = unstaged_range.end.max(&uncommitted_range.end, &buffer);
                            }
                            Some(start..end)
                        }
                    };
                    cx.emit(BufferDiffEvent::DiffChanged { changed_range });
                })?;
            }

            // 3. Clear the dirty flags and wake everyone awaiting the update.
            if let Some(this) = this.upgrade() {
                this.update(&mut cx, |this, _| {
                    this.index_changed = false;
                    this.head_changed = false;
                    this.language_changed = false;
                    for tx in this.diff_updated_futures.drain(..) {
                        tx.send(()).ok();
                    }
                })?;
            }

            Ok(())
        }));

        rx
    }
299}
300
/// Whether this store manages local files or replicas of a remote project.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
305
/// Buffer-store behavior when the project lives on a remote (upstream) peer.
struct RemoteBufferStore {
    // Buffers sent by collab peers, retained to avoid release races.
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    // Buffers whose initial state arrived but whose operation chunks are
    // still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    // Callers waiting for a buffer to finish replicating, by buffer id.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
315
/// Buffer-store behavior when the project's files live on the local disk.
struct LocalBufferStore {
    // Buffer ids indexed by project path, for lookups on file events.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    // Buffer ids indexed by worktree entry id.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    // Keeps the worktree-store event subscription alive.
    _subscription: Subscription,
}
322
/// A buffer's entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully-loaded buffer (held weakly) plus its git diff state.
    Complete {
        buffer: WeakEntity<Buffer>,
        diff_state: Entity<BufferDiffState>,
    },
    /// Operations held for a buffer that is not yet complete.
    Operations(Vec<Operation>),
}
330
/// Events emitted by [`BufferStore`].
pub enum BufferStoreEvent {
    BufferAdded(Entity<Buffer>),
    BufferDropped(BufferId),
    /// The buffer's associated file changed; `old_file` is the previous one.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
339
/// A set of buffer edits — one transaction per buffer — produced by a
/// project-wide operation.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
342
// `BufferStore` broadcasts `BufferStoreEvent`s to its subscribers.
impl EventEmitter<BufferStoreEvent> for BufferStore {}
344
345impl RemoteBufferStore {
346 fn open_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
347 let project_id = self.project_id;
348 let client = self.upstream_client.clone();
349 cx.background_spawn(async move {
350 let response = client
351 .request(proto::OpenUnstagedDiff {
352 project_id,
353 buffer_id: buffer_id.to_proto(),
354 })
355 .await?;
356 Ok(response.staged_text)
357 })
358 }
359
    /// Requests both base texts (index and HEAD) for a buffer from the
    /// upstream peer, collapsing the response into a [`DiffBasesChange`].
    fn open_uncommitted_diff(
        &self,
        buffer_id: BufferId,
        cx: &App,
    ) -> Task<Result<DiffBasesChange>> {
        use proto::open_uncommitted_diff_response::Mode;

        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.background_spawn(async move {
            let response = client
                .request(proto::OpenUncommittedDiff {
                    project_id,
                    buffer_id: buffer_id.to_proto(),
                })
                .await?;
            let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
            let bases = match mode {
                // Index matches HEAD: one text serves as both bases.
                Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
                Mode::IndexAndHead => DiffBasesChange::SetEach {
                    head: response.committed_text,
                    index: response.staged_text,
                },
            };
            Ok(bases)
        })
    }
387
    /// Returns a task that resolves once the buffer with `id` has been fully
    /// replicated from the upstream peer.
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        // Register the listener *before* checking, so a buffer that completes
        // between the check and the await is not missed.
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            // Fast path: the buffer may already be open.
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            cx.background_spawn(async move { rx.await? }).await
        })
    }
408
409 fn save_remote_buffer(
410 &self,
411 buffer_handle: Entity<Buffer>,
412 new_path: Option<proto::ProjectPath>,
413 cx: &Context<BufferStore>,
414 ) -> Task<Result<()>> {
415 let buffer = buffer_handle.read(cx);
416 let buffer_id = buffer.remote_id().into();
417 let version = buffer.version();
418 let rpc = self.upstream_client.clone();
419 let project_id = self.project_id;
420 cx.spawn(move |_, mut cx| async move {
421 let response = rpc
422 .request(proto::SaveBuffer {
423 project_id,
424 buffer_id,
425 new_path,
426 version: serialize_version(&version),
427 })
428 .await?;
429 let version = deserialize_version(&response.version);
430 let mtime = response.mtime.map(|mtime| mtime.into());
431
432 buffer_handle.update(&mut cx, |buffer, cx| {
433 buffer.did_save(version.clone(), mtime, cx);
434 })?;
435
436 Ok(())
437 })
438 }
439
    /// Handles one message of the buffer-replication stream: either a
    /// buffer's initial `State` or a follow-up operation `Chunk`.
    ///
    /// Returns the buffer once its final chunk has been applied, and
    /// `Ok(None)` while replication is still in progress.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut Context<BufferStore>,
    ) -> Result<Option<Entity<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    // Resolve the buffer's file against our worktrees, if any.
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Propagate the failure to every waiting caller.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                // Deserialize and apply this chunk's operations.
                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // Abandon the partially-replicated buffer and fail waiters.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    // Wake every caller waiting on this buffer.
                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }
534
535 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
536 self.loading_remote_buffers_by_id
537 .keys()
538 .copied()
539 .collect::<Vec<_>>()
540 }
541
    /// Reconstructs a [`ProjectTransaction`] from its wire format, waiting
    /// for each referenced buffer (and the edits each transaction mentions)
    /// to be replicated first.
    ///
    /// When `push_to_history` is set, each transaction is also pushed onto
    /// its buffer's history.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            // Ensure every edit referenced by each transaction has arrived.
            for (buffer, transaction) in &project_transaction.0 {
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }
579
580 fn open_buffer(
581 &self,
582 path: Arc<Path>,
583 worktree: Entity<Worktree>,
584 cx: &mut Context<BufferStore>,
585 ) -> Task<Result<Entity<Buffer>>> {
586 let worktree_id = worktree.read(cx).id().to_proto();
587 let project_id = self.project_id;
588 let client = self.upstream_client.clone();
589 cx.spawn(move |this, mut cx| async move {
590 let response = client
591 .request(proto::OpenBufferByPath {
592 project_id,
593 worktree_id,
594 path: path.to_proto(),
595 })
596 .await?;
597 let buffer_id = BufferId::new(response.buffer_id)?;
598
599 let buffer = this
600 .update(&mut cx, {
601 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
602 })?
603 .await?;
604
605 Ok(buffer)
606 })
607 }
608
609 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
610 let create = self.upstream_client.request(proto::OpenNewBuffer {
611 project_id: self.project_id,
612 });
613 cx.spawn(|this, mut cx| async move {
614 let response = create.await?;
615 let buffer_id = BufferId::new(response.buffer_id)?;
616
617 this.update(&mut cx, |this, cx| {
618 this.wait_for_remote_buffer(buffer_id, cx)
619 })?
620 .await
621 })
622 }
623
624 fn reload_buffers(
625 &self,
626 buffers: HashSet<Entity<Buffer>>,
627 push_to_history: bool,
628 cx: &mut Context<BufferStore>,
629 ) -> Task<Result<ProjectTransaction>> {
630 let request = self.upstream_client.request(proto::ReloadBuffers {
631 project_id: self.project_id,
632 buffer_ids: buffers
633 .iter()
634 .map(|buffer| buffer.read(cx).remote_id().to_proto())
635 .collect(),
636 });
637
638 cx.spawn(|this, mut cx| async move {
639 let response = request
640 .await?
641 .transaction
642 .ok_or_else(|| anyhow!("missing transaction"))?;
643 this.update(&mut cx, |this, cx| {
644 this.deserialize_project_transaction(response, push_to_history, cx)
645 })?
646 .await
647 })
648 }
649}
650
651impl LocalBufferStore {
652 fn worktree_for_buffer(
653 &self,
654 buffer: &Entity<Buffer>,
655 cx: &App,
656 ) -> Option<(Entity<Worktree>, Arc<Path>)> {
657 let file = buffer.read(cx).file()?;
658 let worktree_id = file.worktree_id(cx);
659 let path = file.path().clone();
660 let worktree = self
661 .worktree_store
662 .read(cx)
663 .worktree_for_id(worktree_id, cx)?;
664 Some((worktree, path))
665 }
666
667 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
668 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
669 worktree.read(cx).load_staged_file(path.as_ref(), cx)
670 } else {
671 return Task::ready(Err(anyhow!("no such worktree")));
672 }
673 }
674
675 fn load_committed_text(
676 &self,
677 buffer: &Entity<Buffer>,
678 cx: &App,
679 ) -> Task<Result<Option<String>>> {
680 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
681 worktree.read(cx).load_committed_file(path.as_ref(), cx)
682 } else {
683 Task::ready(Err(anyhow!("no such worktree")))
684 }
685 }
686
    /// Writes the buffer's contents to `path` in `worktree`, then notifies
    /// any downstream client and the buffer itself of the save.
    ///
    /// `has_changed_file` is forced on when the file has never existed on
    /// disk, so the new file metadata is propagated too.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Snapshot everything the async portion needs up front.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Forward the save (and file change, if any) downstream; send
            // failures are logged rather than failing the save.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            // Finally, record the save on the buffer itself.
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
744
    /// Subscribes to a worktree's events, reacting to entry and git
    /// repository changes for local worktrees only.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        Self::local_worktree_git_repos_changed(
                            this,
                            worktree.clone(),
                            updated_repos,
                            cx,
                        )
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }
770
771 fn local_worktree_entries_changed(
772 this: &mut BufferStore,
773 worktree_handle: &Entity<Worktree>,
774 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
775 cx: &mut Context<BufferStore>,
776 ) {
777 let snapshot = worktree_handle.read(cx).snapshot();
778 for (path, entry_id, _) in changes {
779 Self::local_worktree_entry_changed(
780 this,
781 *entry_id,
782 path,
783 worktree_handle,
784 &snapshot,
785 cx,
786 );
787 }
788 }
789
    /// Responds to git repository changes in a local worktree by reloading
    /// the affected buffers' base texts (index and/or HEAD) in the background
    /// and recalculating their diffs.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Collect, for each open buffer under a changed repository, the base
        // texts it currently tracks. A `None` entry means that diff kind
        // isn't open for the buffer and needn't be reloaded.
        let mut diff_state_updates = Vec::new();
        for buffer in this.opened_buffers.values() {
            let OpenBuffer::Complete { buffer, diff_state } = buffer else {
                continue;
            };
            let Some(buffer) = buffer.upgrade() else {
                continue;
            };
            let buffer = buffer.read(cx);
            let Some(file) = File::from_dyn(buffer.file()) else {
                continue;
            };
            if file.worktree != worktree_handle {
                continue;
            }
            let diff_state = diff_state.read(cx);
            if changed_repos
                .iter()
                .any(|(work_dir, _)| file.path.starts_with(work_dir))
            {
                let snapshot = buffer.text_snapshot();
                let has_unstaged_diff = diff_state
                    .unstaged_diff
                    .as_ref()
                    .is_some_and(|diff| diff.is_upgradable());
                let has_uncommitted_diff = diff_state
                    .uncommitted_diff
                    .as_ref()
                    .is_some_and(|set| set.is_upgradable());
                diff_state_updates.push((
                    snapshot.clone(),
                    file.path.clone(),
                    has_unstaged_diff.then(|| diff_state.index_text.clone()),
                    has_uncommitted_diff.then(|| diff_state.head_text.clone()),
                ));
            }
        }

        if diff_state_updates.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Load the new base texts from git off the main thread.
            let diff_bases_changes_by_buffer = cx
                .background_spawn(async move {
                    diff_state_updates
                        .into_iter()
                        .filter_map(
                            |(buffer_snapshot, path, current_index_text, current_head_text)| {
                                let local_repo = snapshot.local_repo_for_path(&path)?;
                                let relative_path = local_repo.relativize(&path).ok()?;
                                // Only reload the texts for diffs that are open.
                                let index_text = if current_index_text.is_some() {
                                    local_repo.repo().load_index_text(&relative_path)
                                } else {
                                    None
                                };
                                let head_text = if current_head_text.is_some() {
                                    local_repo.repo().load_committed_text(&relative_path)
                                } else {
                                    None
                                };

                                // Avoid triggering a diff update if the base text has not changed.
                                if let Some((current_index, current_head)) =
                                    current_index_text.as_ref().zip(current_head_text.as_ref())
                                {
                                    if current_index.as_deref() == index_text.as_ref()
                                        && current_head.as_deref() == head_text.as_ref()
                                    {
                                        return None;
                                    }
                                }

                                let diff_bases_change = match (
                                    current_index_text.is_some(),
                                    current_head_text.is_some(),
                                ) {
                                    (true, true) => Some(if index_text == head_text {
                                        DiffBasesChange::SetBoth(head_text)
                                    } else {
                                        DiffBasesChange::SetEach {
                                            index: index_text,
                                            head: head_text,
                                        }
                                    }),
                                    (true, false) => Some(DiffBasesChange::SetIndex(index_text)),
                                    (false, true) => Some(DiffBasesChange::SetHead(head_text)),
                                    (false, false) => None,
                                };
                                Some((buffer_snapshot, diff_bases_change))
                            },
                        )
                        .collect::<Vec<_>>()
                })
                .await;

            // Apply the new bases, forwarding each change to any downstream
            // client before recalculating the buffer's diffs.
            this.update(&mut cx, |this, cx| {
                for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
                    let Some(OpenBuffer::Complete { diff_state, .. }) =
                        this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
                    else {
                        continue;
                    };
                    let Some(diff_bases_change) = diff_bases_change else {
                        continue;
                    };

                    diff_state.update(cx, |diff_state, cx| {
                        use proto::update_diff_bases::Mode;

                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
                            let buffer_id = buffer_snapshot.remote_id().to_proto();
                            let (staged_text, committed_text, mode) = match diff_bases_change
                                .clone()
                            {
                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                DiffBasesChange::SetEach { index, head } => {
                                    (index, head, Mode::IndexAndHead)
                                }
                                DiffBasesChange::SetBoth(text) => {
                                    (None, text, Mode::IndexMatchesHead)
                                }
                            };
                            let message = proto::UpdateDiffBases {
                                project_id: *project_id,
                                buffer_id,
                                staged_text,
                                committed_text,
                                mode: mode as i32,
                            };

                            client.send(message).log_err();
                        }

                        // Receiver dropped: nothing awaits this recalculation.
                        let _ =
                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
                    });
                }
            })
        })
        .detach_and_log_err(cx);
    }
943
    /// Updates a single buffer's `File` in response to a worktree entry
    /// change (rename, deletion, mtime change), keeping the lookup tables and
    /// any downstream client in sync.
    ///
    /// Always returns `None`; the `Option` return exists only to allow early
    /// exit via `?`.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Find the affected buffer by entry id first, falling back to path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        // Drop the stale store entry if the buffer itself was released.
        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // Buffer is gone: purge it from both lookup tables.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Locate the entry in the new snapshot, by id or by path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // No entry in the snapshot: the file was deleted on disk.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path table and record the path-change event.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id table.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Forward the file change downstream; send errors are ignored.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }
1071
1072 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1073 let file = File::from_dyn(buffer.read(cx).file())?;
1074
1075 let remote_id = buffer.read(cx).remote_id();
1076 if let Some(entry_id) = file.entry_id {
1077 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1078 Some(_) => {
1079 return None;
1080 }
1081 None => {
1082 self.local_buffer_ids_by_entry_id
1083 .insert(entry_id, remote_id);
1084 }
1085 }
1086 };
1087 self.local_buffer_ids_by_path.insert(
1088 ProjectPath {
1089 worktree_id: file.worktree_id(cx),
1090 path: file.path.clone(),
1091 },
1092 remote_id,
1093 );
1094
1095 Some(())
1096 }
1097
1098 fn save_buffer(
1099 &self,
1100 buffer: Entity<Buffer>,
1101 cx: &mut Context<BufferStore>,
1102 ) -> Task<Result<()>> {
1103 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1104 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1105 };
1106 let worktree = file.worktree.clone();
1107 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1108 }
1109
1110 fn save_buffer_as(
1111 &self,
1112 buffer: Entity<Buffer>,
1113 path: ProjectPath,
1114 cx: &mut Context<BufferStore>,
1115 ) -> Task<Result<()>> {
1116 let Some(worktree) = self
1117 .worktree_store
1118 .read(cx)
1119 .worktree_for_id(path.worktree_id, cx)
1120 else {
1121 return Task::ready(Err(anyhow!("no such worktree")));
1122 };
1123 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1124 }
1125
    /// Opens (or creates in memory) a buffer for `path` in `worktree`.
    ///
    /// The buffer entity is reserved up front so its entity id can serve as
    /// the buffer id while the text loads on a background thread. A missing
    /// file yields an empty buffer whose `DiskState` is `New`, not an error.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity now so its id doubles as the buffer id.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // File doesn't exist yet: produce an empty in-memory buffer
                // marked as new on disk.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            // Register the buffer and index it by path and entry id.
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
1193
1194 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1195 cx.spawn(|buffer_store, mut cx| async move {
1196 let buffer =
1197 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1198 buffer_store.update(&mut cx, |buffer_store, cx| {
1199 buffer_store.add_buffer(buffer.clone(), cx).log_err();
1200 })?;
1201 Ok(buffer)
1202 })
1203 }
1204
    /// Reloads each buffer from disk, collecting the resulting transactions
    /// into a single `ProjectTransaction`.
    ///
    /// When `push_to_history` is false, each reload transaction is removed
    /// from its buffer's undo history after being recorded.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                // Reload sequentially; each reload may edit the buffer.
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.entity(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
1230}
1231
1232impl BufferStore {
    /// Registers the RPC message and request handlers through which a
    /// `BufferStore` entity receives buffer traffic on `client`.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
    }
1245
    /// Creates a buffer store whose buffers are backed by local worktrees.
    ///
    /// Subscribes to the worktree store so that the local state can subscribe
    /// to each worktree as it is added.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_diffs: Default::default(),
            worktree_store,
        }
    }
1268
    /// Creates a buffer store that mirrors buffers owned by a remote host,
    /// identified by `remote_id`, communicating over `upstream_client`.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_diffs: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
1292
1293 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1294 match &mut self.state {
1295 BufferStoreState::Local(state) => Some(state),
1296 _ => None,
1297 }
1298 }
1299
1300 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1301 match &mut self.state {
1302 BufferStoreState::Remote(state) => Some(state),
1303 _ => None,
1304 }
1305 }
1306
1307 fn as_remote(&self) -> Option<&RemoteBufferStore> {
1308 match &self.state {
1309 BufferStoreState::Remote(state) => Some(state),
1310 _ => None,
1311 }
1312 }
1313
    /// Opens the buffer at `project_path`, reusing an already-open buffer or
    /// joining an in-flight load of the same path when possible.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        // Deduplicate concurrent opens: every caller for the same path awaits
        // one shared load task.
        let task = match self.loading_buffers.entry(project_path.clone()) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            // Wrap any error in an Arc so every waiter on the
                            // shared task can observe it.
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1358
    /// Opens (or returns the existing) diff between the buffer's contents and
    /// its staged (index) text.
    pub fn open_unstaged_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(diff) = self.get_unstaged_diff(buffer_id, cx) {
            return Task::ready(Ok(diff));
        }

        // Concurrent requests share one in-flight load per (buffer, kind).
        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Unstaged)) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                // Locally, read the staged text; remotely, ask the host.
                let staged_text = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.open_unstaged_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Unstaged,
                                staged_text.await.map(DiffBasesChange::SetIndex),
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1398
    /// Opens (or returns the existing) diff between the buffer's contents and
    /// its committed (HEAD) text.
    pub fn open_uncommitted_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(diff) = self.get_uncommitted_diff(buffer_id, cx) {
            return Task::ready(Ok(diff));
        }

        // Concurrent requests share one in-flight load per (buffer, kind).
        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Uncommitted)) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let changes = match &self.state {
                    BufferStoreState::Local(this) => {
                        let committed_text = this.load_committed_text(&buffer, cx);
                        let staged_text = this.load_staged_text(&buffer, cx);
                        cx.background_spawn(async move {
                            let committed_text = committed_text.await?;
                            let staged_text = staged_text.await?;
                            // When HEAD and index agree, one shared base text
                            // serves both diffs.
                            let diff_bases_change = if committed_text == staged_text {
                                DiffBasesChange::SetBoth(committed_text)
                            } else {
                                DiffBasesChange::SetEach {
                                    index: staged_text,
                                    head: committed_text,
                                }
                            };
                            Ok(diff_bases_change)
                        })
                    }
                    BufferStoreState::Remote(this) => this.open_uncommitted_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Uncommitted,
                                changes.await,
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1454
    /// Shared tail of `open_unstaged_diff` / `open_uncommitted_diff`: clears
    /// the `loading_diffs` entry, creates the `BufferDiff` entity, wires it
    /// into the buffer's diff state, and waits for the base texts to apply.
    async fn open_diff_internal(
        this: WeakEntity<Self>,
        kind: DiffKind,
        texts: Result<DiffBasesChange>,
        buffer_entity: Entity<Buffer>,
        mut cx: AsyncApp,
    ) -> Result<Entity<BufferDiff>> {
        let diff_bases_change = match texts {
            Err(e) => {
                // Loading the base text failed: drop the in-flight entry so a
                // later call can retry, then propagate the error.
                this.update(&mut cx, |this, cx| {
                    let buffer = buffer_entity.read(cx);
                    let buffer_id = buffer.remote_id();
                    this.loading_diffs.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(&mut cx, |this, cx| {
            let buffer = buffer_entity.read(cx);
            let buffer_id = buffer.remote_id();
            let language = buffer.language().cloned();
            let language_registry = buffer.language_registry();
            let text_snapshot = buffer.text_snapshot();
            this.loading_diffs.remove(&(buffer_id, kind));

            if let Some(OpenBuffer::Complete { diff_state, .. }) =
                this.opened_buffers.get_mut(&buffer_id)
            {
                diff_state.update(cx, |diff_state, cx| {
                    diff_state.language = language;
                    diff_state.language_registry = language_registry;

                    let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
                    match kind {
                        DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
                        DiffKind::Uncommitted => {
                            // An uncommitted diff also tracks the unstaged diff
                            // as its secondary; create one if none exists yet.
                            let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
                                diff
                            } else {
                                let unstaged_diff =
                                    cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
                                diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
                                unstaged_diff
                            };

                            diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
                            diff_state.uncommitted_diff = Some(diff.downgrade())
                        }
                    };

                    let rx = diff_state.diff_bases_changed(text_snapshot, diff_bases_change, cx);

                    Ok(async move {
                        // Wait for the new base texts to be applied.
                        rx.await.ok();
                        Ok(diff)
                    })
                })
            } else {
                Err(anyhow!("buffer was closed"))
            }
        })??
        .await
    }
1520
    /// Creates a new empty buffer, locally or via the remote host.
    pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
        match &self.state {
            BufferStoreState::Local(this) => this.create_buffer(cx),
            BufferStoreState::Remote(this) => this.create_buffer(cx),
        }
    }
1527
    /// Saves the buffer, writing locally or asking the remote host to save.
    pub fn save_buffer(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        match &mut self.state {
            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
        }
    }
1538
    /// Saves the buffer under a new project path, then emits
    /// `BufferChangedFilePath` carrying the previous file.
    pub fn save_buffer_as(
        &mut self,
        buffer: Entity<Buffer>,
        path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        // Capture the old file before the save replaces it.
        let old_file = buffer.read(cx).file().cloned();
        let task = match &self.state {
            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
            BufferStoreState::Remote(this) => {
                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }
1559
    /// Computes git blame for `buffer`, optionally at a specific `version` of
    /// its contents.
    ///
    /// Returns `Ok(None)` when the file has no associated local repository.
    /// For remote worktrees, the request is forwarded to the host.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything the blame needs on the current thread...
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    // Blame either the requested historical version or the
                    // buffer's current contents.
                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                // ...then run the blame itself on the background executor.
                cx.background_spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1621
    /// Builds a web permalink to `selection` in `buffer` on the repository's
    /// git hosting provider, using the `origin` remote and the current HEAD
    /// SHA. For remote worktrees, the request is forwarded to the host.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if buffer
                        .language()
                        .is_none_or(|lang| lang.name() != "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // Express the buffer's path relative to the repository root.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    // Identify the hosting provider (and repo coordinates)
                    // from the remote URL.
                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1718
    /// Registers an open buffer with the store: creates its diff state, wires
    /// up event and release subscriptions, applies any operations that
    /// arrived before the buffer existed, and emits `BufferAdded`.
    fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
        let buffer = buffer_entity.read(cx);
        let language = buffer.language().cloned();
        let language_registry = buffer.language_registry();
        let remote_id = buffer.remote_id();
        // A non-zero replica id means this buffer is a remote replica.
        let is_remote = buffer.replica_id() != 0;
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer_entity.downgrade(),
            diff_state: cx.new(|_| BufferDiffState {
                language,
                language_registry,
                ..Default::default()
            }),
        };

        // Emit `BufferDropped` when the buffer entity is released.
        let handle = cx.entity().downgrade();
        buffer_entity.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    // Operations arrived before the buffer was created; apply
                    // them now that it exists.
                    buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
        Ok(())
    }
1769
1770 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1771 self.opened_buffers
1772 .values()
1773 .filter_map(|buffer| buffer.upgrade())
1774 }
1775
1776 pub fn loading_buffers(
1777 &self,
1778 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1779 self.loading_buffers.iter().map(|(path, task)| {
1780 let task = task.clone();
1781 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1782 })
1783 }
1784
1785 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1786 self.buffers().find_map(|buffer| {
1787 let file = File::from_dyn(buffer.read(cx).file())?;
1788 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1789 Some(buffer)
1790 } else {
1791 None
1792 }
1793 })
1794 }
1795
1796 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1797 self.opened_buffers.get(&buffer_id)?.upgrade()
1798 }
1799
1800 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1801 self.get(buffer_id)
1802 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1803 }
1804
1805 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1806 self.get(buffer_id).or_else(|| {
1807 self.as_remote()
1808 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1809 })
1810 }
1811
1812 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
1813 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1814 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
1815 } else {
1816 None
1817 }
1818 }
1819
1820 pub fn get_uncommitted_diff(
1821 &self,
1822 buffer_id: BufferId,
1823 cx: &App,
1824 ) -> Option<Entity<BufferDiff>> {
1825 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1826 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
1827 } else {
1828 None
1829 }
1830 }
1831
1832 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1833 let buffers = self
1834 .buffers()
1835 .map(|buffer| {
1836 let buffer = buffer.read(cx);
1837 proto::BufferVersion {
1838 id: buffer.remote_id().into(),
1839 version: language::proto::serialize_version(&buffer.version),
1840 }
1841 })
1842 .collect();
1843 let incomplete_buffer_ids = self
1844 .as_remote()
1845 .map(|remote| remote.incomplete_buffer_ids())
1846 .unwrap_or_default();
1847 (buffers, incomplete_buffer_ids)
1848 }
1849
1850 pub fn disconnected_from_host(&mut self, cx: &mut App) {
1851 for open_buffer in self.opened_buffers.values_mut() {
1852 if let Some(buffer) = open_buffer.upgrade() {
1853 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1854 }
1855 }
1856
1857 for buffer in self.buffers() {
1858 buffer.update(cx, |buffer, cx| {
1859 buffer.set_capability(Capability::ReadOnly, cx)
1860 });
1861 }
1862
1863 if let Some(remote) = self.as_remote_mut() {
1864 // Wake up all futures currently waiting on a buffer to get opened,
1865 // to give them a chance to fail now that we've disconnected.
1866 remote.remote_buffer_listeners.clear()
1867 }
1868 }
1869
    /// Records a downstream client and project id; while set, buffer updates
    /// (e.g. reload events) are forwarded to that client.
    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1873
    /// Stops sharing: drops the downstream client and forgets which buffers
    /// were shared with which peers.
    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }
1878
1879 pub fn discard_incomplete(&mut self) {
1880 self.opened_buffers
1881 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1882 }
1883
    /// Streams buffers that may contain matches for `query`, up to `limit`.
    ///
    /// Already-open unnamed buffers are sent first (a worktree scan cannot
    /// surface them); then candidate paths from the worktree store are opened
    /// in batches and forwarded on the returned channel.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // Unnamed buffers count against the limit and are yielded
                // directly below.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Cap how many buffer opens are in flight per batch.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            // Receiver dropped; stop searching.
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1938
    /// Kicks off diff recalculation for each given buffer that has diff
    /// state, returning a future that completes when all of them finish.
    pub fn recalculate_buffer_diffs(
        &mut self,
        buffers: Vec<Entity<Buffer>>,
        cx: &mut Context<Self>,
    ) -> impl Future<Output = ()> {
        let mut futures = Vec::new();
        for buffer in buffers {
            // Buffers without a `Complete` entry have no diff state to update.
            if let Some(OpenBuffer::Complete { diff_state, .. }) =
                self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                let buffer = buffer.read(cx).text_snapshot();
                futures.push(diff_state.update(cx, |diff_state, cx| {
                    diff_state.recalculate_diffs(buffer, cx)
                }));
            }
        }
        async move {
            futures::future::join_all(futures).await;
        }
    }
1959
    /// Reacts to events emitted by open buffers: re-indexes on file handle
    /// changes, forwards reloads to the downstream client, and refreshes diff
    /// state on language changes.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                // Only local stores index buffers by path/entry id.
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Forward the reload downstream only while sharing.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            BufferEvent::LanguageChanged => {
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(OpenBuffer::Complete { diff_state, .. }) =
                    self.opened_buffers.get(&buffer_id)
                {
                    diff_state.update(cx, |diff_state, cx| {
                        diff_state.buffer_language_changed(buffer, cx);
                    });
                }
            }
            _ => {}
        }
    }
2000
2001 pub async fn handle_update_buffer(
2002 this: Entity<Self>,
2003 envelope: TypedEnvelope<proto::UpdateBuffer>,
2004 mut cx: AsyncApp,
2005 ) -> Result<proto::Ack> {
2006 let payload = envelope.payload.clone();
2007 let buffer_id = BufferId::new(payload.buffer_id)?;
2008 let ops = payload
2009 .operations
2010 .into_iter()
2011 .map(language::proto::deserialize_operation)
2012 .collect::<Result<Vec<_>, _>>()?;
2013 this.update(&mut cx, |this, cx| {
2014 match this.opened_buffers.entry(buffer_id) {
2015 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
2016 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
2017 OpenBuffer::Complete { buffer, .. } => {
2018 if let Some(buffer) = buffer.upgrade() {
2019 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
2020 }
2021 }
2022 },
2023 hash_map::Entry::Vacant(e) => {
2024 e.insert(OpenBuffer::Operations(ops));
2025 }
2026 }
2027 Ok(proto::Ack {})
2028 })?
2029 }
2030
2031 pub fn register_shared_lsp_handle(
2032 &mut self,
2033 peer_id: proto::PeerId,
2034 buffer_id: BufferId,
2035 handle: OpenLspBufferHandle,
2036 ) {
2037 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2038 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2039 buffer.lsp_handle = Some(handle);
2040 return;
2041 }
2042 }
2043 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2044 }
2045
    /// Request handler for `proto::SynchronizeBuffers`: re-shares every buffer
    /// a reconnecting guest reports, replies with this host's buffer
    /// versions, and sends the guest its file/reload state plus whichever
    /// operations it is missing.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate for this guest; only the buffers it
        // reports below are re-shared.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        diff: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest hasn't seen yet.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Send the missing operations in the background, chunked so
                // each message stays within size limits.
                cx.background_spawn(
                    async move {
                        let operations = operations.await;
                        for chunk in split_operations(operations) {
                            client
                                .request(proto::UpdateBuffer {
                                    project_id,
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                })
                                .await?;
                        }
                        anyhow::Ok(())
                    }
                    .log_err(),
                )
                .detach();
            }
        }
        Ok(response)
    }
2134
2135 pub fn handle_create_buffer_for_peer(
2136 &mut self,
2137 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2138 replica_id: u16,
2139 capability: Capability,
2140 cx: &mut Context<Self>,
2141 ) -> Result<()> {
2142 let Some(remote) = self.as_remote_mut() else {
2143 return Err(anyhow!("buffer store is not a remote"));
2144 };
2145
2146 if let Some(buffer) =
2147 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2148 {
2149 self.add_buffer(buffer, cx)?;
2150 }
2151
2152 Ok(())
2153 }
2154
    /// Message handler for `proto::UpdateBufferFile`: replaces the buffer's
    /// file handle, emits `BufferChangedFilePath` when the path changed, and
    /// relays the message to any downstream client.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            // The clone is needed: `file` is consumed twice — deserialized
            // here, and re-sent downstream from `envelope.payload` below.
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // Capture the old file only when the path actually changed,
                // so the event below fires just for real renames/moves.
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay the update further downstream when this store is shared.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
2202
2203 pub async fn handle_save_buffer(
2204 this: Entity<Self>,
2205 envelope: TypedEnvelope<proto::SaveBuffer>,
2206 mut cx: AsyncApp,
2207 ) -> Result<proto::BufferSaved> {
2208 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2209 let (buffer, project_id) = this.update(&mut cx, |this, _| {
2210 anyhow::Ok((
2211 this.get_existing(buffer_id)?,
2212 this.downstream_client
2213 .as_ref()
2214 .map(|(_, project_id)| *project_id)
2215 .context("project is not shared")?,
2216 ))
2217 })??;
2218 buffer
2219 .update(&mut cx, |buffer, _| {
2220 buffer.wait_for_version(deserialize_version(&envelope.payload.version))
2221 })?
2222 .await?;
2223 let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
2224
2225 if let Some(new_path) = envelope.payload.new_path {
2226 let new_path = ProjectPath::from_proto(new_path);
2227 this.update(&mut cx, |this, cx| {
2228 this.save_buffer_as(buffer.clone(), new_path, cx)
2229 })?
2230 .await?;
2231 } else {
2232 this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
2233 .await?;
2234 }
2235
2236 buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
2237 project_id,
2238 buffer_id: buffer_id.into(),
2239 version: serialize_version(buffer.saved_version()),
2240 mtime: buffer.saved_mtime().map(|time| time.into()),
2241 })
2242 }
2243
2244 pub async fn handle_close_buffer(
2245 this: Entity<Self>,
2246 envelope: TypedEnvelope<proto::CloseBuffer>,
2247 mut cx: AsyncApp,
2248 ) -> Result<()> {
2249 let peer_id = envelope.sender_id;
2250 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2251 this.update(&mut cx, |this, _| {
2252 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2253 if shared.remove(&buffer_id).is_some() {
2254 if shared.is_empty() {
2255 this.shared_buffers.remove(&peer_id);
2256 }
2257 return;
2258 }
2259 }
2260 debug_panic!(
2261 "peer_id {} closed buffer_id {} which was either not open or already closed",
2262 peer_id,
2263 buffer_id
2264 )
2265 })
2266 }
2267
2268 pub async fn handle_buffer_saved(
2269 this: Entity<Self>,
2270 envelope: TypedEnvelope<proto::BufferSaved>,
2271 mut cx: AsyncApp,
2272 ) -> Result<()> {
2273 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2274 let version = deserialize_version(&envelope.payload.version);
2275 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2276 this.update(&mut cx, move |this, cx| {
2277 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2278 buffer.update(cx, |buffer, cx| {
2279 buffer.did_save(version, mtime, cx);
2280 });
2281 }
2282
2283 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2284 downstream_client
2285 .send(proto::BufferSaved {
2286 project_id: *project_id,
2287 buffer_id: buffer_id.into(),
2288 mtime: envelope.payload.mtime,
2289 version: envelope.payload.version,
2290 })
2291 .log_err();
2292 }
2293 })
2294 }
2295
2296 pub async fn handle_buffer_reloaded(
2297 this: Entity<Self>,
2298 envelope: TypedEnvelope<proto::BufferReloaded>,
2299 mut cx: AsyncApp,
2300 ) -> Result<()> {
2301 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2302 let version = deserialize_version(&envelope.payload.version);
2303 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2304 let line_ending = deserialize_line_ending(
2305 proto::LineEnding::from_i32(envelope.payload.line_ending)
2306 .ok_or_else(|| anyhow!("missing line ending"))?,
2307 );
2308 this.update(&mut cx, |this, cx| {
2309 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2310 buffer.update(cx, |buffer, cx| {
2311 buffer.did_reload(version, line_ending, mtime, cx);
2312 });
2313 }
2314
2315 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2316 downstream_client
2317 .send(proto::BufferReloaded {
2318 project_id: *project_id,
2319 buffer_id: buffer_id.into(),
2320 mtime: envelope.payload.mtime,
2321 version: envelope.payload.version,
2322 line_ending: envelope.payload.line_ending,
2323 })
2324 .log_err();
2325 }
2326 })
2327 }
2328
2329 pub async fn handle_blame_buffer(
2330 this: Entity<Self>,
2331 envelope: TypedEnvelope<proto::BlameBuffer>,
2332 mut cx: AsyncApp,
2333 ) -> Result<proto::BlameBufferResponse> {
2334 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2335 let version = deserialize_version(&envelope.payload.version);
2336 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2337 buffer
2338 .update(&mut cx, |buffer, _| {
2339 buffer.wait_for_version(version.clone())
2340 })?
2341 .await?;
2342 let blame = this
2343 .update(&mut cx, |this, cx| {
2344 this.blame_buffer(&buffer, Some(version), cx)
2345 })?
2346 .await?;
2347 Ok(serialize_blame_buffer_response(blame))
2348 }
2349
2350 pub async fn handle_get_permalink_to_line(
2351 this: Entity<Self>,
2352 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2353 mut cx: AsyncApp,
2354 ) -> Result<proto::GetPermalinkToLineResponse> {
2355 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2356 // let version = deserialize_version(&envelope.payload.version);
2357 let selection = {
2358 let proto_selection = envelope
2359 .payload
2360 .selection
2361 .context("no selection to get permalink for defined")?;
2362 proto_selection.start as u32..proto_selection.end as u32
2363 };
2364 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2365 let permalink = this
2366 .update(&mut cx, |this, cx| {
2367 this.get_permalink_to_line(&buffer, selection, cx)
2368 })?
2369 .await?;
2370 Ok(proto::GetPermalinkToLineResponse {
2371 permalink: permalink.to_string(),
2372 })
2373 }
2374
2375 pub async fn handle_open_unstaged_diff(
2376 this: Entity<Self>,
2377 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2378 mut cx: AsyncApp,
2379 ) -> Result<proto::OpenUnstagedDiffResponse> {
2380 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2381 let diff = this
2382 .update(&mut cx, |this, cx| {
2383 let buffer = this.get(buffer_id)?;
2384 Some(this.open_unstaged_diff(buffer, cx))
2385 })?
2386 .ok_or_else(|| anyhow!("no such buffer"))?
2387 .await?;
2388 this.update(&mut cx, |this, _| {
2389 let shared_buffers = this
2390 .shared_buffers
2391 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2392 .or_default();
2393 debug_assert!(shared_buffers.contains_key(&buffer_id));
2394 if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2395 shared.diff = Some(diff.clone());
2396 }
2397 })?;
2398 let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
2399 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2400 }
2401
2402 pub async fn handle_open_uncommitted_diff(
2403 this: Entity<Self>,
2404 request: TypedEnvelope<proto::OpenUncommittedDiff>,
2405 mut cx: AsyncApp,
2406 ) -> Result<proto::OpenUncommittedDiffResponse> {
2407 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2408 let diff = this
2409 .update(&mut cx, |this, cx| {
2410 let buffer = this.get(buffer_id)?;
2411 Some(this.open_uncommitted_diff(buffer, cx))
2412 })?
2413 .ok_or_else(|| anyhow!("no such buffer"))?
2414 .await?;
2415 this.update(&mut cx, |this, _| {
2416 let shared_buffers = this
2417 .shared_buffers
2418 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2419 .or_default();
2420 debug_assert!(shared_buffers.contains_key(&buffer_id));
2421 if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2422 shared.diff = Some(diff.clone());
2423 }
2424 })?;
2425 diff.read_with(&cx, |diff, cx| {
2426 use proto::open_uncommitted_diff_response::Mode;
2427
2428 let unstaged_diff = diff.secondary_diff();
2429 let index_snapshot = unstaged_diff.and_then(|diff| {
2430 let diff = diff.read(cx);
2431 diff.base_text_exists().then(|| diff.base_text())
2432 });
2433
2434 let mode;
2435 let staged_text;
2436 let committed_text;
2437 if diff.base_text_exists() {
2438 let committed_snapshot = diff.base_text();
2439 committed_text = Some(committed_snapshot.text());
2440 if let Some(index_text) = index_snapshot {
2441 if index_text.remote_id() == committed_snapshot.remote_id() {
2442 mode = Mode::IndexMatchesHead;
2443 staged_text = None;
2444 } else {
2445 mode = Mode::IndexAndHead;
2446 staged_text = Some(index_text.text());
2447 }
2448 } else {
2449 mode = Mode::IndexAndHead;
2450 staged_text = None;
2451 }
2452 } else {
2453 mode = Mode::IndexAndHead;
2454 committed_text = None;
2455 staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
2456 }
2457
2458 proto::OpenUncommittedDiffResponse {
2459 committed_text,
2460 staged_text,
2461 mode: mode.into(),
2462 }
2463 })
2464 }
2465
2466 pub async fn handle_update_diff_bases(
2467 this: Entity<Self>,
2468 request: TypedEnvelope<proto::UpdateDiffBases>,
2469 mut cx: AsyncApp,
2470 ) -> Result<()> {
2471 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2472 this.update(&mut cx, |this, cx| {
2473 if let Some(OpenBuffer::Complete { diff_state, buffer }) =
2474 this.opened_buffers.get_mut(&buffer_id)
2475 {
2476 if let Some(buffer) = buffer.upgrade() {
2477 let buffer = buffer.read(cx).text_snapshot();
2478 diff_state.update(cx, |diff_state, cx| {
2479 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2480 })
2481 }
2482 }
2483 })
2484 }
2485
2486 pub fn reload_buffers(
2487 &self,
2488 buffers: HashSet<Entity<Buffer>>,
2489 push_to_history: bool,
2490 cx: &mut Context<Self>,
2491 ) -> Task<Result<ProjectTransaction>> {
2492 if buffers.is_empty() {
2493 return Task::ready(Ok(ProjectTransaction::default()));
2494 }
2495 match &self.state {
2496 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2497 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2498 }
2499 }
2500
2501 async fn handle_reload_buffers(
2502 this: Entity<Self>,
2503 envelope: TypedEnvelope<proto::ReloadBuffers>,
2504 mut cx: AsyncApp,
2505 ) -> Result<proto::ReloadBuffersResponse> {
2506 let sender_id = envelope.original_sender_id().unwrap_or_default();
2507 let reload = this.update(&mut cx, |this, cx| {
2508 let mut buffers = HashSet::default();
2509 for buffer_id in &envelope.payload.buffer_ids {
2510 let buffer_id = BufferId::new(*buffer_id)?;
2511 buffers.insert(this.get_existing(buffer_id)?);
2512 }
2513 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2514 })??;
2515
2516 let project_transaction = reload.await?;
2517 let project_transaction = this.update(&mut cx, |this, cx| {
2518 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2519 })?;
2520 Ok(proto::ReloadBuffersResponse {
2521 transaction: Some(project_transaction),
2522 })
2523 }
2524
    /// Ensures `peer_id` has a replica of `buffer`: records the share, then
    /// sends the buffer's full state followed by its serialized operations in
    /// chunks. A no-op if the buffer is already shared with that peer or if
    /// the project has no downstream client.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        // Already shared with this peer; nothing to send.
        if shared_buffers.contains_key(&buffer_id) {
            return Task::ready(Ok(()));
        }
        // Record the share eagerly (before the async send runs) so that a
        // concurrent call for the same buffer/peer short-circuits above.
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                diff: None,
                lsp_handle: None,
            },
        );

        // Without a downstream client there is no one to send the buffer to.
        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before this task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Send the base state first; only stream operation chunks if the
            // initial send succeeded.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_spawn(async move {
                    // Operations are split into chunks; the final chunk is
                    // flagged so the receiver knows the buffer is complete.
                    let mut chunks = split_operations(operations).peekable();
                    while let Some(chunk) = chunks.next() {
                        let is_last = chunks.peek().is_none();
                        client.send(proto::CreateBufferForPeer {
                            project_id,
                            peer_id: Some(peer_id),
                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                proto::BufferChunk {
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                    is_last,
                                },
                            )),
                        })?;
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }
            Ok(())
        })
    }
2590
    /// Drops all record of buffers shared with remote peers.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }

    /// Drops the record of buffers shared with a single peer.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }

    /// Re-keys a peer's shared buffers, e.g. when the peer reconnects under
    /// a new peer id.
    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }

    /// Whether any buffers are currently shared with remote peers.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2608
2609 pub fn create_local_buffer(
2610 &mut self,
2611 text: &str,
2612 language: Option<Arc<Language>>,
2613 cx: &mut Context<Self>,
2614 ) -> Entity<Buffer> {
2615 let buffer = cx.new(|cx| {
2616 Buffer::local(text, cx)
2617 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
2618 });
2619
2620 self.add_buffer(buffer.clone(), cx).log_err();
2621 let buffer_id = buffer.read(cx).remote_id();
2622
2623 let this = self
2624 .as_local_mut()
2625 .expect("local-only method called in a non-local context");
2626 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2627 this.local_buffer_ids_by_path.insert(
2628 ProjectPath {
2629 worktree_id: file.worktree_id(cx),
2630 path: file.path.clone(),
2631 },
2632 buffer_id,
2633 );
2634
2635 if let Some(entry_id) = file.entry_id {
2636 this.local_buffer_ids_by_entry_id
2637 .insert(entry_id, buffer_id);
2638 }
2639 }
2640 buffer
2641 }
2642
2643 pub fn deserialize_project_transaction(
2644 &mut self,
2645 message: proto::ProjectTransaction,
2646 push_to_history: bool,
2647 cx: &mut Context<Self>,
2648 ) -> Task<Result<ProjectTransaction>> {
2649 if let Some(this) = self.as_remote_mut() {
2650 this.deserialize_project_transaction(message, push_to_history, cx)
2651 } else {
2652 debug_panic!("not a remote buffer store");
2653 Task::ready(Err(anyhow!("not a remote buffer store")))
2654 }
2655 }
2656
2657 pub fn wait_for_remote_buffer(
2658 &mut self,
2659 id: BufferId,
2660 cx: &mut Context<BufferStore>,
2661 ) -> Task<Result<Entity<Buffer>>> {
2662 if let Some(this) = self.as_remote_mut() {
2663 this.wait_for_remote_buffer(id, cx)
2664 } else {
2665 debug_panic!("not a remote buffer store");
2666 Task::ready(Err(anyhow!("not a remote buffer store")))
2667 }
2668 }
2669
2670 pub fn serialize_project_transaction_for_peer(
2671 &mut self,
2672 project_transaction: ProjectTransaction,
2673 peer_id: proto::PeerId,
2674 cx: &mut Context<Self>,
2675 ) -> proto::ProjectTransaction {
2676 let mut serialized_transaction = proto::ProjectTransaction {
2677 buffer_ids: Default::default(),
2678 transactions: Default::default(),
2679 };
2680 for (buffer, transaction) in project_transaction.0 {
2681 self.create_buffer_for_peer(&buffer, peer_id, cx)
2682 .detach_and_log_err(cx);
2683 serialized_transaction
2684 .buffer_ids
2685 .push(buffer.read(cx).remote_id().into());
2686 serialized_transaction
2687 .transactions
2688 .push(language::proto::serialize_transaction(&transaction));
2689 }
2690 serialized_transaction
2691 }
2692}
2693
2694impl OpenBuffer {
2695 fn upgrade(&self) -> Option<Entity<Buffer>> {
2696 match self {
2697 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2698 OpenBuffer::Operations(_) => None,
2699 }
2700 }
2701}
2702
2703fn is_not_found_error(error: &anyhow::Error) -> bool {
2704 error
2705 .root_cause()
2706 .downcast_ref::<io::Error>()
2707 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2708}
2709
2710fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2711 let Some(blame) = blame else {
2712 return proto::BlameBufferResponse {
2713 blame_response: None,
2714 };
2715 };
2716
2717 let entries = blame
2718 .entries
2719 .into_iter()
2720 .map(|entry| proto::BlameEntry {
2721 sha: entry.sha.as_bytes().into(),
2722 start_line: entry.range.start,
2723 end_line: entry.range.end,
2724 original_line_number: entry.original_line_number,
2725 author: entry.author.clone(),
2726 author_mail: entry.author_mail.clone(),
2727 author_time: entry.author_time,
2728 author_tz: entry.author_tz.clone(),
2729 committer: entry.committer_name.clone(),
2730 committer_mail: entry.committer_email.clone(),
2731 committer_time: entry.committer_time,
2732 committer_tz: entry.committer_tz.clone(),
2733 summary: entry.summary.clone(),
2734 previous: entry.previous.clone(),
2735 filename: entry.filename.clone(),
2736 })
2737 .collect::<Vec<_>>();
2738
2739 let messages = blame
2740 .messages
2741 .into_iter()
2742 .map(|(oid, message)| proto::CommitMessage {
2743 oid: oid.as_bytes().into(),
2744 message,
2745 })
2746 .collect::<Vec<_>>();
2747
2748 let permalinks = blame
2749 .permalinks
2750 .into_iter()
2751 .map(|(oid, url)| proto::CommitPermalink {
2752 oid: oid.as_bytes().into(),
2753 permalink: url.to_string(),
2754 })
2755 .collect::<Vec<_>>();
2756
2757 proto::BlameBufferResponse {
2758 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2759 entries,
2760 messages,
2761 permalinks,
2762 remote_url: blame.remote_url,
2763 }),
2764 }
2765}
2766
2767fn deserialize_blame_buffer_response(
2768 response: proto::BlameBufferResponse,
2769) -> Option<git::blame::Blame> {
2770 let response = response.blame_response?;
2771 let entries = response
2772 .entries
2773 .into_iter()
2774 .filter_map(|entry| {
2775 Some(git::blame::BlameEntry {
2776 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2777 range: entry.start_line..entry.end_line,
2778 original_line_number: entry.original_line_number,
2779 committer_name: entry.committer,
2780 committer_time: entry.committer_time,
2781 committer_tz: entry.committer_tz,
2782 committer_email: entry.committer_mail,
2783 author: entry.author,
2784 author_mail: entry.author_mail,
2785 author_time: entry.author_time,
2786 author_tz: entry.author_tz,
2787 summary: entry.summary,
2788 previous: entry.previous,
2789 filename: entry.filename,
2790 })
2791 })
2792 .collect::<Vec<_>>();
2793
2794 let messages = response
2795 .messages
2796 .into_iter()
2797 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2798 .collect::<HashMap<_, _>>();
2799
2800 let permalinks = response
2801 .permalinks
2802 .into_iter()
2803 .filter_map(|permalink| {
2804 Some((
2805 git::Oid::from_bytes(&permalink.oid).ok()?,
2806 Url::from_str(&permalink.permalink).ok()?,
2807 ))
2808 })
2809 .collect::<HashMap<_, _>>();
2810
2811 Some(Blame {
2812 entries,
2813 permalinks,
2814 messages,
2815 remote_url: response.remote_url,
2816 })
2817}
2818
2819fn get_permalink_in_rust_registry_src(
2820 provider_registry: Arc<GitHostingProviderRegistry>,
2821 path: PathBuf,
2822 selection: Range<u32>,
2823) -> Result<url::Url> {
2824 #[derive(Deserialize)]
2825 struct CargoVcsGit {
2826 sha1: String,
2827 }
2828
2829 #[derive(Deserialize)]
2830 struct CargoVcsInfo {
2831 git: CargoVcsGit,
2832 path_in_vcs: String,
2833 }
2834
2835 #[derive(Deserialize)]
2836 struct CargoPackage {
2837 repository: String,
2838 }
2839
2840 #[derive(Deserialize)]
2841 struct CargoToml {
2842 package: CargoPackage,
2843 }
2844
2845 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
2846 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
2847 Some((dir, json))
2848 }) else {
2849 bail!("No .cargo_vcs_info.json found in parent directories")
2850 };
2851 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
2852 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
2853 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
2854 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
2855 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
2856 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
2857 let permalink = provider.build_permalink(
2858 remote,
2859 BuildPermalinkParams {
2860 sha: &cargo_vcs_info.git.sha1,
2861 path: &path.to_string_lossy(),
2862 selection: Some(selection),
2863 },
2864 );
2865 Ok(permalink)
2866}