1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use buffer_diff::BufferDiff;
10use client::Client;
11use collections::{hash_map, HashMap, HashSet};
12use fs::Fs;
13use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
14use git::{blame::Blame, repository::RepoPath};
15use gpui::{
16 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
17};
18use http_client::Url;
19use language::{
20 proto::{
21 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
22 split_operations,
23 },
24 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
25};
26use rpc::{
27 proto::{self, ToProto},
28 AnyProtoClient, ErrorExt as _, TypedEnvelope,
29};
30use serde::Deserialize;
31use smol::channel::Receiver;
32use std::{
33 io,
34 ops::Range,
35 path::{Path, PathBuf},
36 pin::pin,
37 str::FromStr as _,
38 sync::Arc,
39 time::Instant,
40};
41use text::BufferId;
42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
44
/// The two kinds of per-buffer diffs tracked by the store.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    /// Working copy compared against the index (staged text).
    Unstaged,
    /// Working copy compared against HEAD (committed text).
    Uncommitted,
}
50
/// A set of open buffers.
pub struct BufferStore {
    /// Local or remote backing implementation for this store.
    state: BufferStoreState,
    /// In-flight buffer loads keyed by path; `Shared` so concurrent opens of
    /// the same path await a single task.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    /// In-flight diff loads keyed by buffer id and diff kind, deduplicated
    /// the same way as `loading_buffers`.
    #[allow(clippy::type_complexity)]
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
    worktree_store: Entity<WorktreeStore>,
    /// All buffers known to this store, keyed by id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Client and project id used to forward buffer updates downstream when
    /// this project is shared (see `save_local_buffer`).
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// Buffers (plus associated handles) that have been sent to each peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
64
/// Handles retained for a buffer shared with a remote peer: the buffer itself
/// plus, optionally, its diff and LSP buffer handle.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    diff: Option<Entity<BufferDiff>>,
    lsp_handle: Option<OpenLspBufferHandle>,
}
71
/// Per-buffer state for computing and refreshing git diffs.
#[derive(Default)]
struct BufferDiffState {
    /// Weak handle to the unstaged diff, so the state does not keep it alive.
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    /// Weak handle to the uncommitted diff.
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    /// The currently running recalculation, if any; replaced on each new run.
    recalculate_diff_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    /// Senders resolved when the in-flight recalculation completes.
    diff_updated_futures: Vec<oneshot::Sender<()>>,

