1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use buffer_diff::{BufferDiff, BufferDiffEvent};
10use client::Client;
11use collections::{hash_map, HashMap, HashSet};
12use fs::Fs;
13use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
14use git::{blame::Blame, repository::RepoPath};
15use gpui::{
16 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
17};
18use http_client::Url;
19use language::{
20 proto::{
21 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
22 split_operations,
23 },
24 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
25};
26use rpc::{
27 proto::{self, ToProto},
28 AnyProtoClient, ErrorExt as _, TypedEnvelope,
29};
30use serde::Deserialize;
31use smol::channel::Receiver;
32use std::{
33 io,
34 ops::Range,
35 path::{Path, PathBuf},
36 pin::pin,
37 str::FromStr as _,
38 sync::Arc,
39 time::Instant,
40};
41use text::BufferId;
42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
44
/// Which git base text a buffer diff is computed against.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    /// Buffer contents vs. the index (staged) text.
    Unstaged,
    /// Buffer contents vs. the HEAD (committed) text.
    Uncommitted,
}
50
/// A set of open buffers.
pub struct BufferStore {
    /// Local- or remote-specific behavior and bookkeeping.
    state: BufferStoreState,
    /// In-flight buffer opens keyed by path, so concurrent requests for the
    /// same path share a single load task.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    /// In-flight diff opens keyed by buffer id and diff kind, shared the same
    /// way as `loading_buffers`.
    #[allow(clippy::type_complexity)]
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
    worktree_store: Entity<WorktreeStore>,
    /// Every buffer that has been opened, keyed by its remote id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Client and project id used to forward updates to downstream
    /// collaborators when this project is shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// Buffers (and related handles) that have been sent to each peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
64
/// State retained for a buffer that has been shared with a specific peer.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    /// The diff the peer has opened for this buffer, if any.
    diff: Option<Entity<BufferDiff>>,
    /// Keeps the language-server registration alive while the peer has the
    /// buffer open.
    lsp_handle: Option<OpenLspBufferHandle>,
}
71
/// Per-buffer bookkeeping for the unstaged and uncommitted diffs, including
/// the cached git base texts and any in-flight recalculation.
#[derive(Default)]
struct BufferDiffState {
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    /// Replacing this task cancels any previous in-flight recalculation.
    recalculate_diff_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    /// Senders notified when the current recalculation completes.
    diff_updated_futures: Vec<oneshot::Sender<()>>,

    /// Cached HEAD (committed) text; `None` if the file is not in HEAD.
    head_text: Option<Arc<String>>,
    /// Cached index (staged) text; `None` if the file is not in the index.
    index_text: Option<Arc<String>>,
    // Dirty flags consumed (and cleared) by the next `recalculate_diffs` run.
    head_changed: bool,
    index_changed: bool,
    language_changed: bool,
}
87
/// Describes which git base texts changed and their new contents.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Only the index (staged) text changed.
    SetIndex(Option<String>),
    /// Only the HEAD (committed) text changed.
    SetHead(Option<String>),
    /// Both changed, to different contents.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Both changed, and the index matches HEAD.
    SetBoth(Option<String>),
}
98
impl BufferDiffState {
    /// Records the buffer's new language and re-runs diff calculation so the
    /// diff base texts get re-parsed with the new grammar.
    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
        self.language = buffer.read(cx).language().cloned();
        self.language_changed = true;
        // Receiver intentionally dropped: callers that care about completion
        // use `wait_for_recalculation`.
        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
    }

    /// The diff of the buffer against the index, if the entity is still alive.
    fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
        self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
    }

    /// The diff of the buffer against HEAD, if the entity is still alive.
    fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
        self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
    }

    /// Applies an `UpdateDiffBases` RPC message by translating its mode into
    /// a `DiffBasesChange` and recalculating the diffs.
    fn handle_base_texts_updated(
        &mut self,
        buffer: text::BufferSnapshot,
        message: proto::UpdateDiffBases,
        cx: &mut Context<Self>,
    ) {
        use proto::update_diff_bases::Mode;

        // Silently ignore unknown modes (e.g. sent by a newer peer).
        let Some(mode) = Mode::from_i32(message.mode) else {
            return;
        };

        let diff_bases_change = match mode {
            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
            Mode::IndexAndHead => DiffBasesChange::SetEach {
                index: message.staged_text,
                head: message.committed_text,
            },
        };

        let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
    }

    /// Returns a receiver resolving when the in-flight recalculation (if any)
    /// completes, or `None` when no recalculation is pending.
    pub fn wait_for_recalculation(&mut self) -> Option<oneshot::Receiver<()>> {
        if self.diff_updated_futures.is_empty() {
            return None;
        }
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);
        Some(rx)
    }

    /// Stores new index/HEAD base texts (normalizing their line endings) and
    /// starts a diff recalculation. The returned receiver resolves once the
    /// recalculation is done.
    fn diff_bases_changed(
        &mut self,
        buffer: text::BufferSnapshot,
        diff_bases_change: DiffBasesChange,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        match diff_bases_change {
            DiffBasesChange::SetIndex(index) => {
                self.index_text = index.map(|mut index| {
                    text::LineEnding::normalize(&mut index);
                    Arc::new(index)
                });
                self.index_changed = true;
            }
            DiffBasesChange::SetHead(head) => {
                self.head_text = head.map(|mut head| {
                    text::LineEnding::normalize(&mut head);
                    Arc::new(head)
                });
                self.head_changed = true;
            }
            DiffBasesChange::SetBoth(text) => {
                // Index matches HEAD: store one shared allocation so that
                // `recalculate_diffs` can detect equality via `Arc::ptr_eq`.
                let text = text.map(|mut text| {
                    text::LineEnding::normalize(&mut text);
                    Arc::new(text)
                });
                self.head_text = text.clone();
                self.index_text = text;
                self.head_changed = true;
                self.index_changed = true;
            }
            DiffBasesChange::SetEach { index, head } => {
                self.index_text = index.map(|mut index| {
                    text::LineEnding::normalize(&mut index);
                    Arc::new(index)
                });
                self.index_changed = true;
                self.head_text = head.map(|mut head| {
                    text::LineEnding::normalize(&mut head);
                    Arc::new(head)
                });
                self.head_changed = true;
            }
        }

        self.recalculate_diffs(buffer, cx)
    }

    /// Recomputes the unstaged and/or uncommitted diffs against the given
    /// buffer snapshot on a spawned task, emitting `DiffChanged` /
    /// `LanguageChanged` events and resolving all pending waiters when done.
    fn recalculate_diffs(
        &mut self,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        log::debug!("recalculate diffs");
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);

        let language = self.language.clone();
        let language_registry = self.language_registry.clone();
        let unstaged_diff = self.unstaged_diff();
        let uncommitted_diff = self.uncommitted_diff();
        let head = self.head_text.clone();
        let index = self.index_text.clone();
        let index_changed = self.index_changed;
        let head_changed = self.head_changed;
        let language_changed = self.language_changed;
        // Pointer equality suffices here because `SetBoth` stores the same
        // `Arc` in both fields.
        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
            (None, None) => true,
            _ => false,
        };
        // Overwriting the previous task cancels any in-flight recalculation.
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            let mut unstaged_changed_range = None;
            if let Some(unstaged_diff) = &unstaged_diff {
                unstaged_changed_range = BufferDiff::update_diff(
                    unstaged_diff.clone(),
                    buffer.clone(),
                    index,
                    index_changed,
                    language_changed,
                    language.clone(),
                    language_registry.clone(),
                    &mut cx,
                )
                .await?;

                unstaged_diff.update(&mut cx, |_, cx| {
                    if language_changed {
                        cx.emit(BufferDiffEvent::LanguageChanged);
                    }
                    if let Some(changed_range) = unstaged_changed_range.clone() {
                        cx.emit(BufferDiffEvent::DiffChanged {
                            changed_range: Some(changed_range),
                        })
                    }
                })?;
            }

            if let Some(uncommitted_diff) = &uncommitted_diff {
                // When the index matches HEAD, reuse the freshly-computed
                // unstaged diff instead of re-diffing against identical text.
                let uncommitted_changed_range =
                    if let (Some(unstaged_diff), true) = (&unstaged_diff, index_matches_head) {
                        uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
                            uncommitted_diff.update_diff_from(&buffer, unstaged_diff, cx)
                        })?
                    } else {
                        BufferDiff::update_diff(
                            uncommitted_diff.clone(),
                            buffer.clone(),
                            head,
                            head_changed,
                            language_changed,
                            language.clone(),
                            language_registry.clone(),
                            &mut cx,
                        )
                        .await?
                    };

                uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
                    if language_changed {
                        cx.emit(BufferDiffEvent::LanguageChanged);
                    }
                    // Merge the two changed ranges so a single event covers
                    // everything that may have changed.
                    let changed_range = match (unstaged_changed_range, uncommitted_changed_range) {
                        (None, None) => None,
                        (Some(unstaged_range), None) => {
                            uncommitted_diff.range_to_hunk_range(unstaged_range, &buffer, cx)
                        }
                        (None, Some(uncommitted_range)) => Some(uncommitted_range),
                        (Some(unstaged_range), Some(uncommitted_range)) => {
                            let mut start = uncommitted_range.start;
                            let mut end = uncommitted_range.end;
                            if let Some(unstaged_range) =
                                uncommitted_diff.range_to_hunk_range(unstaged_range, &buffer, cx)
                            {
                                start = unstaged_range.start.min(&uncommitted_range.start, &buffer);
                                end = unstaged_range.end.max(&uncommitted_range.end, &buffer);
                            }
                            Some(start..end)
                        }
                    };
                    cx.emit(BufferDiffEvent::DiffChanged { changed_range });
                })?;
            }

            if let Some(this) = this.upgrade() {
                this.update(&mut cx, |this, _| {
                    this.index_changed = false;
                    this.head_changed = false;
                    this.language_changed = false;
                    // Wake everything waiting via `wait_for_recalculation`.
                    for tx in this.diff_updated_futures.drain(..) {
                        tx.send(()).ok();
                    }
                })?;
            }

            Ok(())
        }));

        rx
    }
}
309
/// Backend for a `BufferStore`: buffers either live on the local filesystem
/// or are replicated from a remote peer.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
314
/// Buffer-store backend for projects joined from a remote peer.
struct RemoteBufferStore {
    /// Buffers received from collab peers, retained to keep them alive
    /// (avoids races — see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    /// Buffers whose initial state has arrived but whose operation chunks are
    /// still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    /// Channels to complete once a given remote buffer finishes loading.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
324
/// Buffer-store backend for local projects.
struct LocalBufferStore {
    /// Reverse lookup from project path to the buffer opened for it.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    /// Reverse lookup from worktree entry id to the buffer opened for it.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    /// Keeps the worktree-store event subscription alive.
    _subscription: Subscription,
}
331
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully-loaded buffer (held weakly) together with its diff state.
    Complete {
        buffer: WeakEntity<Buffer>,
        diff_state: Entity<BufferDiffState>,
    },
    /// Operations received for a buffer that hasn't finished loading yet;
    /// buffered so they can be applied once the buffer exists.
    Operations(Vec<Operation>),
}
339
/// Events emitted by the `BufferStore`.
pub enum BufferStoreEvent {
    BufferAdded(Entity<Buffer>),
    BufferDropped(BufferId),
    /// A buffer's backing file moved to a different path.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
348
/// The per-buffer transactions produced by a single project-wide operation.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
351
352impl EventEmitter<BufferStoreEvent> for BufferStore {}
353
impl RemoteBufferStore {
    /// Requests the index (staged) text for a buffer from the upstream peer.
    fn open_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.background_spawn(async move {
            let response = client
                .request(proto::OpenUnstagedDiff {
                    project_id,
                    buffer_id: buffer_id.to_proto(),
                })
                .await?;
            Ok(response.staged_text)
        })
    }