    /// Base text from HEAD (line endings normalized).
    head_text: Option<Arc<String>>,
    /// Base text from the index (line endings normalized). May share the same
    /// `Arc` as `head_text` when index and HEAD are known to match.
    index_text: Option<Arc<String>>,
    // Dirty flags consumed (and cleared) by `recalculate_diffs`.
    head_changed: bool,
    index_changed: bool,
    language_changed: bool,
}
87
/// Describes which diff base texts changed, and to what.
/// `None` payloads mean the corresponding base text is absent.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Only the index (staged) text changed.
    SetIndex(Option<String>),
    /// Only the HEAD (committed) text changed.
    SetHead(Option<String>),
    /// Both changed, to different values.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Both changed, and the index matches HEAD exactly.
    SetBoth(Option<String>),
}
98
impl BufferDiffState {
    /// Records the buffer's current language and schedules a diff
    /// recalculation so that diff snapshots are rebuilt with it.
    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
        self.language = buffer.read(cx).language().cloned();
        self.language_changed = true;
        // Fire-and-forget: the caller doesn't need to await the update.
        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
    }

    /// The unstaged diff, if its entity is still alive.
    fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
        self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
    }

    /// The uncommitted diff, if its entity is still alive.
    fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
        self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
    }

    /// Applies an `UpdateDiffBases` message from the upstream peer by
    /// translating its mode into a `DiffBasesChange` and triggering a
    /// recalculation. Messages carrying an unrecognized mode are ignored.
    fn handle_base_texts_updated(
        &mut self,
        buffer: text::BufferSnapshot,
        message: proto::UpdateDiffBases,
        cx: &mut Context<Self>,
    ) {
        use proto::update_diff_bases::Mode;

        let Some(mode) = Mode::from_i32(message.mode) else {
            return;
        };

        let diff_bases_change = match mode {
            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
            Mode::IndexAndHead => DiffBasesChange::SetEach {
                index: message.staged_text,
                head: message.committed_text,
            },
        };

        let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
    }

    /// Returns a receiver that resolves when the in-flight recalculation
    /// finishes, or `None` when no recalculation is pending.
    pub fn wait_for_recalculation(&mut self) -> Option<oneshot::Receiver<()>> {
        if self.diff_updated_futures.is_empty() {
            return None;
        }
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);
        Some(rx)
    }

    /// Stores the new base texts (normalizing line endings), marks the
    /// corresponding dirty flags, and kicks off a recalculation. The returned
    /// receiver resolves once that recalculation completes.
    fn diff_bases_changed(
        &mut self,
        buffer: text::BufferSnapshot,
        diff_bases_change: DiffBasesChange,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        match diff_bases_change {
            DiffBasesChange::SetIndex(index) => {
                self.index_text = index.map(|mut index| {
                    text::LineEnding::normalize(&mut index);
                    Arc::new(index)
                });
                self.index_changed = true;
            }
            DiffBasesChange::SetHead(head) => {
                self.head_text = head.map(|mut head| {
                    text::LineEnding::normalize(&mut head);
                    Arc::new(head)
                });
                self.head_changed = true;
            }
            DiffBasesChange::SetBoth(text) => {
                // Share one Arc between index and head so `recalculate_diffs`
                // can detect the match via `Arc::ptr_eq`.
                let text = text.map(|mut text| {
                    text::LineEnding::normalize(&mut text);
                    Arc::new(text)
                });
                self.head_text = text.clone();
                self.index_text = text;
                self.head_changed = true;
                self.index_changed = true;
            }
            DiffBasesChange::SetEach { index, head } => {
                self.index_text = index.map(|mut index| {
                    text::LineEnding::normalize(&mut index);
                    Arc::new(index)
                });
                self.index_changed = true;
                self.head_text = head.map(|mut head| {
                    text::LineEnding::normalize(&mut head);
                    Arc::new(head)
                });
                self.head_changed = true;
            }
        }

        self.recalculate_diffs(buffer, cx)
    }

    /// Spawns a task that recomputes the unstaged and/or uncommitted diff
    /// snapshots against the given buffer snapshot, then clears the dirty
    /// flags and resolves all pending `diff_updated_futures`. Replaces any
    /// previously running recalculation task.
    fn recalculate_diffs(
        &mut self,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        log::debug!("recalculate diffs");
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);

        let language = self.language.clone();
        let language_registry = self.language_registry.clone();
        let unstaged_diff = self.unstaged_diff();
        let uncommitted_diff = self.uncommitted_diff();
        let head = self.head_text.clone();
        let index = self.index_text.clone();
        let index_changed = self.index_changed;
        let head_changed = self.head_changed;
        let language_changed = self.language_changed;
        // Pointer equality is sufficient here: `SetBoth` stores the same Arc
        // in both fields, which is what this check is designed to catch.
        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
            (None, None) => true,
            _ => false,
        };
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            let mut new_unstaged_diff = None;
            if let Some(unstaged_diff) = &unstaged_diff {
                new_unstaged_diff = Some(
                    BufferDiff::update_diff(
                        unstaged_diff.clone(),
                        buffer.clone(),
                        index,
                        index_changed,
                        language_changed,
                        language.clone(),
                        language_registry.clone(),
                        &mut cx,
                    )
                    .await?,
                );
            }

            let mut new_uncommitted_diff = None;
            if let Some(uncommitted_diff) = &uncommitted_diff {
                new_uncommitted_diff = if index_matches_head {
                    // Index matches HEAD, so the unstaged snapshot can be
                    // reused as the uncommitted one without recomputing.
                    new_unstaged_diff.clone()
                } else {
                    Some(
                        BufferDiff::update_diff(
                            uncommitted_diff.clone(),
                            buffer.clone(),
                            head,
                            head_changed,
                            language_changed,
                            language.clone(),
                            language_registry.clone(),
                            &mut cx,
                        )
                        .await?,
                    )
                }
            }

            // Install the unstaged snapshot first; its changed range is fed
            // into the uncommitted diff's snapshot update below.
            let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
                unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
            {
                unstaged_diff.update(&mut cx, |diff, cx| {
                    diff.set_snapshot(&buffer, new_unstaged_diff, language_changed, None, cx)
                })?
            } else {
                None
            };

            if let Some((uncommitted_diff, new_uncommitted_diff)) =
                uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
            {
                uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| {
                    uncommitted_diff.set_snapshot(
                        &buffer,
                        new_uncommitted_diff,
                        language_changed,
                        unstaged_changed_range,
                        cx,
                    );
                })?;
            }

            // Clear the dirty flags and notify everyone awaiting this run.
            if let Some(this) = this.upgrade() {
                this.update(&mut cx, |this, _| {
                    this.index_changed = false;
                    this.head_changed = false;
                    this.language_changed = false;
                    for tx in this.diff_updated_futures.drain(..) {
                        tx.send(()).ok();
                    }
                })?;
            }

            Ok(())
        }));

        rx
    }
}
299
/// Backing behavior for a `BufferStore`: buffers are either managed locally
/// (on disk via worktrees) or mirrored from an upstream peer.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
304
/// Buffer-store backend for projects mirrored from an upstream peer over RPC.
struct RemoteBufferStore {
    /// Buffers received from peers that are retained to avoid races
    /// (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    /// Buffers whose initial state arrived but whose operation chunks are
    /// still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    /// Senders notified once the buffer with the given id fully arrives.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
314
/// Buffer-store backend for local projects, backed directly by worktrees.
struct LocalBufferStore {
    /// Index from project path to buffer id, kept in sync as files move.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    /// Index from worktree entry id to buffer id.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    _subscription: Subscription,
}
321
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully loaded buffer together with its diff state.
    Complete {
        buffer: WeakEntity<Buffer>,
        diff_state: Entity<BufferDiffState>,
    },
    /// Operations buffered for a buffer that is not fully loaded yet.
    Operations(Vec<Operation>),
}
329
/// Events emitted by the `BufferStore` as buffers come and go.
pub enum BufferStoreEvent {
    BufferAdded(Entity<Buffer>),
    BufferDiffAdded(Entity<BufferDiff>),
    BufferDropped(BufferId),
    /// The file backing a buffer moved to a different path.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        /// The buffer's file before the path change, if it had one.
        old_file: Option<Arc<dyn language::File>>,
    },
}
339
/// A group of related edits across multiple buffers, stored as one
/// `language::Transaction` per buffer.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}
344
impl RemoteBufferStore {
    /// Requests the staged (index) text for a buffer from the upstream peer.
    /// Resolves to `None` when there is no staged text for the file.
    fn open_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.background_spawn(async move {
            let response = client
                .request(proto::OpenUnstagedDiff {
                    project_id,
                    buffer_id: buffer_id.to_proto(),
                })
                .await?;
            Ok(response.staged_text)
        })
    }

    /// Requests the committed and staged base texts for a buffer from the
    /// upstream peer, collapsed into a single `DiffBasesChange` according to
    /// the response's mode.
    fn open_uncommitted_diff(
        &self,
        buffer_id: BufferId,
        cx: &App,
    ) -> Task<Result<DiffBasesChange>> {
        use proto::open_uncommitted_diff_response::Mode;

        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.background_spawn(async move {
            let response = client
                .request(proto::OpenUncommittedDiff {
                    project_id,
                    buffer_id: buffer_id.to_proto(),
                })
                .await?;
            let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
            let bases = match mode {
                Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
                Mode::IndexAndHead => DiffBasesChange::SetEach {
                    head: response.committed_text,
                    index: response.staged_text,
                },
            };
            Ok(bases)
        })
    }

    /// Resolves once the buffer with the given id has fully arrived from the
    /// upstream peer, or immediately if it is already present in the store.
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            // The buffer may already have arrived before the listener above
            // was registered; check the store first.
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            cx.background_spawn(async move { rx.await? }).await
        })
    }

    /// Saves a buffer by forwarding a `SaveBuffer` request upstream, then
    /// applies the acknowledged version and mtime to the local replica.
    fn save_remote_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

    /// Handles one message of a chunked buffer transfer from a peer.
    ///
    /// The initial `State` variant registers a loading buffer; each `Chunk`
    /// variant applies a batch of operations to it. When the last chunk
    /// arrives the buffer is returned and all listeners registered via
    /// `wait_for_remote_buffer` are notified. Errors are also fanned out to
    /// those listeners.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut Context<BufferStore>,
    ) -> Result<Option<Entity<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Propagate the failure to anyone waiting for this buffer.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }

    /// Ids of buffers whose chunked transfer has not completed yet.
    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
        self.loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>()
    }

    /// Reconstructs a `ProjectTransaction` from its wire representation,
    /// waiting for each referenced buffer (and the edits its transaction
    /// mentions) to arrive first. Optionally pushes each transaction onto the
    /// buffer's undo history.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }

    /// Opens a buffer by path via the upstream peer and waits for it to
    /// arrive in the store.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let worktree_id = worktree.read(cx).id().to_proto();
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path.to_proto(),
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            let buffer = this
                .update(&mut cx, {
                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await?;

            Ok(buffer)
        })
    }

    /// Creates a new, empty buffer via the upstream peer and waits for it to
    /// arrive in the store.
    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
        let create = self.upstream_client.request(proto::OpenNewBuffer {
            project_id: self.project_id,
        });
        cx.spawn(|this, mut cx| async move {
            let response = create.await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

    /// Asks the upstream peer to reload the given buffers and deserializes
    /// the resulting project transaction.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        let request = self.upstream_client.request(proto::ReloadBuffers {
            project_id: self.project_id,
            buffer_ids: buffers
                .iter()
                .map(|buffer| buffer.read(cx).remote_id().to_proto())
                .collect(),
        });

        cx.spawn(|this, mut cx| async move {
            let response = request
                .await?
                .transaction
                .ok_or_else(|| anyhow!("missing transaction"))?;
            this.update(&mut cx, |this, cx| {
                this.deserialize_project_transaction(response, push_to_history, cx)
            })?
            .await
        })
    }
}
650
651impl LocalBufferStore {
652 fn worktree_for_buffer(
653 &self,
654 buffer: &Entity<Buffer>,
655 cx: &App,
656 ) -> Option<(Entity<Worktree>, Arc<Path>)> {
657 let file = buffer.read(cx).file()?;
658 let worktree_id = file.worktree_id(cx);
659 let path = file.path().clone();
660 let worktree = self
661 .worktree_store
662 .read(cx)
663 .worktree_for_id(worktree_id, cx)?;
664 Some((worktree, path))
665 }
666
667 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
668 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
669 worktree.read(cx).load_staged_file(path.as_ref(), cx)
670 } else {
671 return Task::ready(Err(anyhow!("no such worktree")));
672 }
673 }
674
675 fn load_committed_text(
676 &self,
677 buffer: &Entity<Buffer>,
678 cx: &App,
679 ) -> Task<Result<Option<String>>> {
680 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
681 worktree.read(cx).load_committed_file(path.as_ref(), cx)
682 } else {
683 Task::ready(Err(anyhow!("no such worktree")))
684 }
685 }
686
    /// Writes the buffer's contents to `path` in `worktree`, then notifies
    /// the downstream client (if the project is shared) and the buffer itself
    /// that the save completed.
    ///
    /// `has_changed_file` is forced to `true` when the buffer's file is new
    /// on disk, so an `UpdateBufferFile` message is sent along with
    /// `BufferSaved`.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Capture everything needed by the async save up front.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Broadcast downstream first, then update the buffer itself.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