    /// Requests both index and HEAD texts for a buffer from the upstream
    /// peer, translating the response mode into a `DiffBasesChange`.
    fn open_uncommitted_diff(
        &self,
        buffer_id: BufferId,
        cx: &App,
    ) -> Task<Result<DiffBasesChange>> {
        use proto::open_uncommitted_diff_response::Mode;

        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.background_spawn(async move {
            let response = client
                .request(proto::OpenUncommittedDiff {
                    project_id,
                    buffer_id: buffer_id.to_proto(),
                })
                .await?;
            let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
            let bases = match mode {
                Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
                Mode::IndexAndHead => DiffBasesChange::SetEach {
                    head: response.committed_text,
                    index: response.staged_text,
                },
            };
            Ok(bases)
        })
    }

    /// Returns a task resolving once the buffer with the given id has been
    /// fully received from the remote peer (or immediately, if it's already
    /// open).
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let (tx, rx) = oneshot::channel();
        // Register the listener before checking, so the buffer can't slip
        // through between the check below and the registration.
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            cx.background_spawn(async move { rx.await? }).await
        })
    }

    /// Saves a buffer by asking the upstream peer to do so, then applies the
    /// resulting version/mtime to the local replica.
    fn save_remote_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

    /// Handles one message of the buffer-streaming protocol: either the
    /// initial `State` of a buffer or a `Chunk` of its operations. Returns
    /// the buffer once the final chunk has been applied.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut Context<BufferStore>,
    ) -> Result<Option<Entity<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Propagate the failure to everyone waiting on this
                        // buffer.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // A malformed chunk aborts the whole load; notify waiters.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }

    /// Ids of buffers whose initial state has arrived but whose operation
    /// chunks are still streaming in.
    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
        self.loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>()
    }

    /// Reconstructs a `ProjectTransaction` from its protobuf form, waiting
    /// for each referenced buffer and its edits to arrive, and optionally
    /// pushing the transactions onto each buffer's undo history.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                // Ensure the transaction's edit operations have all been
                // applied before exposing it to the caller.
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }

    /// Opens a buffer by path by asking the upstream peer, then waits for the
    /// resulting buffer to be streamed down.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let worktree_id = worktree.read(cx).id().to_proto();
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path.to_proto(),
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            let buffer = this
                .update(&mut cx, {
                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await?;

            Ok(buffer)
        })
    }

    /// Creates a new, empty buffer by asking the upstream peer.
    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
        let create = self.upstream_client.request(proto::OpenNewBuffer {
            project_id: self.project_id,
        });
        cx.spawn(|this, mut cx| async move {
            let response = create.await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

    /// Asks the upstream peer to reload the given buffers from disk,
    /// returning the resulting project transaction.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        let request = self.upstream_client.request(proto::ReloadBuffers {
            project_id: self.project_id,
            buffer_ids: buffers
                .iter()
                .map(|buffer| buffer.read(cx).remote_id().to_proto())
                .collect(),
        });

        cx.spawn(|this, mut cx| async move {
            let response = request
                .await?
                .transaction
                .ok_or_else(|| anyhow!("missing transaction"))?;
            this.update(&mut cx, |this, cx| {
                this.deserialize_project_transaction(response, push_to_history, cx)
            })?
            .await
        })
    }
}
659
660impl LocalBufferStore {
661 fn worktree_for_buffer(
662 &self,
663 buffer: &Entity<Buffer>,
664 cx: &App,
665 ) -> Option<(Entity<Worktree>, Arc<Path>)> {
666 let file = buffer.read(cx).file()?;
667 let worktree_id = file.worktree_id(cx);
668 let path = file.path().clone();
669 let worktree = self
670 .worktree_store
671 .read(cx)
672 .worktree_for_id(worktree_id, cx)?;
673 Some((worktree, path))
674 }
675
676 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
677 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
678 worktree.read(cx).load_staged_file(path.as_ref(), cx)
679 } else {
680 return Task::ready(Err(anyhow!("no such worktree")));
681 }
682 }
683
684 fn load_committed_text(
685 &self,
686 buffer: &Entity<Buffer>,
687 cx: &App,
688 ) -> Task<Result<Option<String>>> {
689 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
690 worktree.read(cx).load_committed_file(path.as_ref(), cx)
691 } else {
692 Task::ready(Err(anyhow!("no such worktree")))
693 }
694 }
695
    /// Writes the buffer's contents to `path` within `worktree`, then
    /// notifies the downstream client (if sharing) and the buffer itself of
    /// the save.
    ///
    /// `has_changed_file` is forced to `true` for buffers whose file is new
    /// on disk, so that the file metadata update is also broadcast.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Capture everything needed by the async portion up front.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Tell downstream collaborators about the save (and the new file
            // metadata, if it changed) before updating the local buffer.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
753
754 fn subscribe_to_worktree(
755 &mut self,
756 worktree: &Entity<Worktree>,
757 cx: &mut Context<BufferStore>,
758 ) {
759 cx.subscribe(worktree, |this, worktree, event, cx| {
760 if worktree.read(cx).is_local() {
761 match event {
762 worktree::Event::UpdatedEntries(changes) => {
763 Self::local_worktree_entries_changed(this, &worktree, changes, cx);
764 }
765 worktree::Event::UpdatedGitRepositories(updated_repos) => {
766 Self::local_worktree_git_repos_changed(
767 this,
768 worktree.clone(),
769 updated_repos,
770 cx,
771 )
772 }
773 _ => {}
774 }
775 }
776 })
777 .detach();
778 }
779
780 fn local_worktree_entries_changed(
781 this: &mut BufferStore,
782 worktree_handle: &Entity<Worktree>,
783 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
784 cx: &mut Context<BufferStore>,
785 ) {
786 let snapshot = worktree_handle.read(cx).snapshot();
787 for (path, entry_id, _) in changes {
788 Self::local_worktree_entry_changed(
789 this,
790 *entry_id,
791 path,
792 worktree_handle,
793 &snapshot,
794 cx,
795 );
796 }
797 }
798
    /// Responds to git repository changes in a local worktree by re-reading
    /// the index and HEAD texts of every affected open buffer on a background
    /// thread, then applying (and forwarding downstream, when shared) any
    /// diff-base changes.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Collect (snapshot, path, current index text, current head text) for
        // each open buffer living under one of the changed repositories. The
        // outer `Option` on the texts encodes whether that diff is held at
        // all; `None` means "don't reload this base".
        let mut diff_state_updates = Vec::new();
        for buffer in this.opened_buffers.values() {
            let OpenBuffer::Complete { buffer, diff_state } = buffer else {
                continue;
            };
            let Some(buffer) = buffer.upgrade() else {
                continue;
            };
            let buffer = buffer.read(cx);
            let Some(file) = File::from_dyn(buffer.file()) else {
                continue;
            };
            if file.worktree != worktree_handle {
                continue;
            }
            let diff_state = diff_state.read(cx);
            if changed_repos
                .iter()
                .any(|(work_dir, _)| file.path.starts_with(work_dir))
            {
                let snapshot = buffer.text_snapshot();
                // Only reload base texts for diffs something still holds.
                let has_unstaged_diff = diff_state
                    .unstaged_diff
                    .as_ref()
                    .is_some_and(|diff| diff.is_upgradable());
                let has_uncommitted_diff = diff_state
                    .uncommitted_diff
                    .as_ref()
                    .is_some_and(|set| set.is_upgradable());
                diff_state_updates.push((
                    snapshot.clone(),
                    file.path.clone(),
                    has_unstaged_diff.then(|| diff_state.index_text.clone()),
                    has_uncommitted_diff.then(|| diff_state.head_text.clone()),
                ));
            }
        }

        if diff_state_updates.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Read the new base texts from git on a background thread.
            let diff_bases_changes_by_buffer = cx
                .background_spawn(async move {
                    diff_state_updates
                        .into_iter()
                        .filter_map(
                            |(buffer_snapshot, path, current_index_text, current_head_text)| {
                                let local_repo = snapshot.local_repo_for_path(&path)?;
                                let relative_path = local_repo.relativize(&path).ok()?;
                                let index_text = if current_index_text.is_some() {
                                    local_repo.repo().load_index_text(&relative_path)
                                } else {
                                    None
                                };
                                let head_text = if current_head_text.is_some() {
                                    local_repo.repo().load_committed_text(&relative_path)
                                } else {
                                    None
                                };

                                // Avoid triggering a diff update if the base text has not changed.
                                if let Some((current_index, current_head)) =
                                    current_index_text.as_ref().zip(current_head_text.as_ref())
                                {
                                    if current_index.as_deref() == index_text.as_ref()
                                        && current_head.as_deref() == head_text.as_ref()
                                    {
                                        return None;
                                    }
                                }

                                // Express the reload as the narrowest
                                // `DiffBasesChange` covering the held diffs.
                                let diff_bases_change = match (
                                    current_index_text.is_some(),
                                    current_head_text.is_some(),
                                ) {
                                    (true, true) => Some(if index_text == head_text {
                                        DiffBasesChange::SetBoth(head_text)
                                    } else {
                                        DiffBasesChange::SetEach {
                                            index: index_text,
                                            head: head_text,
                                        }
                                    }),
                                    (true, false) => Some(DiffBasesChange::SetIndex(index_text)),
                                    (false, true) => Some(DiffBasesChange::SetHead(head_text)),
                                    (false, false) => None,
                                };
                                Some((buffer_snapshot, diff_bases_change))
                            },
                        )
                        .collect::<Vec<_>>()
                })
                .await;

            this.update(&mut cx, |this, cx| {
                for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
                    let Some(OpenBuffer::Complete { diff_state, .. }) =
                        this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
                    else {
                        continue;
                    };
                    let Some(diff_bases_change) = diff_bases_change else {
                        continue;
                    };

                    diff_state.update(cx, |diff_state, cx| {
                        use proto::update_diff_bases::Mode;

                        // Forward the new base texts downstream before
                        // applying them locally.
                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
                            let buffer_id = buffer_snapshot.remote_id().to_proto();
                            let (staged_text, committed_text, mode) = match diff_bases_change
                                .clone()
                            {
                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                DiffBasesChange::SetEach { index, head } => {
                                    (index, head, Mode::IndexAndHead)
                                }
                                DiffBasesChange::SetBoth(text) => {
                                    (None, text, Mode::IndexMatchesHead)
                                }
                            };
                            let message = proto::UpdateDiffBases {
                                project_id: *project_id,
                                buffer_id,
                                staged_text,
                                committed_text,
                                mode: mode as i32,
                            };

                            client.send(message).log_err();
                        }

                        let _ =
                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
                    });
                }
            })
        })
        .detach_and_log_err(cx);
    }
952
    /// Updates an open buffer's `File` in response to a single worktree entry
    /// change (create/modify/rename/delete), keeping the path and entry-id
    /// lookup tables in sync and notifying downstream collaborators.
    ///
    /// The `Option<()>` return value exists only to allow early exit via `?`.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Find the affected buffer by entry id first, falling back to path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer was dropped; discard the stale entry.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer: clean up both lookup tables and bail.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Locate the entry in the new snapshot, by id first, then path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // The entry is gone from the snapshot: the file was deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path-based lookup table and queue a rename event.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id-based lookup table.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        // Events are emitted outside the buffer update to avoid re-entrancy.
        for event in events {
            cx.emit(event);
        }

        None
    }