744
    /// Subscribes to a worktree's events, routing entry changes and git
    /// repository changes to the corresponding handlers. Only local worktrees
    /// are handled; events from non-local worktrees are ignored.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        Self::local_worktree_git_repos_changed(
                            this,
                            worktree.clone(),
                            updated_repos,
                            cx,
                        )
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }
770
771 fn local_worktree_entries_changed(
772 this: &mut BufferStore,
773 worktree_handle: &Entity<Worktree>,
774 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
775 cx: &mut Context<BufferStore>,
776 ) {
777 let snapshot = worktree_handle.read(cx).snapshot();
778 for (path, entry_id, _) in changes {
779 Self::local_worktree_entry_changed(
780 this,
781 *entry_id,
782 path,
783 worktree_handle,
784 &snapshot,
785 cx,
786 );
787 }
788 }
789
    /// Reacts to git repository changes in a local worktree.
    ///
    /// For every open buffer under a changed repository whose unstaged or
    /// uncommitted diff is still alive, reloads the relevant base texts on a
    /// background thread, then applies the resulting `DiffBasesChange`s and
    /// broadcasts them to the downstream client if the project is shared.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Gather the affected buffers along with their current base texts.
        // Base texts are only captured for diffs that are still upgradable,
        // so we don't reload bases nobody is watching.
        let mut diff_state_updates = Vec::new();
        for buffer in this.opened_buffers.values() {
            let OpenBuffer::Complete { buffer, diff_state } = buffer else {
                continue;
            };
            let Some(buffer) = buffer.upgrade() else {
                continue;
            };
            let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
                continue;
            };
            if file.worktree != worktree_handle {
                continue;
            }
            let diff_state = diff_state.read(cx);
            if changed_repos
                .iter()
                .any(|(work_dir, _)| file.path.starts_with(work_dir))
            {
                let has_unstaged_diff = diff_state
                    .unstaged_diff
                    .as_ref()
                    .is_some_and(|diff| diff.is_upgradable());
                let has_uncommitted_diff = diff_state
                    .uncommitted_diff
                    .as_ref()
                    .is_some_and(|set| set.is_upgradable());
                diff_state_updates.push((
                    buffer,
                    file.path.clone(),
                    has_unstaged_diff.then(|| diff_state.index_text.clone()),
                    has_uncommitted_diff.then(|| diff_state.head_text.clone()),
                ));
            }
        }

        if diff_state_updates.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Load the new base texts off the main thread.
            let diff_bases_changes_by_buffer = cx
                .background_spawn(async move {
                    diff_state_updates
                        .into_iter()
                        .filter_map(|(buffer, path, current_index_text, current_head_text)| {
                            let local_repo = snapshot.local_repo_for_path(&path)?;
                            let relative_path = local_repo.relativize(&path).ok()?;
                            let index_text = if current_index_text.is_some() {
                                local_repo.repo().load_index_text(&relative_path)
                            } else {
                                None
                            };
                            let head_text = if current_head_text.is_some() {
                                local_repo.repo().load_committed_text(&relative_path)
                            } else {
                                None
                            };

                            // Avoid triggering a diff update if the base text has not changed.
                            if let Some((current_index, current_head)) =
                                current_index_text.as_ref().zip(current_head_text.as_ref())
                            {
                                if current_index.as_deref() == index_text.as_ref()
                                    && current_head.as_deref() == head_text.as_ref()
                                {
                                    return None;
                                }
                            }

                            let diff_bases_change =
                                match (current_index_text.is_some(), current_head_text.is_some()) {
                                    (true, true) => Some(if index_text == head_text {
                                        DiffBasesChange::SetBoth(head_text)
                                    } else {
                                        DiffBasesChange::SetEach {
                                            index: index_text,
                                            head: head_text,
                                        }
                                    }),
                                    (true, false) => Some(DiffBasesChange::SetIndex(index_text)),
                                    (false, true) => Some(DiffBasesChange::SetHead(head_text)),
                                    (false, false) => None,
                                };

                            Some((buffer, diff_bases_change))
                        })
                        .collect::<Vec<_>>()
                })
                .await;

            // Back on the main thread: apply each change and mirror it
            // downstream before recalculating the diff.
            this.update(&mut cx, |this, cx| {
                for (buffer, diff_bases_change) in diff_bases_changes_by_buffer {
                    let Some(OpenBuffer::Complete { diff_state, .. }) =
                        this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
                    else {
                        continue;
                    };
                    let Some(diff_bases_change) = diff_bases_change else {
                        continue;
                    };

                    diff_state.update(cx, |diff_state, cx| {
                        use proto::update_diff_bases::Mode;

                        let buffer = buffer.read(cx);
                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
                            let buffer_id = buffer.remote_id().to_proto();
                            let (staged_text, committed_text, mode) = match diff_bases_change
                                .clone()
                            {
                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                DiffBasesChange::SetEach { index, head } => {
                                    (index, head, Mode::IndexAndHead)
                                }
                                DiffBasesChange::SetBoth(text) => {
                                    (None, text, Mode::IndexMatchesHead)
                                }
                            };
                            let message = proto::UpdateDiffBases {
                                project_id: *project_id,
                                buffer_id,
                                staged_text,
                                committed_text,
                                mode: mode as i32,
                            };

                            client.send(message).log_err();
                        }

                        let _ = diff_state.diff_bases_changed(
                            buffer.text_snapshot(),
                            diff_bases_change,
                            cx,
                        );
                    });
                }
            })
        })
        .detach_and_log_err(cx);
    }
942
    /// Updates a buffer's `File` in response to a change of the corresponding
    /// worktree entry: refreshes disk state, path, and entry id; maintains
    /// the path/entry-id indices; notifies the downstream client; and emits
    /// `BufferChangedFilePath` when the path changed.
    ///
    /// Always returns `None`; the `Option` return type exists only to enable
    /// early exit via `?`.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Prefer the entry-id index; fall back to the path index.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer was dropped; clean up the stale store entry.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer remains; remove it from both indices.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Locate the entry in the new snapshot, by id first, then by path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // The entry is gone from the snapshot: treat as deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index and record a path-change event.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }
1070
1071 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1072 let file = File::from_dyn(buffer.read(cx).file())?;
1073
1074 let remote_id = buffer.read(cx).remote_id();
1075 if let Some(entry_id) = file.entry_id {
1076 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1077 Some(_) => {
1078 return None;
1079 }
1080 None => {
1081 self.local_buffer_ids_by_entry_id
1082 .insert(entry_id, remote_id);
1083 }
1084 }
1085 };
1086 self.local_buffer_ids_by_path.insert(
1087 ProjectPath {
1088 worktree_id: file.worktree_id(cx),
1089 path: file.path.clone(),
1090 },
1091 remote_id,
1092 );
1093
1094 Some(())
1095 }
1096
1097 fn save_buffer(
1098 &self,
1099 buffer: Entity<Buffer>,
1100 cx: &mut Context<BufferStore>,
1101 ) -> Task<Result<()>> {
1102 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1103 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1104 };
1105 let worktree = file.worktree.clone();
1106 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1107 }
1108
1109 fn save_buffer_as(
1110 &self,
1111 buffer: Entity<Buffer>,
1112 path: ProjectPath,
1113 cx: &mut Context<BufferStore>,
1114 ) -> Task<Result<()>> {
1115 let Some(worktree) = self
1116 .worktree_store
1117 .read(cx)
1118 .worktree_for_id(path.worktree_id, cx)
1119 else {
1120 return Task::ready(Err(anyhow!("no such worktree")));
1121 };
1122 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1123 }
1124
    /// Loads a buffer from disk in `worktree`.
    ///
    /// If the file does not exist, produces an empty in-memory buffer whose
    /// `DiskState` is `New` instead of failing. In either case the buffer is
    /// added to the store and registered in the path and entry-id indices.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity up front so its id can double as the
            // buffer id before the file finishes loading.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // Missing file: create an empty buffer marked `New` on disk.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