1080
1081 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1082 let file = File::from_dyn(buffer.read(cx).file())?;
1083
1084 let remote_id = buffer.read(cx).remote_id();
1085 if let Some(entry_id) = file.entry_id {
1086 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1087 Some(_) => {
1088 return None;
1089 }
1090 None => {
1091 self.local_buffer_ids_by_entry_id
1092 .insert(entry_id, remote_id);
1093 }
1094 }
1095 };
1096 self.local_buffer_ids_by_path.insert(
1097 ProjectPath {
1098 worktree_id: file.worktree_id(cx),
1099 path: file.path.clone(),
1100 },
1101 remote_id,
1102 );
1103
1104 Some(())
1105 }
1106
1107 fn save_buffer(
1108 &self,
1109 buffer: Entity<Buffer>,
1110 cx: &mut Context<BufferStore>,
1111 ) -> Task<Result<()>> {
1112 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1113 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1114 };
1115 let worktree = file.worktree.clone();
1116 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1117 }
1118
1119 fn save_buffer_as(
1120 &self,
1121 buffer: Entity<Buffer>,
1122 path: ProjectPath,
1123 cx: &mut Context<BufferStore>,
1124 ) -> Task<Result<()>> {
1125 let Some(worktree) = self
1126 .worktree_store
1127 .read(cx)
1128 .worktree_for_id(path.worktree_id, cx)
1129 else {
1130 return Task::ready(Err(anyhow!("no such worktree")));
1131 };
1132 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1133 }
1134
    /// Opens (or creates in memory) a buffer for `path` in `worktree`.
    ///
    /// If the file does not exist on disk, a new empty buffer with
    /// `DiskState::New` is created instead of returning an error. On success
    /// the buffer is registered with the store and its lookup tables.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity id up front so it can double as the buffer
            // id before the entity actually exists.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // A missing file becomes an empty in-memory buffer that will
                // be written to disk on first save.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
1202
1203 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1204 cx.spawn(|buffer_store, mut cx| async move {
1205 let buffer =
1206 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1207 buffer_store.update(&mut cx, |buffer_store, cx| {
1208 buffer_store.add_buffer(buffer.clone(), cx).log_err();
1209 })?;
1210 Ok(buffer)
1211 })
1212 }
1213
1214 fn reload_buffers(
1215 &self,
1216 buffers: HashSet<Entity<Buffer>>,
1217 push_to_history: bool,
1218 cx: &mut Context<BufferStore>,
1219 ) -> Task<Result<ProjectTransaction>> {
1220 cx.spawn(move |_, mut cx| async move {
1221 let mut project_transaction = ProjectTransaction::default();
1222 for buffer in buffers {
1223 let transaction = buffer
1224 .update(&mut cx, |buffer, cx| buffer.reload(cx))?
1225 .await?;
1226 buffer.update(&mut cx, |buffer, cx| {
1227 if let Some(transaction) = transaction {
1228 if !push_to_history {
1229 buffer.forget_transaction(transaction.id);
1230 }
1231 project_transaction.0.insert(cx.entity(), transaction);
1232 }
1233 })?;
1234 }
1235
1236 Ok(project_transaction)
1237 })
1238 }
1239}
1240
1241impl BufferStore {
    /// Registers this store's handlers for buffer-related RPC messages and
    /// requests on the given client.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
    }
1254
    /// Creates a buffer store backed by local worktrees.
    ///
    /// Subscribes to the worktree store so that newly added worktrees are
    /// watched for file events that affect open buffers.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_diffs: Default::default(),
            worktree_store,
        }
    }
1277
    /// Creates a buffer store that mirrors buffers hosted by a remote project
    /// (`remote_id`), communicating with it over `upstream_client`.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_diffs: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