1192
1193 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1194 cx.spawn(|buffer_store, mut cx| async move {
1195 let buffer =
1196 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1197 buffer_store.update(&mut cx, |buffer_store, cx| {
1198 buffer_store.add_buffer(buffer.clone(), cx).log_err();
1199 })?;
1200 Ok(buffer)
1201 })
1202 }
1203
    /// Reloads each of `buffers` from disk, one at a time, collecting the
    /// resulting transactions into a single `ProjectTransaction`.
    ///
    /// When `push_to_history` is false, each reload's transaction is forgotten
    /// so it cannot be undone from the buffer's history.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    // A reload only yields a transaction when it changed the
                    // buffer's contents.
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.entity(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
1229}
1230
1231impl BufferStore {
    /// Registers all RPC message and request handlers that the buffer store
    /// responds to on the given client.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
    }
1244
    /// Creates a buffer store for a local project, subscribing to the worktree
    /// store so that each newly added worktree is tracked as it appears.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                // Subscribe to worktree additions so new worktrees' events
                // reach the local state.
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_diffs: Default::default(),
            worktree_store,
        }
    }
1267
    /// Creates a buffer store for a remote project whose buffers are
    /// replicated from the upstream client identified by `remote_id`.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_diffs: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
1291
1292 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1293 match &mut self.state {
1294 BufferStoreState::Local(state) => Some(state),
1295 _ => None,
1296 }
1297 }
1298
1299 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1300 match &mut self.state {
1301 BufferStoreState::Remote(state) => Some(state),
1302 _ => None,
1303 }
1304 }
1305
1306 fn as_remote(&self) -> Option<&RemoteBufferStore> {
1307 match &self.state {
1308 BufferStoreState::Remote(state) => Some(state),
1309 _ => None,
1310 }
1311 }
1312
    /// Opens the buffer at `project_path`, reusing an already-open buffer or
    /// an in-flight load when possible.
    ///
    /// Concurrent calls for the same path share a single load task; the
    /// shared entry is removed from `loading_buffers` once the load settles.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        // Fast path: the buffer is already open.
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // Another caller is already loading this path; share its task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            // The shared future requires a cloneable error.
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1357
    /// Returns a diff of the buffer's contents against its staged (index)
    /// text, creating it on first request.
    ///
    /// An existing live diff is reused; if a recalculation is in flight, the
    /// returned task waits for it before resolving. Concurrent first-time
    /// requests share a single loading task keyed in `loading_diffs`.
    pub fn open_unstaged_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        // Fast path: a diff already exists for this open buffer.
        if let Some(OpenBuffer::Complete { diff_state, .. }) = self.opened_buffers.get(&buffer_id) {
            if let Some(unstaged_diff) = diff_state
                .read(cx)
                .unstaged_diff
                .as_ref()
                .and_then(|weak| weak.upgrade())
            {
                if let Some(task) =
                    diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
                {
                    return cx.background_executor().spawn(async move {
                        task.await?;
                        Ok(unstaged_diff)
                    });
                }
                return Task::ready(Ok(unstaged_diff));
            }
        }

        // Slow path: load the staged text (locally or via RPC) and build the
        // diff, deduplicating concurrent requests.
        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Unstaged)) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let staged_text = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.open_unstaged_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Unstaged,
                                staged_text.await.map(DiffBasesChange::SetIndex),
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1412
    /// Returns a diff of the buffer's contents against its committed (HEAD)
    /// text, creating it on first request.
    ///
    /// An existing live diff is reused; if a recalculation is in flight, the
    /// returned task waits for it before resolving. Concurrent first-time
    /// requests share a single loading task keyed in `loading_diffs`.
    pub fn open_uncommitted_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();

        // Fast path: a diff already exists for this open buffer.
        if let Some(OpenBuffer::Complete { diff_state, .. }) = self.opened_buffers.get(&buffer_id) {
            if let Some(uncommitted_diff) = diff_state
                .read(cx)
                .uncommitted_diff
                .as_ref()
                .and_then(|weak| weak.upgrade())
            {
                if let Some(task) =
                    diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
                {
                    return cx.background_executor().spawn(async move {
                        task.await?;
                        Ok(uncommitted_diff)
                    });
                }
                return Task::ready(Ok(uncommitted_diff));
            }
        }

        let task = match self.loading_diffs.entry((buffer_id, DiffKind::Uncommitted)) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let changes = match &self.state {
                    BufferStoreState::Local(this) => {
                        let committed_text = this.load_committed_text(&buffer, cx);
                        let staged_text = this.load_staged_text(&buffer, cx);
                        cx.background_spawn(async move {
                            let committed_text = committed_text.await?;
                            let staged_text = staged_text.await?;
                            // When index and HEAD text are identical, send a
                            // single SetBoth change; otherwise set each base
                            // separately.
                            let diff_bases_change = if committed_text == staged_text {
                                DiffBasesChange::SetBoth(committed_text)
                            } else {
                                DiffBasesChange::SetEach {
                                    index: staged_text,
                                    head: committed_text,
                                }
                            };
                            Ok(diff_bases_change)
                        })
                    }
                    BufferStoreState::Remote(this) => this.open_uncommitted_diff(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_diff_internal(
                                this,
                                DiffKind::Uncommitted,
                                changes.await,
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1484
    /// Finishes opening a diff once its base text has been loaded.
    ///
    /// Clears the `loading_diffs` entry, creates the `BufferDiff` entity,
    /// records a weak handle to it on the buffer's diff state, and waits for
    /// the initial recalculation before returning. For uncommitted diffs, an
    /// unstaged diff is attached as the secondary diff, creating one if
    /// needed.
    async fn open_diff_internal(
        this: WeakEntity<Self>,
        kind: DiffKind,
        texts: Result<DiffBasesChange>,
        buffer_entity: Entity<Buffer>,
        mut cx: AsyncApp,
    ) -> Result<Entity<BufferDiff>> {
        let diff_bases_change = match texts {
            // On failure, still clear the loading entry so a retry is possible.
            Err(e) => {
                this.update(&mut cx, |this, cx| {
                    let buffer = buffer_entity.read(cx);
                    let buffer_id = buffer.remote_id();
                    this.loading_diffs.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(&mut cx, |this, cx| {
            let buffer = buffer_entity.read(cx);
            let buffer_id = buffer.remote_id();
            let language = buffer.language().cloned();
            let language_registry = buffer.language_registry();
            let text_snapshot = buffer.text_snapshot();
            this.loading_diffs.remove(&(buffer_id, kind));

            if let Some(OpenBuffer::Complete { diff_state, .. }) =
                this.opened_buffers.get_mut(&buffer_id)
            {
                let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
                cx.emit(BufferStoreEvent::BufferDiffAdded(diff.clone()));
                diff_state.update(cx, |diff_state, cx| {
                    diff_state.language = language;
                    diff_state.language_registry = language_registry;

                    // Only weak handles are stored, so dropping all external
                    // references releases the diff.
                    match kind {
                        DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
                        DiffKind::Uncommitted => {
                            let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
                                diff
                            } else {
                                let unstaged_diff =
                                    cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
                                diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
                                unstaged_diff
                            };

                            diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
                            diff_state.uncommitted_diff = Some(diff.downgrade())
                        }
                    };

                    let rx = diff_state.diff_bases_changed(text_snapshot, diff_bases_change, cx);

                    // Resolve only after the initial recalculation completes.
                    Ok(async move {
                        rx.await.ok();
                        Ok(diff)
                    })
                })
            } else {
                Err(anyhow!("buffer was closed"))
            }
        })??
        .await
    }
1551
1552 pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
1553 match &self.state {
1554 BufferStoreState::Local(this) => this.create_buffer(cx),
1555 BufferStoreState::Remote(this) => this.create_buffer(cx),
1556 }
1557 }
1558
1559 pub fn save_buffer(
1560 &mut self,
1561 buffer: Entity<Buffer>,
1562 cx: &mut Context<Self>,
1563 ) -> Task<Result<()>> {
1564 match &mut self.state {
1565 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1566 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1567 }
1568 }
1569
1570 pub fn save_buffer_as(
1571 &mut self,
1572 buffer: Entity<Buffer>,
1573 path: ProjectPath,
1574 cx: &mut Context<Self>,
1575 ) -> Task<Result<()>> {
1576 let old_file = buffer.read(cx).file().cloned();
1577 let task = match &self.state {
1578 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1579 BufferStoreState::Remote(this) => {
1580 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1581 }
1582 };
1583 cx.spawn(|this, mut cx| async move {
1584 task.await?;
1585 this.update(&mut cx, |_, cx| {
1586 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1587 })
1588 })
1589 }
1590
    /// Computes a git blame for `buffer`, optionally against a historical
    /// `version` of its text.
    ///
    /// Returns `Ok(None)` when the file is not inside a git repository. For
    /// remote worktrees the request is forwarded to the host over RPC.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything the blame needs on the main thread, so the
                // actual blame can run in the background.
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        // Not under a git repository: blame is simply absent.
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    // Blame the requested version's text if given, otherwise
                    // the current contents.
                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1652
    /// Builds a permalink URL to `selection` in the buffer's file on its git
    /// hosting provider.
    ///
    /// Falls back to crate metadata for Rust files in the Cargo registry that
    /// are outside any repository. For remote worktrees the request is
    /// forwarded to the host over RPC.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if buffer
                        .language()
                        .is_none_or(|lang| lang.name() != "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // The permalink needs the path relative to the repository
                // root, not the worktree root.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    // Pin the permalink to the current HEAD commit so it stays
                    // valid as the branch moves.
                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1749
1750 fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
1751 let buffer = buffer_entity.read(cx);
1752 let language = buffer.language().cloned();
1753 let language_registry = buffer.language_registry();
1754 let remote_id = buffer.remote_id();
1755 let is_remote = buffer.replica_id() != 0;
1756 let open_buffer = OpenBuffer::Complete {
1757 buffer: buffer_entity.downgrade(),
1758 diff_state: cx.new(|_| BufferDiffState {
1759 language,
1760 language_registry,
1761 ..Default::default()
1762 }),
1763 };
1764
1765 let handle = cx.entity().downgrade();
1766 buffer_entity.update(cx, move |_, cx| {
1767 cx.on_release(move |buffer, cx| {
1768 handle
1769 .update(cx, |_, cx| {
1770 cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1771 })
1772 .ok();
1773 })
1774 .detach()
1775 });
1776
1777 match self.opened_buffers.entry(remote_id) {
1778 hash_map::Entry::Vacant(entry) => {
1779 entry.insert(open_buffer);
1780 }
1781 hash_map::Entry::Occupied(mut entry) => {
1782 if let OpenBuffer::Operations(operations) = entry.get_mut() {
1783 buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1784 } else if entry.get().upgrade().is_some() {
1785 if is_remote {
1786 return Ok(());
1787 } else {
1788 debug_panic!("buffer {} was already registered", remote_id);
1789 Err(anyhow!("buffer {} was already registered", remote_id))?;
1790 }
1791 }
1792 entry.insert(open_buffer);
1793 }
1794 }
1795
1796 cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
1797 cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
1798 Ok(())
1799 }
1800
1801 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1802 self.opened_buffers
1803 .values()
1804 .filter_map(|buffer| buffer.upgrade())
1805 }
1806
1807 pub fn loading_buffers(
1808 &self,
1809 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1810 self.loading_buffers.iter().map(|(path, task)| {
1811 let task = task.clone();
1812 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1813 })
1814 }
1815
1816 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1817 self.buffers().find_map(|buffer| {
1818 let file = File::from_dyn(buffer.read(cx).file())?;
1819 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1820 Some(buffer)
1821 } else {
1822 None
1823 }
1824 })
1825 }
1826
1827 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1828 self.opened_buffers.get(&buffer_id)?.upgrade()
1829 }
1830
1831 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1832 self.get(buffer_id)
1833 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1834 }
1835
1836 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1837 self.get(buffer_id).or_else(|| {
1838 self.as_remote()
1839 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1840 })
1841 }
1842
1843 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
1844 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1845 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
1846 } else {
1847 None
1848 }
1849 }
1850
1851 pub fn get_uncommitted_diff(
1852 &self,
1853 buffer_id: BufferId,
1854 cx: &App,
1855 ) -> Option<Entity<BufferDiff>> {
1856 if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? {
1857 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
1858 } else {
1859 None
1860 }
1861 }
1862
1863 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1864 let buffers = self
1865 .buffers()
1866 .map(|buffer| {
1867 let buffer = buffer.read(cx);
1868 proto::BufferVersion {
1869 id: buffer.remote_id().into(),
1870 version: language::proto::serialize_version(&buffer.version),
1871 }
1872 })
1873 .collect();
1874 let incomplete_buffer_ids = self
1875 .as_remote()
1876 .map(|remote| remote.incomplete_buffer_ids())
1877 .unwrap_or_default();
1878 (buffers, incomplete_buffer_ids)
1879 }
1880
1881 pub fn disconnected_from_host(&mut self, cx: &mut App) {
1882 for open_buffer in self.opened_buffers.values_mut() {
1883 if let Some(buffer) = open_buffer.upgrade() {
1884 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1885 }
1886 }
1887
1888 for buffer in self.buffers() {
1889 buffer.update(cx, |buffer, cx| {
1890 buffer.set_capability(Capability::ReadOnly, cx)
1891 });
1892 }
1893
1894 if let Some(remote) = self.as_remote_mut() {
1895 // Wake up all futures currently waiting on a buffer to get opened,
1896 // to give them a chance to fail now that we've disconnected.
1897 remote.remote_buffer_listeners.clear()
1898 }
1899 }
1900
1901 pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
1902 self.downstream_client = Some((downstream_client, remote_id));
1903 }
1904
1905 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1906 self.downstream_client.take();
1907 self.forget_shared_buffers();
1908 }
1909
1910 pub fn discard_incomplete(&mut self) {
1911 self.opened_buffers
1912 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1913 }
1914
    /// Streams buffers that may contain matches for `query`, up to `limit`.
    ///
    /// Buffers without a file entry (e.g. untitled buffers) are sent first and
    /// count against the limit. Remaining candidate paths come from the
    /// worktree store and are opened in batches of
    /// `MAX_CONCURRENT_BUFFER_OPENS`. Sending stops early when the receiver
    /// is dropped.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                // Already-open buffers are excluded from the worktree scan and
                // will be searched via their in-memory contents.
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                // Kick off a batch of opens concurrently, then await them in order.
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A send error means the receiver was dropped; stop searching.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1969
1970 pub fn recalculate_buffer_diffs(
1971 &mut self,
1972 buffers: Vec<Entity<Buffer>>,
1973 cx: &mut Context<Self>,
1974 ) -> impl Future<Output = ()> {
1975 let mut futures = Vec::new();
1976 for buffer in buffers {
1977 if let Some(OpenBuffer::Complete { diff_state, .. }) =
1978 self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
1979 {
1980 let buffer = buffer.read(cx).text_snapshot();
1981 futures.push(diff_state.update(cx, |diff_state, cx| {
1982 diff_state.recalculate_diffs(buffer, cx)
1983 }));
1984 }
1985 }
1986 async move {
1987 futures::future::join_all(futures).await;
1988 }
1989 }
1990
    /// Reacts to events emitted by an individual open buffer.
    ///
    /// - `FileHandleChanged`: notifies the local state that the buffer's file
    ///   changed (local stores only).
    /// - `Reloaded`: forwards the reload to the downstream client when the
    ///   project is shared.
    /// - `LanguageChanged`: informs the buffer's diff state of the new
    ///   language.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Only relevant when this project is shared downstream.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            BufferEvent::LanguageChanged => {
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(OpenBuffer::Complete { diff_state, .. }) =
                    self.opened_buffers.get(&buffer_id)
                {
                    diff_state.update(cx, |diff_state, cx| {
                        diff_state.buffer_language_changed(buffer, cx);
                    });
                }
            }
            _ => {}
        }
    }