1301
1302 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1303 match &mut self.state {
1304 BufferStoreState::Local(state) => Some(state),
1305 _ => None,
1306 }
1307 }
1308
1309 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1310 match &mut self.state {
1311 BufferStoreState::Remote(state) => Some(state),
1312 _ => None,
1313 }
1314 }
1315
1316 fn as_remote(&self) -> Option<&RemoteBufferStore> {
1317 match &self.state {
1318 BufferStoreState::Remote(state) => Some(state),
1319 _ => None,
1320 }
1321 }
1322
    /// Opens the buffer at `project_path`, reusing an already-open buffer or
    /// joining an in-flight load when one exists.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        // Fast path: the buffer is already open.
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // Another caller is already loading this path; share its task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // The shared task yields `Arc<anyhow::Error>`; flatten it back to
        // `anyhow::Error` for the caller.
        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1367
    /// Returns the diff between the buffer's contents and its staged (index)
    /// text, creating and caching it on first request.
    ///
    /// Concurrent callers share a single in-flight load via `loading_diffs`.
    pub fn open_unstaged_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        // Fast path: the diff already exists; just wait for any in-progress
        // recalculation before handing it out.
        if let Some(OpenBuffer::Complete { diff_state, .. }) = self.opened_buffers.get(&buffer_id) {
            if let Some(unstaged_diff) = diff_state
                .read(cx)
                .unstaged_diff
                .as_ref()
                .and_then(|weak| weak.upgrade())
            {
                if let Some(task) =
                    diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
                {
                    return cx.background_executor().spawn(async move {
                        task.await?;
                        Ok(unstaged_diff)
                    });
                }
                return Task::ready(Ok(unstaged_diff));
            }
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Unstaged)) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                // Locally, load the staged text directly; remotely, request it
                // from the host.
                let staged_text = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.open_unstaged_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Unstaged,
                                staged_text.await.map(DiffBasesChange::SetIndex),
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1422
    /// Returns the diff between the buffer's contents and its committed
    /// (HEAD) text, creating and caching it on first request.
    ///
    /// Concurrent callers share a single in-flight load via `loading_diffs`.
    pub fn open_uncommitted_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();

        // Fast path: the diff already exists; just wait for any in-progress
        // recalculation before handing it out.
        if let Some(OpenBuffer::Complete { diff_state, .. }) = self.opened_buffers.get(&buffer_id) {
            if let Some(uncommitted_diff) = diff_state
                .read(cx)
                .uncommitted_diff
                .as_ref()
                .and_then(|weak| weak.upgrade())
            {
                if let Some(task) =
                    diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
                {
                    return cx.background_executor().spawn(async move {
                        task.await?;
                        Ok(uncommitted_diff)
                    });
                }
                return Task::ready(Ok(uncommitted_diff));
            }
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Uncommitted)) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let changes = match &self.state {
                    BufferStoreState::Local(this) => {
                        let committed_text = this.load_committed_text(&buffer, cx);
                        let staged_text = this.load_staged_text(&buffer, cx);
                        cx.background_spawn(async move {
                            let committed_text = committed_text.await?;
                            let staged_text = staged_text.await?;
                            // When HEAD and index agree, one shared base text
                            // serves both diffs.
                            let diff_bases_change = if committed_text == staged_text {
                                DiffBasesChange::SetBoth(committed_text)
                            } else {
                                DiffBasesChange::SetEach {
                                    index: staged_text,
                                    head: committed_text,
                                }
                            };
                            Ok(diff_bases_change)
                        })
                    }
                    BufferStoreState::Remote(this) => this.open_uncommitted_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Uncommitted,
                                changes.await,
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1494
    /// Finishes opening a diff: records the loaded base texts, creates the
    /// `BufferDiff` entity, and waits for the initial diff computation.
    ///
    /// On failure the corresponding `loading_diffs` entry is removed so a
    /// later call can retry.
    async fn open_diff_internal(
        this: WeakEntity<Self>,
        kind: DiffKind,
        texts: Result<DiffBasesChange>,
        buffer_entity: Entity<Buffer>,
        mut cx: AsyncApp,
    ) -> Result<Entity<BufferDiff>> {
        let diff_bases_change = match texts {
            Err(e) => {
                this.update(&mut cx, |this, cx| {
                    let buffer = buffer_entity.read(cx);
                    let buffer_id = buffer.remote_id();
                    // Clear the cache entry so the error isn't sticky.
                    this.loading_diffs.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(&mut cx, |this, cx| {
            let buffer = buffer_entity.read(cx);
            let buffer_id = buffer.remote_id();
            let language = buffer.language().cloned();
            let language_registry = buffer.language_registry();
            let text_snapshot = buffer.text_snapshot();
            this.loading_diffs.remove(&(buffer_id, kind));

            if let Some(OpenBuffer::Complete { diff_state, .. }) =
                this.opened_buffers.get_mut(&buffer_id)
            {
                diff_state.update(cx, |diff_state, cx| {
                    diff_state.language = language;
                    diff_state.language_registry = language_registry;

                    let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
                    match kind {
                        DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
                        DiffKind::Uncommitted => {
                            // An uncommitted diff always carries an unstaged
                            // diff as its secondary; create one if it's missing.
                            let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
                                diff
                            } else {
                                let unstaged_diff =
                                    cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
                                diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
                                unstaged_diff
                            };

                            diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
                            diff_state.uncommitted_diff = Some(diff.downgrade())
                        }
                    };

                    // Kick off the initial diff computation; `rx` resolves when
                    // it finishes.
                    let rx = diff_state.diff_bases_changed(text_snapshot, diff_bases_change, cx);

                    Ok(async move {
                        rx.await.ok();
                        Ok(diff)
                    })
                })
            } else {
                Err(anyhow!("buffer was closed"))
            }
        })??
        .await
    }
1560
1561 pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
1562 match &self.state {
1563 BufferStoreState::Local(this) => this.create_buffer(cx),
1564 BufferStoreState::Remote(this) => this.create_buffer(cx),
1565 }
1566 }
1567
1568 pub fn save_buffer(
1569 &mut self,
1570 buffer: Entity<Buffer>,
1571 cx: &mut Context<Self>,
1572 ) -> Task<Result<()>> {
1573 match &mut self.state {
1574 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1575 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1576 }
1577 }
1578
1579 pub fn save_buffer_as(
1580 &mut self,
1581 buffer: Entity<Buffer>,
1582 path: ProjectPath,
1583 cx: &mut Context<Self>,
1584 ) -> Task<Result<()>> {
1585 let old_file = buffer.read(cx).file().cloned();
1586 let task = match &self.state {
1587 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1588 BufferStoreState::Remote(this) => {
1589 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1590 }
1591 };
1592 cx.spawn(|this, mut cx| async move {
1593 task.await?;
1594 this.update(&mut cx, |_, cx| {
1595 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1596 })
1597 })
1598 }
1599
    /// Computes git blame for `buffer`, optionally for a historical `version`
    /// of its text.
    ///
    /// Returns `Ok(None)` when the file is not inside a git repository.
    /// Remote worktrees forward the request to the host over RPC.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything blame needs on the main thread, then run
                // the blame itself on the background executor.
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1661
    /// Builds a permalink URL to `selection` in the buffer's file on its git
    /// hosting provider, based on the `origin` remote and the HEAD SHA.
    ///
    /// Remote worktrees forward the request to the host over RPC.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if buffer
                        .language()
                        .is_none_or(|lang| lang.name() != "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // Express the file path relative to the repository root.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1758
    /// Registers a buffer with the store, wiring up its diff state, drop
    /// notification, and event forwarding.
    ///
    /// Operations that arrived before the buffer finished opening are applied
    /// now. Re-registering a live buffer is tolerated for remote (non-zero
    /// replica) buffers but is an error for local ones.
    fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
        let buffer = buffer_entity.read(cx);
        let language = buffer.language().cloned();
        let language_registry = buffer.language_registry();
        let remote_id = buffer.remote_id();
        let is_remote = buffer.replica_id() != 0;
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer_entity.downgrade(),
            diff_state: cx.new(|_| BufferDiffState {
                language,
                language_registry,
                ..Default::default()
            }),
        };

        // Emit `BufferDropped` when the buffer entity is released, so
        // dependents can clean up their per-buffer state.
        let handle = cx.entity().downgrade();
        buffer_entity.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    // Drain the operations queued while the buffer was opening.
                    buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
        Ok(())
    }
1809
1810 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1811 self.opened_buffers
1812 .values()
1813 .filter_map(|buffer| buffer.upgrade())
1814 }
1815
1816 pub fn loading_buffers(
1817 &self,
1818 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1819 self.loading_buffers.iter().map(|(path, task)| {
1820 let task = task.clone();
1821 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1822 })
1823 }
1824
1825 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1826 self.buffers().find_map(|buffer| {
1827 let file = File::from_dyn(buffer.read(cx).file())?;
1828 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1829 Some(buffer)
1830 } else {
1831 None
1832 }
1833 })
1834 }
1835
1836 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1837 self.opened_buffers.get(&buffer_id)?.upgrade()
1838 }
1839
1840 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1841 self.get(buffer_id)
1842 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1843 }
1844
1845 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1846 self.get(buffer_id).or_else(|| {
1847 self.as_remote()
1848 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1849 })
1850 }
1851
1852 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
1853 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1854 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
1855 } else {
1856 None
1857 }
1858 }
1859
1860 pub fn get_uncommitted_diff(
1861 &self,
1862 buffer_id: BufferId,
1863 cx: &App,
1864 ) -> Option<Entity<BufferDiff>> {
1865 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1866 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
1867 } else {
1868 None
1869 }
1870 }
1871
1872 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1873 let buffers = self
1874 .buffers()
1875 .map(|buffer| {
1876 let buffer = buffer.read(cx);
1877 proto::BufferVersion {
1878 id: buffer.remote_id().into(),
1879 version: language::proto::serialize_version(&buffer.version),
1880 }
1881 })
1882 .collect();
1883 let incomplete_buffer_ids = self
1884 .as_remote()
1885 .map(|remote| remote.incomplete_buffer_ids())
1886 .unwrap_or_default();
1887 (buffers, incomplete_buffer_ids)
1888 }
1889
1890 pub fn disconnected_from_host(&mut self, cx: &mut App) {
1891 for open_buffer in self.opened_buffers.values_mut() {
1892 if let Some(buffer) = open_buffer.upgrade() {
1893 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1894 }
1895 }
1896
1897 for buffer in self.buffers() {
1898 buffer.update(cx, |buffer, cx| {
1899 buffer.set_capability(Capability::ReadOnly, cx)
1900 });
1901 }
1902
1903 if let Some(remote) = self.as_remote_mut() {
1904 // Wake up all futures currently waiting on a buffer to get opened,
1905 // to give them a chance to fail now that we've disconnected.
1906 remote.remote_buffer_listeners.clear()
1907 }
1908 }
1909
    /// Begins sharing buffers with a downstream collaborator identified by
    /// `remote_id`.
    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1913
1914 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1915 self.downstream_client.take();
1916 self.forget_shared_buffers();
1917 }
1918
    /// Drops entries that only hold queued operations for buffers whose
    /// contents never arrived.
    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }
1923
    /// Streams buffers that may contain matches for `query`, up to `limit`.
    ///
    /// Open buffers without a disk entry are sent first (they can't be found
    /// by the worktree scan); remaining candidates come from the worktree
    /// store and are opened in bounded batches.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // Buffers without an entry id are yielded directly and count
                // against the limit.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            // Receiver dropped; stop opening candidates.
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1978
1979 pub fn recalculate_buffer_diffs(
1980 &mut self,
1981 buffers: Vec<Entity<Buffer>>,
1982 cx: &mut Context<Self>,
1983 ) -> impl Future<Output = ()> {
1984 let mut futures = Vec::new();
1985 for buffer in buffers {
1986 if let Some(OpenBuffer::Complete { diff_state, .. }) =
1987 self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
1988 {
1989 let buffer = buffer.read(cx).text_snapshot();
1990 futures.push(diff_state.update(cx, |diff_state, cx| {
1991 diff_state.recalculate_diffs(buffer, cx)
1992 }));
1993 }
1994 }
1995 async move {
1996 futures::future::join_all(futures).await;
1997 }
1998 }
1999
    /// Reacts to events emitted by an individual buffer.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                // Keep the local path/entry-id indices in sync with the new file.
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Forward reloads to downstream collaborators, if any.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            BufferEvent::LanguageChanged => {
                // Propagate the new language to the buffer's diff state.
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(OpenBuffer::Complete { diff_state, .. }) =
                    self.opened_buffers.get(&buffer_id)
                {
                    diff_state.update(cx, |diff_state, cx| {
                        diff_state.buffer_language_changed(buffer, cx);
                    });
                }
            }
            _ => {}
        }
    }
2040
    /// Applies buffer operations received from a collaborator.
    ///
    /// Operations addressed to a buffer that hasn't finished opening are
    /// queued in an `OpenBuffer::Operations` entry and applied later by
    /// `add_buffer`.
    pub async fn handle_update_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Complete { buffer, .. } => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                        }
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }
2070
2071 pub fn register_shared_lsp_handle(
2072 &mut self,
2073 peer_id: proto::PeerId,
2074 buffer_id: BufferId,
2075 handle: OpenLspBufferHandle,
2076 ) {
2077 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2078 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2079 buffer.lsp_handle = Some(handle);
2080 return;
2081 }
2082 }
2083 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2084 }
2085
    /// Responds to a guest's buffer-synchronization request (e.g. after a
    /// reconnect).
    ///
    /// Rebuilds the guest's shared-buffer set and, for each buffer the guest
    /// reports open, re-sends its file, saved state, and any operations the
    /// guest is missing based on the version vector it supplied.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate for this guest.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        diff: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest hasn't seen yet.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream operation chunks from a background task so large
                // histories don't block the main thread.
                cx.background_spawn(
                    async move {
                        let operations = operations.await;
                        for chunk in split_operations(operations) {
                            client
                                .request(proto::UpdateBuffer {
                                    project_id,
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                })
                                .await?;
                        }
                        anyhow::Ok(())
                    }
                    .log_err(),
                )
                .detach();
            }
        }
        Ok(response)
    }
2174
2175 pub fn handle_create_buffer_for_peer(
2176 &mut self,
2177 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2178 replica_id: u16,
2179 capability: Capability,
2180 cx: &mut Context<Self>,
2181 ) -> Result<()> {
2182 let Some(remote) = self.as_remote_mut() else {
2183 return Err(anyhow!("buffer store is not a remote"));
2184 };
2185
2186 if let Some(buffer) =
2187 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2188 {
2189 self.add_buffer(buffer, cx)?;
2190 }
2191
2192 Ok(())
2193 }
2194
    /// RPC handler for `UpdateBufferFile`: applies updated file metadata to a
    /// (possibly still-loading) buffer, emits `BufferChangedFilePath` when the
    /// buffer's path changed, and re-forwards the raw payload to any
    /// downstream client.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            // Clone the payload: the original `envelope.payload.file` is still
            // needed below for downstream forwarding.
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // Returns `Some(previous file)` when the buffer's path changed.
                // Note that "no previous file" (`map_or(true, ...)`) also counts
                // as a path change, so `old_file` may be `Some(None)`.
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay the update verbatim when this store is itself shared.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
2242
2243 pub async fn handle_save_buffer(
2244 this: Entity<Self>,
2245 envelope: TypedEnvelope<proto::SaveBuffer>,
2246 mut cx: AsyncApp,
2247 ) -> Result<proto::BufferSaved> {
2248 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2249 let (buffer, project_id) = this.update(&mut cx, |this, _| {
2250 anyhow::Ok((
2251 this.get_existing(buffer_id)?,
2252 this.downstream_client
2253 .as_ref()
2254 .map(|(_, project_id)| *project_id)
2255 .context("project is not shared")?,
2256 ))
2257 })??;
2258 buffer
2259 .update(&mut cx, |buffer, _| {
2260 buffer.wait_for_version(deserialize_version(&envelope.payload.version))
2261 })?
2262 .await?;
2263 let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
2264
2265 if let Some(new_path) = envelope.payload.new_path {
2266 let new_path = ProjectPath::from_proto(new_path);
2267 this.update(&mut cx, |this, cx| {
2268 this.save_buffer_as(buffer.clone(), new_path, cx)
2269 })?
2270 .await?;
2271 } else {
2272 this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
2273 .await?;
2274 }
2275
2276 buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
2277 project_id,
2278 buffer_id: buffer_id.into(),
2279 version: serialize_version(buffer.saved_version()),
2280 mtime: buffer.saved_mtime().map(|time| time.into()),
2281 })
2282 }
2283
2284 pub async fn handle_close_buffer(
2285 this: Entity<Self>,
2286 envelope: TypedEnvelope<proto::CloseBuffer>,
2287 mut cx: AsyncApp,
2288 ) -> Result<()> {
2289 let peer_id = envelope.sender_id;
2290 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2291 this.update(&mut cx, |this, _| {
2292 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2293 if shared.remove(&buffer_id).is_some() {
2294 if shared.is_empty() {
2295 this.shared_buffers.remove(&peer_id);
2296 }
2297 return;
2298 }
2299 }
2300 debug_panic!(
2301 "peer_id {} closed buffer_id {} which was either not open or already closed",
2302 peer_id,
2303 buffer_id
2304 )
2305 })
2306 }
2307
2308 pub async fn handle_buffer_saved(
2309 this: Entity<Self>,
2310 envelope: TypedEnvelope<proto::BufferSaved>,
2311 mut cx: AsyncApp,
2312 ) -> Result<()> {
2313 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2314 let version = deserialize_version(&envelope.payload.version);
2315 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2316 this.update(&mut cx, move |this, cx| {
2317 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2318 buffer.update(cx, |buffer, cx| {
2319 buffer.did_save(version, mtime, cx);
2320 });
2321 }
2322
2323 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2324 downstream_client
2325 .send(proto::BufferSaved {
2326 project_id: *project_id,
2327 buffer_id: buffer_id.into(),
2328 mtime: envelope.payload.mtime,
2329 version: envelope.payload.version,
2330 })
2331 .log_err();
2332 }
2333 })
2334 }
2335
2336 pub async fn handle_buffer_reloaded(
2337 this: Entity<Self>,
2338 envelope: TypedEnvelope<proto::BufferReloaded>,
2339 mut cx: AsyncApp,
2340 ) -> Result<()> {
2341 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2342 let version = deserialize_version(&envelope.payload.version);
2343 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2344 let line_ending = deserialize_line_ending(
2345 proto::LineEnding::from_i32(envelope.payload.line_ending)
2346 .ok_or_else(|| anyhow!("missing line ending"))?,
2347 );
2348 this.update(&mut cx, |this, cx| {
2349 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2350 buffer.update(cx, |buffer, cx| {
2351 buffer.did_reload(version, line_ending, mtime, cx);
2352 });
2353 }
2354
2355 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2356 downstream_client
2357 .send(proto::BufferReloaded {
2358 project_id: *project_id,
2359 buffer_id: buffer_id.into(),
2360 mtime: envelope.payload.mtime,
2361 version: envelope.payload.version,
2362 line_ending: envelope.payload.line_ending,
2363 })
2364 .log_err();
2365 }
2366 })
2367 }
2368
2369 pub async fn handle_blame_buffer(
2370 this: Entity<Self>,
2371 envelope: TypedEnvelope<proto::BlameBuffer>,
2372 mut cx: AsyncApp,
2373 ) -> Result<proto::BlameBufferResponse> {
2374 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2375 let version = deserialize_version(&envelope.payload.version);
2376 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2377 buffer
2378 .update(&mut cx, |buffer, _| {
2379 buffer.wait_for_version(version.clone())
2380 })?
2381 .await?;
2382 let blame = this
2383 .update(&mut cx, |this, cx| {
2384 this.blame_buffer(&buffer, Some(version), cx)
2385 })?
2386 .await?;
2387 Ok(serialize_blame_buffer_response(blame))
2388 }
2389
2390 pub async fn handle_get_permalink_to_line(
2391 this: Entity<Self>,
2392 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2393 mut cx: AsyncApp,
2394 ) -> Result<proto::GetPermalinkToLineResponse> {
2395 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2396 // let version = deserialize_version(&envelope.payload.version);
2397 let selection = {
2398 let proto_selection = envelope
2399 .payload
2400 .selection
2401 .context("no selection to get permalink for defined")?;
2402 proto_selection.start as u32..proto_selection.end as u32
2403 };
2404 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2405 let permalink = this
2406 .update(&mut cx, |this, cx| {
2407 this.get_permalink_to_line(&buffer, selection, cx)
2408 })?
2409 .await?;
2410 Ok(proto::GetPermalinkToLineResponse {
2411 permalink: permalink.to_string(),
2412 })
2413 }
2414
2415 pub async fn handle_open_unstaged_diff(
2416 this: Entity<Self>,
2417 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2418 mut cx: AsyncApp,
2419 ) -> Result<proto::OpenUnstagedDiffResponse> {
2420 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2421 let diff = this
2422 .update(&mut cx, |this, cx| {
2423 let buffer = this.get(buffer_id)?;
2424 Some(this.open_unstaged_diff(buffer, cx))
2425 })?
2426 .ok_or_else(|| anyhow!("no such buffer"))?
2427 .await?;
2428 this.update(&mut cx, |this, _| {
2429 let shared_buffers = this
2430 .shared_buffers
2431 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2432 .or_default();
2433 debug_assert!(shared_buffers.contains_key(&buffer_id));
2434 if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2435 shared.diff = Some(diff.clone());
2436 }
2437 })?;
2438 let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
2439 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2440 }
2441
    /// RPC handler for `OpenUncommittedDiff`: opens (or reuses) the
    /// uncommitted diff for a buffer, attaches it to the sender's
    /// shared-buffer entry, and returns the committed/staged base texts
    /// together with a `Mode` telling the client how to interpret them.
    pub async fn handle_open_uncommitted_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUncommittedDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUncommittedDiffResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let diff = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_uncommitted_diff(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        // Remember the diff on the peer's shared-buffer entry so it stays
        // alive for as long as the buffer is shared with that peer.
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            // The buffer should already have been shared with this peer.
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.diff = Some(diff.clone());
            }
        })?;
        diff.read_with(&cx, |diff, cx| {
            use proto::open_uncommitted_diff_response::Mode;

            // The secondary diff (if any) tracks the index ("staged") text.
            let unstaged_diff = diff.secondary_diff();
            let index_snapshot = unstaged_diff.and_then(|diff| {
                let diff = diff.read(cx);
                diff.base_text_exists().then(|| diff.base_text())
            });

            // Three cases, signaled via `mode` plus which texts are Some:
            // 1. HEAD text exists and the index buffer is the very same
            //    snapshot -> IndexMatchesHead, send only committed_text.
            // 2. HEAD and index differ (or index is missing) -> IndexAndHead,
            //    send both (staged_text is None when the index has no base).
            // 3. No HEAD text -> IndexAndHead with committed_text None.
            let mode;
            let staged_text;
            let committed_text;
            if diff.base_text_exists() {
                let committed_snapshot = diff.base_text();
                committed_text = Some(committed_snapshot.text());
                if let Some(index_text) = index_snapshot {
                    if index_text.remote_id() == committed_snapshot.remote_id() {
                        mode = Mode::IndexMatchesHead;
                        staged_text = None;
                    } else {
                        mode = Mode::IndexAndHead;
                        staged_text = Some(index_text.text());
                    }
                } else {
                    mode = Mode::IndexAndHead;
                    staged_text = None;
                }
            } else {
                mode = Mode::IndexAndHead;
                committed_text = None;
                staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
            }

            proto::OpenUncommittedDiffResponse {
                committed_text,
                staged_text,
                mode: mode.into(),
            }
        })
    }