2031
2032 pub async fn handle_update_buffer(
2033 this: Entity<Self>,
2034 envelope: TypedEnvelope<proto::UpdateBuffer>,
2035 mut cx: AsyncApp,
2036 ) -> Result<proto::Ack> {
2037 let payload = envelope.payload.clone();
2038 let buffer_id = BufferId::new(payload.buffer_id)?;
2039 let ops = payload
2040 .operations
2041 .into_iter()
2042 .map(language::proto::deserialize_operation)
2043 .collect::<Result<Vec<_>, _>>()?;
2044 this.update(&mut cx, |this, cx| {
2045 match this.opened_buffers.entry(buffer_id) {
2046 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
2047 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
2048 OpenBuffer::Complete { buffer, .. } => {
2049 if let Some(buffer) = buffer.upgrade() {
2050 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
2051 }
2052 }
2053 },
2054 hash_map::Entry::Vacant(e) => {
2055 e.insert(OpenBuffer::Operations(ops));
2056 }
2057 }
2058 Ok(proto::Ack {})
2059 })?
2060 }
2061
2062 pub fn register_shared_lsp_handle(
2063 &mut self,
2064 peer_id: proto::PeerId,
2065 buffer_id: BufferId,
2066 handle: OpenLspBufferHandle,
2067 ) {
2068 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2069 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2070 buffer.lsp_handle = Some(handle);
2071 return;
2072 }
2073 }
2074 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2075 }
2076
    /// Handles a guest's request to resynchronize its open buffers after
    /// (re)connecting.
    ///
    /// Re-registers each requested buffer as shared with the guest, reports
    /// the host's version for it in the response, and pushes the current
    /// file, saved state, and any operations the guest is missing (chunked
    /// via `split_operations`).
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate for this guest; only the buffers it asks
        // about below will be re-registered as shared.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        diff: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest is missing,
                // relative to the version it reported.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Send the missing operations in chunks, off the main thread.
                cx.background_spawn(
                    async move {
                        let operations = operations.await;
                        for chunk in split_operations(operations) {
                            client
                                .request(proto::UpdateBuffer {
                                    project_id,
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                })
                                .await?;
                        }
                        anyhow::Ok(())
                    }
                    .log_err(),
                )
                .detach();
            }
        }
        Ok(response)
    }