2505
2506 pub async fn handle_update_diff_bases(
2507 this: Entity<Self>,
2508 request: TypedEnvelope<proto::UpdateDiffBases>,
2509 mut cx: AsyncApp,
2510 ) -> Result<()> {
2511 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2512 this.update(&mut cx, |this, cx| {
2513 if let Some(OpenBuffer::Complete { diff_state, buffer }) =
2514 this.opened_buffers.get_mut(&buffer_id)
2515 {
2516 if let Some(buffer) = buffer.upgrade() {
2517 let buffer = buffer.read(cx).text_snapshot();
2518 diff_state.update(cx, |diff_state, cx| {
2519 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2520 })
2521 }
2522 }
2523 })
2524 }
2525
    /// Reloads the given buffers from disk, dispatching to the local or
    /// remote implementation depending on how this store was constructed.
    ///
    /// When `push_to_history` is true the resulting transaction is recorded in
    /// the buffers' undo history. An empty set short-circuits to an empty
    /// `ProjectTransaction`.
    pub fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        if buffers.is_empty() {
            return Task::ready(Ok(ProjectTransaction::default()));
        }
        match &self.state {
            BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
            BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
        }
    }
2540
2541 async fn handle_reload_buffers(
2542 this: Entity<Self>,
2543 envelope: TypedEnvelope<proto::ReloadBuffers>,
2544 mut cx: AsyncApp,
2545 ) -> Result<proto::ReloadBuffersResponse> {
2546 let sender_id = envelope.original_sender_id().unwrap_or_default();
2547 let reload = this.update(&mut cx, |this, cx| {
2548 let mut buffers = HashSet::default();
2549 for buffer_id in &envelope.payload.buffer_ids {
2550 let buffer_id = BufferId::new(*buffer_id)?;
2551 buffers.insert(this.get_existing(buffer_id)?);
2552 }
2553 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2554 })??;
2555
2556 let project_transaction = reload.await?;
2557 let project_transaction = this.update(&mut cx, |this, cx| {
2558 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2559 })?;
2560 Ok(proto::ReloadBuffersResponse {
2561 transaction: Some(project_transaction),
2562 })
2563 }
2564
    /// Shares `buffer` with `peer_id`, sending its full state followed by its
    /// operation history in chunks. A no-op when the buffer is already shared
    /// with that peer or when this store has no downstream client.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            // Already shared with this peer; nothing to send.
            return Task::ready(Ok(()));
        }
        // Record the share eagerly (before the async send) so concurrent
        // callers see the buffer as shared and don't send it twice.
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                diff: None,
                lsp_handle: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before the task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            // First message carries the buffer's snapshot state...
            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // ...followed by the operation history, split into size-limited
            // chunks with `is_last` marking the final one. Skipped entirely if
            // the initial send failed.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_spawn(async move {
                    let mut chunks = split_operations(operations).peekable();
                    while let Some(chunk) = chunks.next() {
                        let is_last = chunks.peek().is_none();
                        client.send(proto::CreateBufferForPeer {
                            project_id,
                            peer_id: Some(peer_id),
                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                proto::BufferChunk {
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                    is_last,
                                },
                            )),
                        })?;
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }
            Ok(())
        })
    }
2630
    /// Drops the record of every buffer shared with every peer.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }

    /// Drops the record of every buffer shared with the given peer.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }

    /// Re-keys a peer's shared buffers under a new peer id (e.g. after the
    /// peer reconnects with a fresh id). No-op if the old id had none.
    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }

    /// Whether any buffer is currently shared with any peer.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2648