2165
2166 pub fn handle_create_buffer_for_peer(
2167 &mut self,
2168 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2169 replica_id: u16,
2170 capability: Capability,
2171 cx: &mut Context<Self>,
2172 ) -> Result<()> {
2173 let Some(remote) = self.as_remote_mut() else {
2174 return Err(anyhow!("buffer store is not a remote"));
2175 };
2176
2177 if let Some(buffer) =
2178 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2179 {
2180 self.add_buffer(buffer, cx)?;
2181 }
2182
2183 Ok(())
2184 }
2185
    /// Handles an upstream notification that a buffer's file changed (e.g. it
    /// was renamed or moved).
    ///
    /// Updates the buffer's file association, emits `BufferChangedFilePath`
    /// when the path actually changed, and forwards the message to the
    /// downstream client when this project is shared.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            // Note: the payload is cloned because `envelope.payload.file` is
            // also needed below when forwarding downstream.
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // Returns Some only when the path changed, so the event below
                // is emitted just for real renames/moves.
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay the change to any downstream collaborators.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
2233
2234 pub async fn handle_save_buffer(
2235 this: Entity<Self>,
2236 envelope: TypedEnvelope<proto::SaveBuffer>,
2237 mut cx: AsyncApp,
2238 ) -> Result<proto::BufferSaved> {
2239 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2240 let (buffer, project_id) = this.update(&mut cx, |this, _| {
2241 anyhow::Ok((
2242 this.get_existing(buffer_id)?,
2243 this.downstream_client
2244 .as_ref()
2245 .map(|(_, project_id)| *project_id)
2246 .context("project is not shared")?,
2247 ))
2248 })??;
2249 buffer
2250 .update(&mut cx, |buffer, _| {
2251 buffer.wait_for_version(deserialize_version(&envelope.payload.version))
2252 })?
2253 .await?;
2254 let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
2255
2256 if let Some(new_path) = envelope.payload.new_path {
2257 let new_path = ProjectPath::from_proto(new_path);
2258 this.update(&mut cx, |this, cx| {
2259 this.save_buffer_as(buffer.clone(), new_path, cx)
2260 })?
2261 .await?;
2262 } else {
2263 this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
2264 .await?;
2265 }
2266
2267 buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
2268 project_id,
2269 buffer_id: buffer_id.into(),
2270 version: serialize_version(buffer.saved_version()),
2271 mtime: buffer.saved_mtime().map(|time| time.into()),
2272 })
2273 }
2274
2275 pub async fn handle_close_buffer(
2276 this: Entity<Self>,
2277 envelope: TypedEnvelope<proto::CloseBuffer>,
2278 mut cx: AsyncApp,
2279 ) -> Result<()> {
2280 let peer_id = envelope.sender_id;
2281 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2282 this.update(&mut cx, |this, _| {
2283 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2284 if shared.remove(&buffer_id).is_some() {
2285 if shared.is_empty() {
2286 this.shared_buffers.remove(&peer_id);
2287 }
2288 return;
2289 }
2290 }
2291 debug_panic!(
2292 "peer_id {} closed buffer_id {} which was either not open or already closed",
2293 peer_id,
2294 buffer_id
2295 )
2296 })
2297 }
2298
2299 pub async fn handle_buffer_saved(
2300 this: Entity<Self>,
2301 envelope: TypedEnvelope<proto::BufferSaved>,
2302 mut cx: AsyncApp,
2303 ) -> Result<()> {
2304 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2305 let version = deserialize_version(&envelope.payload.version);
2306 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2307 this.update(&mut cx, move |this, cx| {
2308 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2309 buffer.update(cx, |buffer, cx| {
2310 buffer.did_save(version, mtime, cx);
2311 });
2312 }
2313
2314 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2315 downstream_client
2316 .send(proto::BufferSaved {
2317 project_id: *project_id,
2318 buffer_id: buffer_id.into(),
2319 mtime: envelope.payload.mtime,
2320 version: envelope.payload.version,
2321 })
2322 .log_err();
2323 }
2324 })
2325 }
2326
2327 pub async fn handle_buffer_reloaded(
2328 this: Entity<Self>,
2329 envelope: TypedEnvelope<proto::BufferReloaded>,
2330 mut cx: AsyncApp,
2331 ) -> Result<()> {
2332 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2333 let version = deserialize_version(&envelope.payload.version);
2334 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2335 let line_ending = deserialize_line_ending(
2336 proto::LineEnding::from_i32(envelope.payload.line_ending)
2337 .ok_or_else(|| anyhow!("missing line ending"))?,
2338 );
2339 this.update(&mut cx, |this, cx| {
2340 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2341 buffer.update(cx, |buffer, cx| {
2342 buffer.did_reload(version, line_ending, mtime, cx);
2343 });
2344 }
2345
2346 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2347 downstream_client
2348 .send(proto::BufferReloaded {
2349 project_id: *project_id,
2350 buffer_id: buffer_id.into(),
2351 mtime: envelope.payload.mtime,
2352 version: envelope.payload.version,
2353 line_ending: envelope.payload.line_ending,
2354 })
2355 .log_err();
2356 }
2357 })
2358 }
2359
2360 pub async fn handle_blame_buffer(
2361 this: Entity<Self>,
2362 envelope: TypedEnvelope<proto::BlameBuffer>,
2363 mut cx: AsyncApp,
2364 ) -> Result<proto::BlameBufferResponse> {
2365 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2366 let version = deserialize_version(&envelope.payload.version);
2367 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2368 buffer
2369 .update(&mut cx, |buffer, _| {
2370 buffer.wait_for_version(version.clone())
2371 })?
2372 .await?;
2373 let blame = this
2374 .update(&mut cx, |this, cx| {
2375 this.blame_buffer(&buffer, Some(version), cx)
2376 })?
2377 .await?;
2378 Ok(serialize_blame_buffer_response(blame))
2379 }
2380
2381 pub async fn handle_get_permalink_to_line(
2382 this: Entity<Self>,
2383 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2384 mut cx: AsyncApp,
2385 ) -> Result<proto::GetPermalinkToLineResponse> {
2386 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2387 // let version = deserialize_version(&envelope.payload.version);
2388 let selection = {
2389 let proto_selection = envelope
2390 .payload
2391 .selection
2392 .context("no selection to get permalink for defined")?;
2393 proto_selection.start as u32..proto_selection.end as u32
2394 };
2395 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2396 let permalink = this
2397 .update(&mut cx, |this, cx| {
2398 this.get_permalink_to_line(&buffer, selection, cx)
2399 })?
2400 .await?;
2401 Ok(proto::GetPermalinkToLineResponse {
2402 permalink: permalink.to_string(),
2403 })
2404 }
2405
2406 pub async fn handle_open_unstaged_diff(
2407 this: Entity<Self>,
2408 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2409 mut cx: AsyncApp,
2410 ) -> Result<proto::OpenUnstagedDiffResponse> {
2411 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2412 let diff = this
2413 .update(&mut cx, |this, cx| {
2414 let buffer = this.get(buffer_id)?;
2415 Some(this.open_unstaged_diff(buffer, cx))
2416 })?
2417 .ok_or_else(|| anyhow!("no such buffer"))?
2418 .await?;
2419 this.update(&mut cx, |this, _| {
2420 let shared_buffers = this
2421 .shared_buffers
2422 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2423 .or_default();
2424 debug_assert!(shared_buffers.contains_key(&buffer_id));
2425 if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2426 shared.diff = Some(diff.clone());
2427 }
2428 })?;
2429 let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
2430 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2431 }
2432
    /// Opens (or retrieves) the uncommitted diff for a shared buffer and
    /// returns the committed (HEAD) and staged (index) base texts the peer
    /// needs to reconstruct it, along with a mode describing which texts are
    /// present.
    pub async fn handle_open_uncommitted_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUncommittedDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUncommittedDiffResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let diff = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_uncommitted_diff(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        // Record the diff on the shared-buffer entry so it stays alive for
        // as long as the buffer is shared with this peer.
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.diff = Some(diff.clone());
            }
        })?;
        diff.read_with(&cx, |diff, cx| {
            use proto::open_uncommitted_diff_response::Mode;

            // The secondary diff (when present) holds the index contents as
            // its base text.
            let unstaged_diff = diff.secondary_diff();
            let index_snapshot = unstaged_diff.and_then(|diff| {
                let diff = diff.read(cx);
                diff.base_text_exists().then(|| diff.base_text())
            });

            let mode;
            let staged_text;
            let committed_text;
            if diff.base_text_exists() {
                let committed_snapshot = diff.base_text();
                committed_text = Some(committed_snapshot.text());
                if let Some(index_text) = index_snapshot {
                    if index_text.remote_id() == committed_snapshot.remote_id() {
                        // Index and HEAD share the same snapshot, so the
                        // staged text can be elided from the response.
                        mode = Mode::IndexMatchesHead;
                        staged_text = None;
                    } else {
                        mode = Mode::IndexAndHead;
                        staged_text = Some(index_text.text());
                    }
                } else {
                    mode = Mode::IndexAndHead;
                    staged_text = None;
                }
            } else {
                // No committed base text; send only the index contents, if any.
                mode = Mode::IndexAndHead;
                committed_text = None;
                staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
            }

            proto::OpenUncommittedDiffResponse {
                committed_text,
                staged_text,
                mode: mode.into(),
            }
        })
    }
2496
2497 pub async fn handle_update_diff_bases(
2498 this: Entity<Self>,
2499 request: TypedEnvelope<proto::UpdateDiffBases>,
2500 mut cx: AsyncApp,
2501 ) -> Result<()> {
2502 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2503 this.update(&mut cx, |this, cx| {
2504 if let Some(OpenBuffer::Complete { diff_state, buffer }) =
2505 this.opened_buffers.get_mut(&buffer_id)
2506 {
2507 if let Some(buffer) = buffer.upgrade() {
2508 let buffer = buffer.read(cx).text_snapshot();
2509 diff_state.update(cx, |diff_state, cx| {
2510 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2511 })
2512 }
2513 }
2514 })
2515 }
2516
2517 pub fn reload_buffers(
2518 &self,
2519 buffers: HashSet<Entity<Buffer>>,
2520 push_to_history: bool,
2521 cx: &mut Context<Self>,
2522 ) -> Task<Result<ProjectTransaction>> {
2523 if buffers.is_empty() {
2524 return Task::ready(Ok(ProjectTransaction::default()));
2525 }
2526 match &self.state {
2527 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2528 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2529 }
2530 }
2531
2532 async fn handle_reload_buffers(
2533 this: Entity<Self>,
2534 envelope: TypedEnvelope<proto::ReloadBuffers>,
2535 mut cx: AsyncApp,
2536 ) -> Result<proto::ReloadBuffersResponse> {
2537 let sender_id = envelope.original_sender_id().unwrap_or_default();
2538 let reload = this.update(&mut cx, |this, cx| {
2539 let mut buffers = HashSet::default();
2540 for buffer_id in &envelope.payload.buffer_ids {
2541 let buffer_id = BufferId::new(*buffer_id)?;
2542 buffers.insert(this.get_existing(buffer_id)?);
2543 }
2544 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2545 })??;
2546
2547 let project_transaction = reload.await?;
2548 let project_transaction = this.update(&mut cx, |this, cx| {
2549 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2550 })?;
2551 Ok(proto::ReloadBuffersResponse {
2552 transaction: Some(project_transaction),
2553 })
2554 }
2555
    /// Ensures that `buffer` has been replicated to `peer_id`, sending its
    /// full state followed by any pending operation chunks if it hasn't
    /// been sent yet.
    ///
    /// Returns a task that completes once the state and chunks have been
    /// sent — or immediately when the buffer is already shared with the
    /// peer, or when there is no downstream client.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        // Already shared with this peer; nothing to send.
        if shared_buffers.contains_key(&buffer_id) {
            return Task::ready(Ok(()));
        }
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                diff: None,
                lsp_handle: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before this task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream operation chunks if the initial state message was
            // sent successfully.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_spawn(async move {
                    let mut chunks = split_operations(operations).peekable();
                    while let Some(chunk) = chunks.next() {
                        // Peek ahead so the final chunk can be flagged,
                        // telling the receiver the buffer is complete.
                        let is_last = chunks.peek().is_none();
                        client.send(proto::CreateBufferForPeer {
                            project_id,
                            peer_id: Some(peer_id),
                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                proto::BufferChunk {
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                    is_last,
                                },
                            )),
                        })?;
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }
            Ok(())
        })
    }
2621
    /// Stops tracking all buffers previously shared with any peer.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2625
    /// Stops tracking all buffers previously shared with the given peer.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2629
    /// Re-keys all buffers shared with `old_peer_id` under `new_peer_id`.
    /// No-op when nothing was shared with the old id.
    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }
2635
    /// Returns whether any buffer is currently shared with any peer.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2639