2649 pub fn create_local_buffer(
2650 &mut self,
2651 text: &str,
2652 language: Option<Arc<Language>>,
2653 cx: &mut Context<Self>,
2654 ) -> Entity<Buffer> {
2655 let buffer = cx.new(|cx| {
2656 Buffer::local(text, cx)
2657 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
2658 });
2659
2660 self.add_buffer(buffer.clone(), cx).log_err();
2661 let buffer_id = buffer.read(cx).remote_id();
2662
2663 let this = self
2664 .as_local_mut()
2665 .expect("local-only method called in a non-local context");
2666 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2667 this.local_buffer_ids_by_path.insert(
2668 ProjectPath {
2669 worktree_id: file.worktree_id(cx),
2670 path: file.path.clone(),
2671 },
2672 buffer_id,
2673 );
2674
2675 if let Some(entry_id) = file.entry_id {
2676 this.local_buffer_ids_by_entry_id
2677 .insert(entry_id, buffer_id);
2678 }
2679 }
2680 buffer
2681 }
2682
2683 pub fn deserialize_project_transaction(
2684 &mut self,
2685 message: proto::ProjectTransaction,
2686 push_to_history: bool,
2687 cx: &mut Context<Self>,
2688 ) -> Task<Result<ProjectTransaction>> {
2689 if let Some(this) = self.as_remote_mut() {
2690 this.deserialize_project_transaction(message, push_to_history, cx)
2691 } else {
2692 debug_panic!("not a remote buffer store");
2693 Task::ready(Err(anyhow!("not a remote buffer store")))
2694 }
2695 }
2696
2697 pub fn wait_for_remote_buffer(
2698 &mut self,
2699 id: BufferId,
2700 cx: &mut Context<BufferStore>,
2701 ) -> Task<Result<Entity<Buffer>>> {
2702 if let Some(this) = self.as_remote_mut() {
2703 this.wait_for_remote_buffer(id, cx)
2704 } else {
2705 debug_panic!("not a remote buffer store");
2706 Task::ready(Err(anyhow!("not a remote buffer store")))
2707 }
2708 }
2709
2710 pub fn serialize_project_transaction_for_peer(
2711 &mut self,
2712 project_transaction: ProjectTransaction,
2713 peer_id: proto::PeerId,
2714 cx: &mut Context<Self>,
2715 ) -> proto::ProjectTransaction {
2716 let mut serialized_transaction = proto::ProjectTransaction {
2717 buffer_ids: Default::default(),
2718 transactions: Default::default(),
2719 };
2720 for (buffer, transaction) in project_transaction.0 {
2721 self.create_buffer_for_peer(&buffer, peer_id, cx)
2722 .detach_and_log_err(cx);
2723 serialized_transaction
2724 .buffer_ids
2725 .push(buffer.read(cx).remote_id().into());
2726 serialized_transaction
2727 .transactions
2728 .push(language::proto::serialize_transaction(&transaction));
2729 }
2730 serialized_transaction
2731 }
2732}
2733
2734impl OpenBuffer {
2735 fn upgrade(&self) -> Option<Entity<Buffer>> {
2736 match self {
2737 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2738 OpenBuffer::Operations(_) => None,
2739 }
2740 }
2741}
2742
2743fn is_not_found_error(error: &anyhow::Error) -> bool {
2744 error
2745 .root_cause()
2746 .downcast_ref::<io::Error>()
2747 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2748}
2749
2750fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2751 let Some(blame) = blame else {
2752 return proto::BlameBufferResponse {
2753 blame_response: None,
2754 };
2755 };
2756
2757 let entries = blame
2758 .entries
2759 .into_iter()
2760 .map(|entry| proto::BlameEntry {
2761 sha: entry.sha.as_bytes().into(),
2762 start_line: entry.range.start,
2763 end_line: entry.range.end,
2764 original_line_number: entry.original_line_number,
2765 author: entry.author.clone(),
2766 author_mail: entry.author_mail.clone(),
2767 author_time: entry.author_time,
2768 author_tz: entry.author_tz.clone(),
2769 committer: entry.committer_name.clone(),
2770 committer_mail: entry.committer_email.clone(),
2771 committer_time: entry.committer_time,
2772 committer_tz: entry.committer_tz.clone(),
2773 summary: entry.summary.clone(),
2774 previous: entry.previous.clone(),
2775 filename: entry.filename.clone(),
2776 })
2777 .collect::<Vec<_>>();
2778
2779 let messages = blame
2780 .messages
2781 .into_iter()
2782 .map(|(oid, message)| proto::CommitMessage {
2783 oid: oid.as_bytes().into(),
2784 message,
2785 })
2786 .collect::<Vec<_>>();
2787
2788 let permalinks = blame
2789 .permalinks
2790 .into_iter()
2791 .map(|(oid, url)| proto::CommitPermalink {
2792 oid: oid.as_bytes().into(),
2793 permalink: url.to_string(),
2794 })
2795 .collect::<Vec<_>>();
2796
2797 proto::BlameBufferResponse {
2798 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2799 entries,
2800 messages,
2801 permalinks,
2802 remote_url: blame.remote_url,
2803 }),
2804 }
2805}
2806
2807fn deserialize_blame_buffer_response(
2808 response: proto::BlameBufferResponse,
2809) -> Option<git::blame::Blame> {
2810 let response = response.blame_response?;
2811 let entries = response
2812 .entries
2813 .into_iter()
2814 .filter_map(|entry| {
2815 Some(git::blame::BlameEntry {
2816 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2817 range: entry.start_line..entry.end_line,
2818 original_line_number: entry.original_line_number,
2819 committer_name: entry.committer,
2820 committer_time: entry.committer_time,
2821 committer_tz: entry.committer_tz,
2822 committer_email: entry.committer_mail,
2823 author: entry.author,
2824 author_mail: entry.author_mail,
2825 author_time: entry.author_time,
2826 author_tz: entry.author_tz,
2827 summary: entry.summary,
2828 previous: entry.previous,
2829 filename: entry.filename,
2830 })
2831 })
2832 .collect::<Vec<_>>();
2833
2834 let messages = response
2835 .messages
2836 .into_iter()
2837 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2838 .collect::<HashMap<_, _>>();
2839
2840 let permalinks = response
2841 .permalinks
2842 .into_iter()
2843 .filter_map(|permalink| {
2844 Some((
2845 git::Oid::from_bytes(&permalink.oid).ok()?,
2846 Url::from_str(&permalink.permalink).ok()?,
2847 ))
2848 })
2849 .collect::<HashMap<_, _>>();
2850
2851 Some(Blame {
2852 entries,
2853 permalinks,
2854 messages,
2855 remote_url: response.remote_url,
2856 })
2857}
2858
/// Builds a permalink for a file inside a Cargo registry source checkout by
/// reading the `.cargo_vcs_info.json` that cargo embeds in published crates.
///
/// Walks up from `path` looking for a directory containing
/// `.cargo_vcs_info.json`, reads the crate's repository URL from the adjacent
/// `Cargo.toml`, and combines the recorded commit sha with the file's path
/// inside the original VCS tree to produce a permalink for `selection`
/// (a row range).
///
/// Errors when no `.cargo_vcs_info.json` is found, when either file fails to
/// parse, or when the repository URL isn't recognized by any registered git
/// hosting provider.
fn get_permalink_in_rust_registry_src(
    provider_registry: Arc<GitHostingProviderRegistry>,
    path: PathBuf,
    selection: Range<u32>,
) -> Result<url::Url> {
    // Minimal mirrors of the relevant parts of `.cargo_vcs_info.json`...
    #[derive(Deserialize)]
    struct CargoVcsGit {
        sha1: String,
    }

    #[derive(Deserialize)]
    struct CargoVcsInfo {
        git: CargoVcsGit,
        path_in_vcs: String,
    }

    // ...and of `Cargo.toml` (only the repository URL is needed).
    #[derive(Deserialize)]
    struct CargoPackage {
        repository: String,
    }

    #[derive(Deserialize)]
    struct CargoToml {
        package: CargoPackage,
    }

    // `skip(1)`: start from the parent directory, not the file itself.
    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
        Some((dir, json))
    }) else {
        bail!("No .cargo_vcs_info.json found in parent directories")
    };
    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
        .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
    // Re-root the file path under its location in the original repository.
    // `unwrap` is safe: `dir` was found as an ancestor of `path` above.
    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
    let permalink = provider.build_permalink(
        remote,
        BuildPermalinkParams {
            sha: &cargo_vcs_info.git.sha1,
            path: &path.to_string_lossy(),
            selection: Some(selection),
        },
    );
    Ok(permalink)
}