2640 pub fn create_local_buffer(
2641 &mut self,
2642 text: &str,
2643 language: Option<Arc<Language>>,
2644 cx: &mut Context<Self>,
2645 ) -> Entity<Buffer> {
2646 let buffer = cx.new(|cx| {
2647 Buffer::local(text, cx)
2648 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
2649 });
2650
2651 self.add_buffer(buffer.clone(), cx).log_err();
2652 let buffer_id = buffer.read(cx).remote_id();
2653
2654 let this = self
2655 .as_local_mut()
2656 .expect("local-only method called in a non-local context");
2657 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2658 this.local_buffer_ids_by_path.insert(
2659 ProjectPath {
2660 worktree_id: file.worktree_id(cx),
2661 path: file.path.clone(),
2662 },
2663 buffer_id,
2664 );
2665
2666 if let Some(entry_id) = file.entry_id {
2667 this.local_buffer_ids_by_entry_id
2668 .insert(entry_id, buffer_id);
2669 }
2670 }
2671 buffer
2672 }
2673
2674 pub fn deserialize_project_transaction(
2675 &mut self,
2676 message: proto::ProjectTransaction,
2677 push_to_history: bool,
2678 cx: &mut Context<Self>,
2679 ) -> Task<Result<ProjectTransaction>> {
2680 if let Some(this) = self.as_remote_mut() {
2681 this.deserialize_project_transaction(message, push_to_history, cx)
2682 } else {
2683 debug_panic!("not a remote buffer store");
2684 Task::ready(Err(anyhow!("not a remote buffer store")))
2685 }
2686 }
2687
2688 pub fn wait_for_remote_buffer(
2689 &mut self,
2690 id: BufferId,
2691 cx: &mut Context<BufferStore>,
2692 ) -> Task<Result<Entity<Buffer>>> {
2693 if let Some(this) = self.as_remote_mut() {
2694 this.wait_for_remote_buffer(id, cx)
2695 } else {
2696 debug_panic!("not a remote buffer store");
2697 Task::ready(Err(anyhow!("not a remote buffer store")))
2698 }
2699 }
2700
2701 pub fn serialize_project_transaction_for_peer(
2702 &mut self,
2703 project_transaction: ProjectTransaction,
2704 peer_id: proto::PeerId,
2705 cx: &mut Context<Self>,
2706 ) -> proto::ProjectTransaction {
2707 let mut serialized_transaction = proto::ProjectTransaction {
2708 buffer_ids: Default::default(),
2709 transactions: Default::default(),
2710 };
2711 for (buffer, transaction) in project_transaction.0 {
2712 self.create_buffer_for_peer(&buffer, peer_id, cx)
2713 .detach_and_log_err(cx);
2714 serialized_transaction
2715 .buffer_ids
2716 .push(buffer.read(cx).remote_id().into());
2717 serialized_transaction
2718 .transactions
2719 .push(language::proto::serialize_transaction(&transaction));
2720 }
2721 serialized_transaction
2722 }
2723}
2724
2725impl OpenBuffer {
2726 fn upgrade(&self) -> Option<Entity<Buffer>> {
2727 match self {
2728 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2729 OpenBuffer::Operations(_) => None,
2730 }
2731 }
2732}
2733
2734fn is_not_found_error(error: &anyhow::Error) -> bool {
2735 error
2736 .root_cause()
2737 .downcast_ref::<io::Error>()
2738 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2739}
2740
2741fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2742 let Some(blame) = blame else {
2743 return proto::BlameBufferResponse {
2744 blame_response: None,
2745 };
2746 };
2747
2748 let entries = blame
2749 .entries
2750 .into_iter()
2751 .map(|entry| proto::BlameEntry {
2752 sha: entry.sha.as_bytes().into(),
2753 start_line: entry.range.start,
2754 end_line: entry.range.end,
2755 original_line_number: entry.original_line_number,
2756 author: entry.author.clone(),
2757 author_mail: entry.author_mail.clone(),
2758 author_time: entry.author_time,
2759 author_tz: entry.author_tz.clone(),
2760 committer: entry.committer_name.clone(),
2761 committer_mail: entry.committer_email.clone(),
2762 committer_time: entry.committer_time,
2763 committer_tz: entry.committer_tz.clone(),
2764 summary: entry.summary.clone(),
2765 previous: entry.previous.clone(),
2766 filename: entry.filename.clone(),
2767 })
2768 .collect::<Vec<_>>();
2769
2770 let messages = blame
2771 .messages
2772 .into_iter()
2773 .map(|(oid, message)| proto::CommitMessage {
2774 oid: oid.as_bytes().into(),
2775 message,
2776 })
2777 .collect::<Vec<_>>();
2778
2779 let permalinks = blame
2780 .permalinks
2781 .into_iter()
2782 .map(|(oid, url)| proto::CommitPermalink {
2783 oid: oid.as_bytes().into(),
2784 permalink: url.to_string(),
2785 })
2786 .collect::<Vec<_>>();
2787
2788 proto::BlameBufferResponse {
2789 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2790 entries,
2791 messages,
2792 permalinks,
2793 remote_url: blame.remote_url,
2794 }),
2795 }
2796}
2797
2798fn deserialize_blame_buffer_response(
2799 response: proto::BlameBufferResponse,
2800) -> Option<git::blame::Blame> {
2801 let response = response.blame_response?;
2802 let entries = response
2803 .entries
2804 .into_iter()
2805 .filter_map(|entry| {
2806 Some(git::blame::BlameEntry {
2807 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2808 range: entry.start_line..entry.end_line,
2809 original_line_number: entry.original_line_number,
2810 committer_name: entry.committer,
2811 committer_time: entry.committer_time,
2812 committer_tz: entry.committer_tz,
2813 committer_email: entry.committer_mail,
2814 author: entry.author,
2815 author_mail: entry.author_mail,
2816 author_time: entry.author_time,
2817 author_tz: entry.author_tz,
2818 summary: entry.summary,
2819 previous: entry.previous,
2820 filename: entry.filename,
2821 })
2822 })
2823 .collect::<Vec<_>>();
2824
2825 let messages = response
2826 .messages
2827 .into_iter()
2828 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2829 .collect::<HashMap<_, _>>();
2830
2831 let permalinks = response
2832 .permalinks
2833 .into_iter()
2834 .filter_map(|permalink| {
2835 Some((
2836 git::Oid::from_bytes(&permalink.oid).ok()?,
2837 Url::from_str(&permalink.permalink).ok()?,
2838 ))
2839 })
2840 .collect::<HashMap<_, _>>();
2841
2842 Some(Blame {
2843 entries,
2844 permalinks,
2845 messages,
2846 remote_url: response.remote_url,
2847 })
2848}
2849
2850fn get_permalink_in_rust_registry_src(
2851 provider_registry: Arc<GitHostingProviderRegistry>,
2852 path: PathBuf,
2853 selection: Range<u32>,
2854) -> Result<url::Url> {
2855 #[derive(Deserialize)]
2856 struct CargoVcsGit {
2857 sha1: String,
2858 }
2859
2860 #[derive(Deserialize)]
2861 struct CargoVcsInfo {
2862 git: CargoVcsGit,
2863 path_in_vcs: String,
2864 }
2865
2866 #[derive(Deserialize)]
2867 struct CargoPackage {
2868 repository: String,
2869 }
2870
2871 #[derive(Deserialize)]
2872 struct CargoToml {
2873 package: CargoPackage,
2874 }
2875
2876 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
2877 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
2878 Some((dir, json))
2879 }) else {
2880 bail!("No .cargo_vcs_info.json found in parent directories")
2881 };
2882 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
2883 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
2884 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
2885 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
2886 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
2887 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
2888 let permalink = provider.build_permalink(
2889 remote,
2890 BuildPermalinkParams {
2891 sha: &cargo_vcs_info.git.sha1,
2892 path: &path.to_string_lossy(),
2893 selection: Some(selection),
2894 },
2895 );
2896 Ok(permalink)
2897}