1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use client::Client;
10use collections::{hash_map, HashMap, HashSet};
11use fs::Fs;
12use futures::{
13 channel::oneshot,
14 future::{OptionFuture, Shared},
15 Future, FutureExt as _, StreamExt,
16};
17use git::{blame::Blame, diff::BufferDiff, repository::RepoPath};
18use gpui::{
19 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
20};
21use http_client::Url;
22use language::{
23 proto::{
24 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
25 split_operations,
26 },
27 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
28};
29use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
30use serde::Deserialize;
31use smol::channel::Receiver;
32use std::{
33 io,
34 ops::Range,
35 path::{Path, PathBuf},
36 pin::pin,
37 str::FromStr as _,
38 sync::Arc,
39 time::Instant,
40};
41use text::{BufferId, Rope};
42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
44
/// Which git base text a change set is diffed against.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum ChangeSetKind {
    /// Buffer diffed against the index (staged) text.
    Unstaged,
    /// Buffer diffed against the committed (HEAD) text.
    Uncommitted,
}
50
/// A set of open buffers.
pub struct BufferStore {
    /// Local or remote backing implementation.
    state: BufferStoreState,
    /// In-flight buffer loads, keyed by path, so concurrent opens share one task.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    /// In-flight change-set loads, keyed by buffer and kind, shared like `loading_buffers`.
    #[allow(clippy::type_complexity)]
    loading_change_sets: HashMap<
        (BufferId, ChangeSetKind),
        Shared<Task<Result<Entity<BufferChangeSet>, Arc<anyhow::Error>>>>,
    >,
    worktree_store: Entity<WorktreeStore>,
    /// All buffers this store knows about, complete or still receiving operations.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Client and project id to forward updates to when this project is shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// Buffers that have been sent to each peer, retained so they stay alive.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
66
/// Strong handles kept per peer for a buffer shared downstream, so the buffer
/// (and optionally its change set and LSP handle) outlive local references.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    change_set: Option<Entity<BufferChangeSet>>,
    lsp_handle: Option<OpenLspBufferHandle>,
}
73
/// Per-buffer state driving recalculation of the unstaged/uncommitted diffs.
#[derive(Default)]
struct BufferChangeSetState {
    // Weak so a change set dies when its last strong consumer drops it.
    unstaged_changes: Option<WeakEntity<BufferChangeSet>>,
    uncommitted_changes: Option<WeakEntity<BufferChangeSet>>,
    // Holding the task cancels any previous in-flight recalculation.
    recalculate_diff_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    // Senders resolved when the next recalculation completes.
    diff_updated_futures: Vec<oneshot::Sender<()>>,
    buffer_subscription: Option<Subscription>,

    // Cached base texts; `Arc` so SetBoth can share one allocation (see
    // `recalculate_diffs`, which uses `Arc::ptr_eq` to detect that case).
    head_text: Option<Arc<String>>,
    index_text: Option<Arc<String>>,
    // Dirty flags consumed by the next `recalculate_diffs` run.
    head_changed: bool,
    index_changed: bool,
    language_changed: bool,
}
90
/// Describes which git base texts changed and their new contents
/// (`None` = the file is absent from that base).
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Only the index (staged) text changed.
    SetIndex(Option<String>),
    /// Only the HEAD (committed) text changed.
    SetHead(Option<String>),
    /// Both changed, to different values.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Both changed and the index matches HEAD exactly.
    SetBoth(Option<String>),
}
101
impl BufferChangeSetState {
    /// Caches the buffer's new language and re-runs diff recalculation so the
    /// base-text snapshots are rebuilt with the new syntax.
    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
        self.language = buffer.read(cx).language().cloned();
        self.language_changed = true;
        // Receiver deliberately dropped: no caller awaits this update.
        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
    }

    /// The index change set, if any strong handle to it is still alive.
    fn unstaged_changes(&self) -> Option<Entity<BufferChangeSet>> {
        self.unstaged_changes.as_ref().and_then(|set| set.upgrade())
    }

    /// The HEAD change set, if any strong handle to it is still alive.
    fn uncommitted_changes(&self) -> Option<Entity<BufferChangeSet>> {
        self.uncommitted_changes
            .as_ref()
            .and_then(|set| set.upgrade())
    }

    /// Applies a remote `UpdateDiffBases` message by translating it into a
    /// `DiffBasesChange` and recalculating diffs.
    fn handle_base_texts_updated(
        &mut self,
        buffer: text::BufferSnapshot,
        message: proto::UpdateDiffBases,
        cx: &mut Context<Self>,
    ) {
        use proto::update_diff_bases::Mode;

        // Unknown modes (e.g. from a newer peer) are silently ignored.
        let Some(mode) = Mode::from_i32(message.mode) else {
            return;
        };

        let diff_bases_change = match mode {
            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
            Mode::IndexAndHead => DiffBasesChange::SetEach {
                index: message.staged_text,
                head: message.committed_text,
            },
        };

        // Receiver deliberately dropped: remote updates are fire-and-forget.
        let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
    }

    /// Stores the new base texts (normalizing line endings), marks the
    /// corresponding dirty flags, and kicks off a recalculation. The returned
    /// receiver resolves when that recalculation finishes.
    fn diff_bases_changed(
        &mut self,
        buffer: text::BufferSnapshot,
        diff_bases_change: DiffBasesChange,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        match diff_bases_change {
            DiffBasesChange::SetIndex(index) => {
                self.index_text = index.map(|mut index| {
                    text::LineEnding::normalize(&mut index);
                    Arc::new(index)
                });
                self.index_changed = true;
            }
            DiffBasesChange::SetHead(head) => {
                self.head_text = head.map(|mut head| {
                    text::LineEnding::normalize(&mut head);
                    Arc::new(head)
                });
                self.head_changed = true;
            }
            DiffBasesChange::SetBoth(text) => {
                // One shared Arc for both bases; `recalculate_diffs` relies on
                // this pointer equality to skip rebuilding the HEAD diff.
                let text = text.map(|mut text| {
                    text::LineEnding::normalize(&mut text);
                    Arc::new(text)
                });
                self.head_text = text.clone();
                self.index_text = text;
                self.head_changed = true;
                self.index_changed = true;
            }
            DiffBasesChange::SetEach { index, head } => {
                self.index_text = index.map(|mut index| {
                    text::LineEnding::normalize(&mut index);
                    Arc::new(index)
                });
                self.index_changed = true;
                self.head_text = head.map(|mut head| {
                    text::LineEnding::normalize(&mut head);
                    Arc::new(head)
                });
                self.head_changed = true;
            }
        }

        self.recalculate_diffs(buffer, cx)
    }

    /// Rebuilds the unstaged and/or uncommitted diffs against `buffer` on the
    /// background executor. Replacing `recalculate_diff_task` cancels any
    /// previous in-flight recalculation; all queued `diff_updated_futures` are
    /// resolved when this run completes.
    fn recalculate_diffs(
        &mut self,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);

        let language = self.language.clone();
        let language_registry = self.language_registry.clone();
        let unstaged_changes = self.unstaged_changes();
        let uncommitted_changes = self.uncommitted_changes();
        let head = self.head_text.clone();
        let index = self.index_text.clone();
        let index_changed = self.index_changed;
        let head_changed = self.head_changed;
        let language_changed = self.language_changed;
        // Pointer equality, not textual equality: only true when SetBoth
        // installed one shared Arc (or both bases are absent).
        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
            (None, None) => true,
            _ => false,
        };
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            if let Some(unstaged_changes) = &unstaged_changes {
                // Rebuild the base-text snapshot only when its contents or
                // language changed; otherwise reuse the existing snapshot.
                let staged_snapshot = if index_changed || language_changed {
                    let staged_snapshot = cx.update(|cx| {
                        index.as_ref().map(|head| {
                            language::Buffer::build_snapshot(
                                Rope::from(head.as_str()),
                                language.clone(),
                                language_registry.clone(),
                                cx,
                            )
                        })
                    })?;
                    cx.background_executor()
                        .spawn(OptionFuture::from(staged_snapshot))
                } else {
                    Task::ready(
                        unstaged_changes
                            .read_with(&cx, |change_set, _| change_set.base_text.clone())?,
                    )
                };

                let diff =
                    cx.background_executor().spawn({
                        let buffer = buffer.clone();
                        async move {
                            BufferDiff::build(index.as_ref().map(|index| index.as_str()), &buffer)
                        }
                    });

                // Snapshot build and diff computation run concurrently.
                let (staged_snapshot, diff) = futures::join!(staged_snapshot, diff);

                unstaged_changes.update(&mut cx, |unstaged_changes, cx| {
                    unstaged_changes.set_state(staged_snapshot.clone(), diff, &buffer, cx);
                    if language_changed {
                        cx.emit(BufferChangeSetEvent::LanguageChanged);
                    }
                })?;
            }

            if let Some(uncommitted_changes) = &uncommitted_changes {
                // When the index matches HEAD, the uncommitted diff is
                // identical to the just-computed unstaged one — reuse it.
                let (snapshot, diff) = if let (Some(unstaged_changes), true) =
                    (&unstaged_changes, index_matches_head)
                {
                    unstaged_changes.read_with(&cx, |change_set, _| {
                        (
                            change_set.base_text.clone(),
                            change_set.diff_to_buffer.clone(),
                        )
                    })?
                } else {
                    let committed_snapshot = if head_changed || language_changed {
                        let committed_snapshot = cx.update(|cx| {
                            head.as_ref().map(|head| {
                                language::Buffer::build_snapshot(
                                    Rope::from(head.as_str()),
                                    language.clone(),
                                    language_registry.clone(),
                                    cx,
                                )
                            })
                        })?;
                        cx.background_executor()
                            .spawn(OptionFuture::from(committed_snapshot))
                    } else {
                        Task::ready(
                            uncommitted_changes
                                .read_with(&cx, |change_set, _| change_set.base_text.clone())?,
                        )
                    };

                    let diff = cx.background_executor().spawn({
                        let buffer = buffer.clone();
                        let head = head.clone();
                        async move {
                            BufferDiff::build(head.as_ref().map(|head| head.as_str()), &buffer)
                        }
                    });
                    futures::join!(committed_snapshot, diff)
                };

                uncommitted_changes.update(&mut cx, |change_set, cx| {
                    change_set.set_state(snapshot, diff, &buffer, cx);
                    if language_changed {
                        cx.emit(BufferChangeSetEvent::LanguageChanged);
                    }
                })?;
            }

            if let Some(this) = this.upgrade() {
                this.update(&mut cx, |this, _| {
                    // NOTE(review): `language_changed` is not reset here, unlike
                    // the other two flags, so later runs keep rebuilding
                    // snapshots after a language change — confirm intentional.
                    this.index_changed = false;
                    this.head_changed = false;
                    for tx in this.diff_updated_futures.drain(..) {
                        tx.send(()).ok();
                    }
                })?;
            }

            Ok(())
        }));

        rx
    }
}
319
/// A diff between a buffer and one of its git base texts.
pub struct BufferChangeSet {
    pub buffer_id: BufferId,
    /// Snapshot of the base text; `None` when the file is absent from the base.
    pub base_text: Option<language::BufferSnapshot>,
    pub diff_to_buffer: BufferDiff,
    /// For an uncommitted change set, the corresponding unstaged one, if open.
    pub unstaged_change_set: Option<Entity<BufferChangeSet>>,
}
326
327impl std::fmt::Debug for BufferChangeSet {
328 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329 f.debug_struct("BufferChangeSet")
330 .field("buffer_id", &self.buffer_id)
331 .field("base_text", &self.base_text.as_ref().map(|s| s.text()))
332 .field("diff_to_buffer", &self.diff_to_buffer)
333 .finish()
334 }
335}
336
/// Events emitted by a `BufferChangeSet`.
pub enum BufferChangeSetEvent {
    /// The diff changed within the given buffer range.
    DiffChanged { changed_range: Range<text::Anchor> },
    /// The base-text snapshots were rebuilt for a new buffer language.
    LanguageChanged,
}
341
/// Backing implementation: buffers loaded from local worktrees, or received
/// from an upstream peer over RPC.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
346
/// Buffer store backed by an upstream peer (host of a shared project).
struct RemoteBufferStore {
    /// Strong handles to buffers pushed by peers, retained to avoid races.
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    /// Buffers whose initial state arrived but whose operation chunks are
    /// still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    /// Waiters to notify once a given remote buffer finishes loading.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
356
/// Buffer store backed by local worktrees on disk.
struct LocalBufferStore {
    /// Reverse indices from path / worktree entry to open buffer id.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    // Keeps the worktree-store event subscription alive for this store.
    _subscription: Subscription,
}
363
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully-loaded buffer plus its git change-set state.
    Complete {
        // Weak so closing the buffer elsewhere lets it drop.
        buffer: WeakEntity<Buffer>,
        change_set_state: Entity<BufferChangeSetState>,
    },
    /// Operations received for a buffer that hasn't finished loading yet.
    Operations(Vec<Operation>),
}
371
/// Events emitted by the `BufferStore`.
pub enum BufferStoreEvent {
    BufferAdded(Entity<Buffer>),
    BufferDropped(BufferId),
    /// A buffer's on-disk path changed (e.g. after a rename or save-as).
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        /// The file the buffer pointed at before the change, if any.
        old_file: Option<Arc<dyn language::File>>,
    },
}
380
/// The edits applied by one project-wide operation: one `language::Transaction`
/// per affected buffer.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}
385
386impl RemoteBufferStore {
387 fn open_unstaged_changes(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
388 let project_id = self.project_id;
389 let client = self.upstream_client.clone();
390 cx.background_executor().spawn(async move {
391 let response = client
392 .request(proto::OpenUnstagedChanges {
393 project_id,
394 buffer_id: buffer_id.to_proto(),
395 })
396 .await?;
397 Ok(response.staged_text)
398 })
399 }
400
401 fn open_uncommitted_changes(
402 &self,
403 buffer_id: BufferId,
404 cx: &App,
405 ) -> Task<Result<DiffBasesChange>> {
406 use proto::open_uncommitted_changes_response::Mode;
407
408 let project_id = self.project_id;
409 let client = self.upstream_client.clone();
410 cx.background_executor().spawn(async move {
411 let response = client
412 .request(proto::OpenUncommittedChanges {
413 project_id,
414 buffer_id: buffer_id.to_proto(),
415 })
416 .await?;
417 let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
418 let bases = match mode {
419 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
420 Mode::IndexAndHead => DiffBasesChange::SetEach {
421 head: response.committed_text,
422 index: response.staged_text,
423 },
424 };
425 Ok(bases)
426 })
427 }
428
429 pub fn wait_for_remote_buffer(
430 &mut self,
431 id: BufferId,
432 cx: &mut Context<BufferStore>,
433 ) -> Task<Result<Entity<Buffer>>> {
434 let (tx, rx) = oneshot::channel();
435 self.remote_buffer_listeners.entry(id).or_default().push(tx);
436
437 cx.spawn(|this, cx| async move {
438 if let Some(buffer) = this
439 .read_with(&cx, |buffer_store, _| buffer_store.get(id))
440 .ok()
441 .flatten()
442 {
443 return Ok(buffer);
444 }
445
446 cx.background_executor()
447 .spawn(async move { rx.await? })
448 .await
449 })
450 }
451
452 fn save_remote_buffer(
453 &self,
454 buffer_handle: Entity<Buffer>,
455 new_path: Option<proto::ProjectPath>,
456 cx: &Context<BufferStore>,
457 ) -> Task<Result<()>> {
458 let buffer = buffer_handle.read(cx);
459 let buffer_id = buffer.remote_id().into();
460 let version = buffer.version();
461 let rpc = self.upstream_client.clone();
462 let project_id = self.project_id;
463 cx.spawn(move |_, mut cx| async move {
464 let response = rpc
465 .request(proto::SaveBuffer {
466 project_id,
467 buffer_id,
468 new_path,
469 version: serialize_version(&version),
470 })
471 .await?;
472 let version = deserialize_version(&response.version);
473 let mtime = response.mtime.map(|mtime| mtime.into());
474
475 buffer_handle.update(&mut cx, |buffer, cx| {
476 buffer.did_save(version.clone(), mtime, cx);
477 })?;
478
479 Ok(())
480 })
481 }
482
483 pub fn handle_create_buffer_for_peer(
484 &mut self,
485 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
486 replica_id: u16,
487 capability: Capability,
488 cx: &mut Context<BufferStore>,
489 ) -> Result<Option<Entity<Buffer>>> {
490 match envelope
491 .payload
492 .variant
493 .ok_or_else(|| anyhow!("missing variant"))?
494 {
495 proto::create_buffer_for_peer::Variant::State(mut state) => {
496 let buffer_id = BufferId::new(state.id)?;
497
498 let buffer_result = maybe!({
499 let mut buffer_file = None;
500 if let Some(file) = state.file.take() {
501 let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
502 let worktree = self
503 .worktree_store
504 .read(cx)
505 .worktree_for_id(worktree_id, cx)
506 .ok_or_else(|| {
507 anyhow!("no worktree found for id {}", file.worktree_id)
508 })?;
509 buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
510 as Arc<dyn language::File>);
511 }
512 Buffer::from_proto(replica_id, capability, state, buffer_file)
513 });
514
515 match buffer_result {
516 Ok(buffer) => {
517 let buffer = cx.new(|_| buffer);
518 self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
519 }
520 Err(error) => {
521 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
522 for listener in listeners {
523 listener.send(Err(anyhow!(error.cloned()))).ok();
524 }
525 }
526 }
527 }
528 }
529 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
530 let buffer_id = BufferId::new(chunk.buffer_id)?;
531 let buffer = self
532 .loading_remote_buffers_by_id
533 .get(&buffer_id)
534 .cloned()
535 .ok_or_else(|| {
536 anyhow!(
537 "received chunk for buffer {} without initial state",
538 chunk.buffer_id
539 )
540 })?;
541
542 let result = maybe!({
543 let operations = chunk
544 .operations
545 .into_iter()
546 .map(language::proto::deserialize_operation)
547 .collect::<Result<Vec<_>>>()?;
548 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
549 anyhow::Ok(())
550 });
551
552 if let Err(error) = result {
553 self.loading_remote_buffers_by_id.remove(&buffer_id);
554 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
555 for listener in listeners {
556 listener.send(Err(error.cloned())).ok();
557 }
558 }
559 } else if chunk.is_last {
560 self.loading_remote_buffers_by_id.remove(&buffer_id);
561 if self.upstream_client.is_via_collab() {
562 // retain buffers sent by peers to avoid races.
563 self.shared_with_me.insert(buffer.clone());
564 }
565
566 if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
567 for sender in senders {
568 sender.send(Ok(buffer.clone())).ok();
569 }
570 }
571 return Ok(Some(buffer));
572 }
573 }
574 }
575 return Ok(None);
576 }
577
578 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
579 self.loading_remote_buffers_by_id
580 .keys()
581 .copied()
582 .collect::<Vec<_>>()
583 }
584
585 pub fn deserialize_project_transaction(
586 &self,
587 message: proto::ProjectTransaction,
588 push_to_history: bool,
589 cx: &mut Context<BufferStore>,
590 ) -> Task<Result<ProjectTransaction>> {
591 cx.spawn(|this, mut cx| async move {
592 let mut project_transaction = ProjectTransaction::default();
593 for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
594 {
595 let buffer_id = BufferId::new(buffer_id)?;
596 let buffer = this
597 .update(&mut cx, |this, cx| {
598 this.wait_for_remote_buffer(buffer_id, cx)
599 })?
600 .await?;
601 let transaction = language::proto::deserialize_transaction(transaction)?;
602 project_transaction.0.insert(buffer, transaction);
603 }
604
605 for (buffer, transaction) in &project_transaction.0 {
606 buffer
607 .update(&mut cx, |buffer, _| {
608 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
609 })?
610 .await?;
611
612 if push_to_history {
613 buffer.update(&mut cx, |buffer, _| {
614 buffer.push_transaction(transaction.clone(), Instant::now());
615 })?;
616 }
617 }
618
619 Ok(project_transaction)
620 })
621 }
622
623 fn open_buffer(
624 &self,
625 path: Arc<Path>,
626 worktree: Entity<Worktree>,
627 cx: &mut Context<BufferStore>,
628 ) -> Task<Result<Entity<Buffer>>> {
629 let worktree_id = worktree.read(cx).id().to_proto();
630 let project_id = self.project_id;
631 let client = self.upstream_client.clone();
632 let path_string = path.clone().to_string_lossy().to_string();
633 cx.spawn(move |this, mut cx| async move {
634 let response = client
635 .request(proto::OpenBufferByPath {
636 project_id,
637 worktree_id,
638 path: path_string,
639 })
640 .await?;
641 let buffer_id = BufferId::new(response.buffer_id)?;
642
643 let buffer = this
644 .update(&mut cx, {
645 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
646 })?
647 .await?;
648
649 Ok(buffer)
650 })
651 }
652
653 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
654 let create = self.upstream_client.request(proto::OpenNewBuffer {
655 project_id: self.project_id,
656 });
657 cx.spawn(|this, mut cx| async move {
658 let response = create.await?;
659 let buffer_id = BufferId::new(response.buffer_id)?;
660
661 this.update(&mut cx, |this, cx| {
662 this.wait_for_remote_buffer(buffer_id, cx)
663 })?
664 .await
665 })
666 }
667
668 fn reload_buffers(
669 &self,
670 buffers: HashSet<Entity<Buffer>>,
671 push_to_history: bool,
672 cx: &mut Context<BufferStore>,
673 ) -> Task<Result<ProjectTransaction>> {
674 let request = self.upstream_client.request(proto::ReloadBuffers {
675 project_id: self.project_id,
676 buffer_ids: buffers
677 .iter()
678 .map(|buffer| buffer.read(cx).remote_id().to_proto())
679 .collect(),
680 });
681
682 cx.spawn(|this, mut cx| async move {
683 let response = request
684 .await?
685 .transaction
686 .ok_or_else(|| anyhow!("missing transaction"))?;
687 this.update(&mut cx, |this, cx| {
688 this.deserialize_project_transaction(response, push_to_history, cx)
689 })?
690 .await
691 })
692 }
693}
694
695impl LocalBufferStore {
696 fn worktree_for_buffer(
697 &self,
698 buffer: &Entity<Buffer>,
699 cx: &App,
700 ) -> Option<(Entity<Worktree>, Arc<Path>)> {
701 let file = buffer.read(cx).file()?;
702 let worktree_id = file.worktree_id(cx);
703 let path = file.path().clone();
704 let worktree = self
705 .worktree_store
706 .read(cx)
707 .worktree_for_id(worktree_id, cx)?;
708 Some((worktree, path))
709 }
710
711 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
712 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
713 worktree.read(cx).load_staged_file(path.as_ref(), cx)
714 } else {
715 return Task::ready(Err(anyhow!("no such worktree")));
716 }
717 }
718
719 fn load_committed_text(
720 &self,
721 buffer: &Entity<Buffer>,
722 cx: &App,
723 ) -> Task<Result<Option<String>>> {
724 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
725 worktree.read(cx).load_committed_file(path.as_ref(), cx)
726 } else {
727 Task::ready(Err(anyhow!("no such worktree")))
728 }
729 }
730
    /// Writes `buffer_handle`'s contents to `path` within `worktree`, then
    /// notifies any downstream client and marks the buffer saved.
    ///
    /// `has_changed_file` is forced on for never-saved (`DiskState::New`)
    /// buffers so downstream peers receive the new file metadata.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Capture text and version up front so the write reflects the buffer
        // as of this call, even if it changes while the save is in flight.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Notify downstream collaborators before updating the local
            // buffer, so their state never runs ahead of the host's.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
788
    /// Subscribes the buffer store to a local worktree's entry and git-repo
    /// events; remote worktrees are ignored. The subscription is detached and
    /// lives as long as the worktree entity.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        Self::local_worktree_git_repos_changed(
                            this,
                            worktree.clone(),
                            updated_repos,
                            cx,
                        )
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }
814
815 fn local_worktree_entries_changed(
816 this: &mut BufferStore,
817 worktree_handle: &Entity<Worktree>,
818 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
819 cx: &mut Context<BufferStore>,
820 ) {
821 let snapshot = worktree_handle.read(cx).snapshot();
822 for (path, entry_id, _) in changes {
823 Self::local_worktree_entry_changed(
824 this,
825 *entry_id,
826 path,
827 worktree_handle,
828 &snapshot,
829 cx,
830 );
831 }
832 }
833
    /// Reacts to git repository changes in a local worktree: for every open
    /// buffer under a changed repo that has a live change set, reloads the
    /// staged/committed base texts on the background executor, forwards the
    /// new bases downstream if the project is shared, and recalculates diffs.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Collect, per affected buffer: its snapshot, its path, and whether
        // each kind of change set is still alive (so we only load the base
        // texts somebody is actually watching).
        let mut change_set_state_updates = Vec::new();
        for buffer in this.opened_buffers.values() {
            let OpenBuffer::Complete {
                buffer,
                change_set_state,
            } = buffer
            else {
                continue;
            };
            let Some(buffer) = buffer.upgrade() else {
                continue;
            };
            let buffer = buffer.read(cx);
            let Some(file) = File::from_dyn(buffer.file()) else {
                continue;
            };
            if file.worktree != worktree_handle {
                continue;
            }
            let change_set_state = change_set_state.read(cx);
            if changed_repos
                .iter()
                .any(|(work_dir, _)| file.path.starts_with(work_dir))
            {
                let snapshot = buffer.text_snapshot();
                change_set_state_updates.push((
                    snapshot.clone(),
                    file.path.clone(),
                    change_set_state
                        .unstaged_changes
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                    change_set_state
                        .uncommitted_changes
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                ))
            }
        }

        if change_set_state_updates.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Load index/HEAD texts off the main thread; git reads can be slow.
            let diff_bases_changes_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    change_set_state_updates
                        .into_iter()
                        .filter_map(
                            |(buffer_snapshot, path, needs_staged_text, needs_committed_text)| {
                                let local_repo = snapshot.local_repo_for_path(&path)?;
                                let relative_path = local_repo.relativize(&path).ok()?;
                                let staged_text = if needs_staged_text {
                                    local_repo.repo().load_index_text(&relative_path)
                                } else {
                                    None
                                };
                                let committed_text = if needs_committed_text {
                                    local_repo.repo().load_committed_text(&relative_path)
                                } else {
                                    None
                                };
                                // Collapse identical index/HEAD texts into
                                // SetBoth so the diff can be shared.
                                let diff_bases_change =
                                    match (needs_staged_text, needs_committed_text) {
                                        (true, true) => Some(if staged_text == committed_text {
                                            DiffBasesChange::SetBoth(committed_text)
                                        } else {
                                            DiffBasesChange::SetEach {
                                                index: staged_text,
                                                head: committed_text,
                                            }
                                        }),
                                        (true, false) => {
                                            Some(DiffBasesChange::SetIndex(staged_text))
                                        }
                                        (false, true) => {
                                            Some(DiffBasesChange::SetHead(committed_text))
                                        }
                                        (false, false) => None,
                                    };
                                Some((buffer_snapshot, diff_bases_change))
                            },
                        )
                        .collect::<Vec<_>>()
                })
                .await;

            this.update(&mut cx, |this, cx| {
                for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
                    // The buffer may have closed while we were loading.
                    let Some(OpenBuffer::Complete {
                        change_set_state, ..
                    }) = this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
                    else {
                        continue;
                    };
                    let Some(diff_bases_change) = diff_bases_change else {
                        continue;
                    };

                    change_set_state.update(cx, |change_set_state, cx| {
                        use proto::update_diff_bases::Mode;

                        // Mirror the change to downstream collaborators before
                        // applying it locally.
                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
                            let buffer_id = buffer_snapshot.remote_id().to_proto();
                            let (staged_text, committed_text, mode) = match diff_bases_change
                                .clone()
                            {
                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                DiffBasesChange::SetEach { index, head } => {
                                    (index, head, Mode::IndexAndHead)
                                }
                                DiffBasesChange::SetBoth(text) => {
                                    (None, text, Mode::IndexMatchesHead)
                                }
                            };
                            let message = proto::UpdateDiffBases {
                                project_id: *project_id,
                                buffer_id,
                                staged_text,
                                committed_text,
                                mode: mode as i32,
                            };

                            client.send(message).log_err();
                        }

                        // Completion receiver deliberately dropped.
                        let _ = change_set_state.diff_bases_changed(
                            buffer_snapshot,
                            diff_bases_change,
                            cx,
                        );
                    });
                }
            })
        })
        .detach_and_log_err(cx);
    }
986
    /// Reconciles one changed worktree entry with any open buffer backing it:
    /// rebuilds the buffer's `File` (rename, deletion, mtime change), updates
    /// the path/entry-id indices, notifies downstream clients, and emits
    /// `BufferChangedFilePath` when the path moved.
    ///
    /// Always returns `None`; the `Option` return only enables `?` early-exits.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Look up the affected buffer by entry id first, falling back to path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer entity was dropped; discard its stale entry.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer: clean up both reverse indices and stop.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Prefer lookup by entry id (survives renames), then by path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        // No mtime in the entry: keep whatever we knew before.
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // Entry vanished from the snapshot: the file was deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index and announce the rename.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        // Emit after the buffer update so listeners observe consistent state.
        for event in events {
            cx.emit(event);
        }

        None
    }
1114
1115 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1116 let file = File::from_dyn(buffer.read(cx).file())?;
1117
1118 let remote_id = buffer.read(cx).remote_id();
1119 if let Some(entry_id) = file.entry_id {
1120 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1121 Some(_) => {
1122 return None;
1123 }
1124 None => {
1125 self.local_buffer_ids_by_entry_id
1126 .insert(entry_id, remote_id);
1127 }
1128 }
1129 };
1130 self.local_buffer_ids_by_path.insert(
1131 ProjectPath {
1132 worktree_id: file.worktree_id(cx),
1133 path: file.path.clone(),
1134 },
1135 remote_id,
1136 );
1137
1138 Some(())
1139 }
1140
1141 fn save_buffer(
1142 &self,
1143 buffer: Entity<Buffer>,
1144 cx: &mut Context<BufferStore>,
1145 ) -> Task<Result<()>> {
1146 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1147 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1148 };
1149 let worktree = file.worktree.clone();
1150 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1151 }
1152
1153 fn save_buffer_as(
1154 &self,
1155 buffer: Entity<Buffer>,
1156 path: ProjectPath,
1157 cx: &mut Context<BufferStore>,
1158 ) -> Task<Result<()>> {
1159 let Some(worktree) = self
1160 .worktree_store
1161 .read(cx)
1162 .worktree_for_id(path.worktree_id, cx)
1163 else {
1164 return Task::ready(Err(anyhow!("no such worktree")));
1165 };
1166 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1167 }
1168
    /// Loads a buffer for the given path within the worktree.
    ///
    /// The buffer entity is reserved up front so its entity id can serve as a
    /// stable buffer id before the file contents have finished loading. If
    /// the file does not exist on disk, an empty buffer with
    /// `DiskState::New` is created instead, so it can be saved to that path
    /// later. On success the buffer is registered in the store's id maps.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity now so the buffer id is known before the
            // file load completes.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Build the text buffer off the main thread.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // A missing file is not an error here: open an empty buffer
                // marked as new on disk.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                // Track the buffer by path (and entry id, when it has one) so
                // later file events can find it.
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
1237
1238 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1239 cx.spawn(|buffer_store, mut cx| async move {
1240 let buffer =
1241 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1242 buffer_store.update(&mut cx, |buffer_store, cx| {
1243 buffer_store.add_buffer(buffer.clone(), cx).log_err();
1244 })?;
1245 Ok(buffer)
1246 })
1247 }
1248
1249 fn reload_buffers(
1250 &self,
1251 buffers: HashSet<Entity<Buffer>>,
1252 push_to_history: bool,
1253 cx: &mut Context<BufferStore>,
1254 ) -> Task<Result<ProjectTransaction>> {
1255 cx.spawn(move |_, mut cx| async move {
1256 let mut project_transaction = ProjectTransaction::default();
1257 for buffer in buffers {
1258 let transaction = buffer
1259 .update(&mut cx, |buffer, cx| buffer.reload(cx))?
1260 .await?;
1261 buffer.update(&mut cx, |buffer, cx| {
1262 if let Some(transaction) = transaction {
1263 if !push_to_history {
1264 buffer.forget_transaction(transaction.id);
1265 }
1266 project_transaction.0.insert(cx.entity(), transaction);
1267 }
1268 })?;
1269 }
1270
1271 Ok(project_transaction)
1272 })
1273 }
1274}
1275
1276impl BufferStore {
    /// Registers the RPC message and request handlers that this store
    /// services on the given client.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_open_unstaged_changes);
        client.add_entity_request_handler(Self::handle_open_uncommitted_changes);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
    }
1289
    /// Creates a buffer store for a local project, subscribing to the
    /// worktree store so that worktrees added later are also tracked.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                // Subscribe to each newly added worktree so file events reach
                // this store.
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            worktree_store,
        }
    }
1312
    /// Creates a buffer store that mirrors buffers owned by a remote host,
    /// communicating over `upstream_client` for the project `remote_id`.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
1336
1337 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1338 match &mut self.state {
1339 BufferStoreState::Local(state) => Some(state),
1340 _ => None,
1341 }
1342 }
1343
1344 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1345 match &mut self.state {
1346 BufferStoreState::Remote(state) => Some(state),
1347 _ => None,
1348 }
1349 }
1350
1351 fn as_remote(&self) -> Option<&RemoteBufferStore> {
1352 match &self.state {
1353 BufferStoreState::Remote(state) => Some(state),
1354 _ => None,
1355 }
1356 }
1357
    /// Opens the buffer at the given project path, reusing an already-open
    /// buffer or an in-flight load of the same path when possible.
    ///
    /// Concurrent opens of the same path are deduplicated through the
    /// `loading_buffers` map, which holds a shared task per path.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        // Fast path: the buffer is already open.
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // A load for this path is already in flight; share it.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Convert the shared task's `Arc<anyhow::Error>` back to a plain error.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1403
    /// Opens (or reuses) the change set representing the diff between the
    /// buffer and its staged (index) text.
    ///
    /// Concurrent opens for the same buffer are deduplicated through the
    /// `loading_change_sets` map, keyed by buffer id and change-set kind.
    pub fn open_unstaged_changes(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        // Fast path: the change set already exists and is alive.
        if let Some(change_set) = self.get_unstaged_changes(buffer_id, cx) {
            return Task::ready(Ok(change_set));
        }

        let task = match self
            .loading_change_sets
            .entry((buffer_id, ChangeSetKind::Unstaged))
        {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                // Locally, load the staged text from the git index; remotely,
                // request it from the host.
                let staged_text = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.open_unstaged_changes(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_change_set_internal(
                                this,
                                ChangeSetKind::Unstaged,
                                staged_text.await.map(DiffBasesChange::SetIndex),
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Convert the shared task's `Arc<anyhow::Error>` back to a plain error.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1447
    /// Opens (or reuses) the change set representing the diff between the
    /// buffer and its committed (HEAD) text.
    ///
    /// Locally, both committed and staged texts are loaded so that the
    /// diff-base update can record whether they are identical.
    pub fn open_uncommitted_changes(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        // Fast path: the change set already exists and is alive.
        if let Some(change_set) = self.get_uncommitted_changes(buffer_id, cx) {
            return Task::ready(Ok(change_set));
        }

        let task = match self
            .loading_change_sets
            .entry((buffer_id, ChangeSetKind::Uncommitted))
        {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let changes = match &self.state {
                    BufferStoreState::Local(this) => {
                        let committed_text = this.load_committed_text(&buffer, cx);
                        let staged_text = this.load_staged_text(&buffer, cx);
                        cx.background_executor().spawn(async move {
                            let committed_text = committed_text.await?;
                            let staged_text = staged_text.await?;
                            // Collapse into one base when index == HEAD.
                            let diff_bases_change = if committed_text == staged_text {
                                DiffBasesChange::SetBoth(committed_text)
                            } else {
                                DiffBasesChange::SetEach {
                                    index: staged_text,
                                    head: committed_text,
                                }
                            };
                            Ok(diff_bases_change)
                        })
                    }
                    BufferStoreState::Remote(this) => this.open_uncommitted_changes(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_change_set_internal(
                                this,
                                ChangeSetKind::Uncommitted,
                                changes.await,
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Convert the shared task's `Arc<anyhow::Error>` back to a plain error.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1507
1508 #[cfg(any(test, feature = "test-support"))]
1509 pub fn set_unstaged_change_set(
1510 &mut self,
1511 buffer_id: BufferId,
1512 change_set: Entity<BufferChangeSet>,
1513 ) {
1514 self.loading_change_sets.insert(
1515 (buffer_id, ChangeSetKind::Unstaged),
1516 Task::ready(Ok(change_set)).shared(),
1517 );
1518 }
1519
    /// Finishes opening a change set once its base text(s) have been loaded.
    ///
    /// Removes the entry from `loading_change_sets` (on success or failure),
    /// creates the change-set entity, wires it into the buffer's change-set
    /// state, and waits for the initial diff-base update to complete.
    async fn open_change_set_internal(
        this: WeakEntity<Self>,
        kind: ChangeSetKind,
        texts: Result<DiffBasesChange>,
        buffer: Entity<Buffer>,
        mut cx: AsyncApp,
    ) -> Result<Entity<BufferChangeSet>> {
        let diff_bases_change = match texts {
            Err(e) => {
                // Loading the base text failed: clear the in-flight entry so a
                // later call can retry, then propagate the error.
                this.update(&mut cx, |this, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    this.loading_change_sets.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(&mut cx, |this, cx| {
            let buffer_id = buffer.read(cx).remote_id();
            this.loading_change_sets.remove(&(buffer_id, kind));

            if let Some(OpenBuffer::Complete {
                change_set_state, ..
            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                change_set_state.update(cx, |change_set_state, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    // Lazily subscribe to the buffer so language changes can
                    // be propagated to the diff state.
                    change_set_state.buffer_subscription.get_or_insert_with(|| {
                        cx.subscribe(&buffer, |this, buffer, event, cx| match event {
                            BufferEvent::LanguageChanged => {
                                this.buffer_language_changed(buffer, cx)
                            }
                            _ => {}
                        })
                    });

                    let change_set = cx.new(|cx| BufferChangeSet {
                        buffer_id,
                        base_text: None,
                        diff_to_buffer: BufferDiff::new(&buffer.read(cx).text_snapshot()),
                        unstaged_change_set: None,
                    });
                    match kind {
                        ChangeSetKind::Unstaged => {
                            change_set_state.unstaged_changes = Some(change_set.downgrade())
                        }
                        ChangeSetKind::Uncommitted => {
                            // An uncommitted change set links to the unstaged
                            // one; create it here if it doesn't exist yet.
                            let unstaged_change_set =
                                if let Some(change_set) = change_set_state.unstaged_changes() {
                                    change_set
                                } else {
                                    let unstaged_change_set = cx.new(|cx| BufferChangeSet {
                                        buffer_id,
                                        base_text: None,
                                        diff_to_buffer: BufferDiff::new(
                                            &buffer.read(cx).text_snapshot(),
                                        ),
                                        unstaged_change_set: None,
                                    });
                                    change_set_state.unstaged_changes =
                                        Some(unstaged_change_set.downgrade());
                                    unstaged_change_set
                                };

                            change_set.update(cx, |change_set, _| {
                                change_set.unstaged_change_set = Some(unstaged_change_set);
                            });
                            change_set_state.uncommitted_changes = Some(change_set.downgrade())
                        }
                    };

                    // Kick off the initial diff computation and wait for it
                    // before handing the change set back to the caller.
                    let buffer = buffer.read(cx).text_snapshot();
                    let rx = change_set_state.diff_bases_changed(buffer, diff_bases_change, cx);

                    Ok(async move {
                        rx.await.ok();
                        Ok(change_set)
                    })
                })
            } else {
                Err(anyhow!("buffer was closed"))
            }
        })??
        .await
    }
1606
    /// Creates a new empty buffer, delegating to the local or remote state.
    pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
        match &self.state {
            BufferStoreState::Local(this) => this.create_buffer(cx),
            BufferStoreState::Remote(this) => this.create_buffer(cx),
        }
    }
1613
    /// Saves the buffer to its current file, delegating to the local or
    /// remote state.
    pub fn save_buffer(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        match &mut self.state {
            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
        }
    }
1624
    /// Saves the buffer under a new path, then emits
    /// [`BufferStoreEvent::BufferChangedFilePath`] with the previous file
    /// once the save completes.
    pub fn save_buffer_as(
        &mut self,
        buffer: Entity<Buffer>,
        path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        // Capture the old file before the save replaces it.
        let old_file = buffer.read(cx).file().cloned();
        let task = match &self.state {
            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
            BufferStoreState::Remote(this) => {
                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }
1645
    /// Computes git blame for the buffer's file.
    ///
    /// Locally, the blame runs against the buffer's current (or a specific
    /// historical) text on a background thread; remotely, the request is
    /// forwarded to the host. Returns `Ok(None)` when the file is not inside
    /// a git repository.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything the background task needs while we still
                // have access to the app state.
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    // Blame either the requested historical version or the
                    // buffer's current contents.
                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                // Forward the request to the host, identifying the exact
                // buffer version to blame.
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1707
    /// Builds a permalink URL to the given line range of the buffer's file on
    /// its git hosting provider.
    ///
    /// Locally, the origin remote URL and HEAD SHA are resolved from the
    /// repository; remotely, the request is forwarded to the host. A special
    /// fallback handles Rust files from the Cargo registry that are outside
    /// any git repository.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // The permalink path must be relative to the repository root,
                // not the worktree root.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    // Pin the permalink to the current HEAD commit.
                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                // Forward the request to the host, which has repository access.
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1804
1805 fn add_buffer(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
1806 let remote_id = buffer.read(cx).remote_id();
1807 let is_remote = buffer.read(cx).replica_id() != 0;
1808 let open_buffer = OpenBuffer::Complete {
1809 buffer: buffer.downgrade(),
1810 change_set_state: cx.new(|_| BufferChangeSetState::default()),
1811 };
1812
1813 let handle = cx.entity().downgrade();
1814 buffer.update(cx, move |_, cx| {
1815 cx.on_release(move |buffer, cx| {
1816 handle
1817 .update(cx, |_, cx| {
1818 cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1819 })
1820 .ok();
1821 })
1822 .detach()
1823 });
1824
1825 match self.opened_buffers.entry(remote_id) {
1826 hash_map::Entry::Vacant(entry) => {
1827 entry.insert(open_buffer);
1828 }
1829 hash_map::Entry::Occupied(mut entry) => {
1830 if let OpenBuffer::Operations(operations) = entry.get_mut() {
1831 buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1832 } else if entry.get().upgrade().is_some() {
1833 if is_remote {
1834 return Ok(());
1835 } else {
1836 debug_panic!("buffer {} was already registered", remote_id);
1837 Err(anyhow!("buffer {} was already registered", remote_id))?;
1838 }
1839 }
1840 entry.insert(open_buffer);
1841 }
1842 }
1843
1844 cx.subscribe(&buffer, Self::on_buffer_event).detach();
1845 cx.emit(BufferStoreEvent::BufferAdded(buffer));
1846 Ok(())
1847 }
1848
1849 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1850 self.opened_buffers
1851 .values()
1852 .filter_map(|buffer| buffer.upgrade())
1853 }
1854
1855 pub fn loading_buffers(
1856 &self,
1857 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1858 self.loading_buffers.iter().map(|(path, task)| {
1859 let task = task.clone();
1860 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1861 })
1862 }
1863
1864 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1865 self.buffers().find_map(|buffer| {
1866 let file = File::from_dyn(buffer.read(cx).file())?;
1867 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1868 Some(buffer)
1869 } else {
1870 None
1871 }
1872 })
1873 }
1874
1875 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1876 self.opened_buffers.get(&buffer_id)?.upgrade()
1877 }
1878
1879 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1880 self.get(buffer_id)
1881 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1882 }
1883
1884 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1885 self.get(buffer_id).or_else(|| {
1886 self.as_remote()
1887 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1888 })
1889 }
1890
1891 pub fn get_unstaged_changes(
1892 &self,
1893 buffer_id: BufferId,
1894 cx: &App,
1895 ) -> Option<Entity<BufferChangeSet>> {
1896 if let OpenBuffer::Complete {
1897 change_set_state, ..
1898 } = self.opened_buffers.get(&buffer_id)?
1899 {
1900 change_set_state
1901 .read(cx)
1902 .unstaged_changes
1903 .as_ref()?
1904 .upgrade()
1905 } else {
1906 None
1907 }
1908 }
1909
1910 pub fn get_uncommitted_changes(
1911 &self,
1912 buffer_id: BufferId,
1913 cx: &App,
1914 ) -> Option<Entity<BufferChangeSet>> {
1915 if let OpenBuffer::Complete {
1916 change_set_state, ..
1917 } = self.opened_buffers.get(&buffer_id)?
1918 {
1919 change_set_state
1920 .read(cx)
1921 .uncommitted_changes
1922 .as_ref()?
1923 .upgrade()
1924 } else {
1925 None
1926 }
1927 }
1928
1929 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1930 let buffers = self
1931 .buffers()
1932 .map(|buffer| {
1933 let buffer = buffer.read(cx);
1934 proto::BufferVersion {
1935 id: buffer.remote_id().into(),
1936 version: language::proto::serialize_version(&buffer.version),
1937 }
1938 })
1939 .collect();
1940 let incomplete_buffer_ids = self
1941 .as_remote()
1942 .map(|remote| remote.incomplete_buffer_ids())
1943 .unwrap_or_default();
1944 (buffers, incomplete_buffer_ids)
1945 }
1946
    /// Handles disconnection from the remote host: stops buffers from waiting
    /// on remote operations, makes them read-only, and drops pending remote
    /// buffer listeners so their futures can fail promptly.
    pub fn disconnected_from_host(&mut self, cx: &mut App) {
        // First, unblock anything waiting on operations from the host.
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }

        // Without a host, the buffers can no longer be edited.
        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        if let Some(remote) = self.as_remote_mut() {
            // Wake up all futures currently waiting on a buffer to get opened,
            // to give them a chance to fail now that we've disconnected.
            remote.remote_buffer_listeners.clear()
        }
    }
1966
    /// Marks this store as shared, recording the downstream client and the
    /// project id under which buffers are shared.
    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1970
    /// Stops sharing: drops the downstream client and forgets which buffers
    /// were shared with peers.
    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }
1975
1976 pub fn discard_incomplete(&mut self) {
1977 self.opened_buffers
1978 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1979 }
1980
    /// Streams buffers that may match the search query.
    ///
    /// Already-open unnamed buffers are sent first (each one consuming a slot
    /// of `limit`); then candidate paths from the worktrees are opened in
    /// batches and forwarded on the returned channel. The receiver being
    /// dropped cancels the search.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                // Remember open entries so the worktree scan can skip them.
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Cap how many buffers are opened simultaneously.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A closed receiver means the search was abandoned.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
2035
    /// Recalculates the diffs of all change sets associated with the given
    /// buffers, returning a future that resolves once every recalculation
    /// has finished.
    pub fn recalculate_buffer_diffs(
        &mut self,
        buffers: Vec<Entity<Buffer>>,
        cx: &mut Context<Self>,
    ) -> impl Future<Output = ()> {
        let mut futures = Vec::new();
        for buffer in buffers {
            // Buffers without complete state have no change sets to update.
            if let Some(OpenBuffer::Complete {
                change_set_state, ..
            }) = self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                let buffer = buffer.read(cx).text_snapshot();
                futures.push(change_set_state.update(cx, |change_set_state, cx| {
                    change_set_state.recalculate_diffs(buffer, cx)
                }));
            }
        }
        async move {
            futures::future::join_all(futures).await;
        }
    }
2057
    /// Reacts to events from an individual buffer.
    ///
    /// File-handle changes update the local id maps; reloads are forwarded to
    /// the downstream client (if this store is shared). Other events are
    /// ignored here.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Only relevant while sharing with a downstream client.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }
2088
2089 pub async fn handle_update_buffer(
2090 this: Entity<Self>,
2091 envelope: TypedEnvelope<proto::UpdateBuffer>,
2092 mut cx: AsyncApp,
2093 ) -> Result<proto::Ack> {
2094 let payload = envelope.payload.clone();
2095 let buffer_id = BufferId::new(payload.buffer_id)?;
2096 let ops = payload
2097 .operations
2098 .into_iter()
2099 .map(language::proto::deserialize_operation)
2100 .collect::<Result<Vec<_>, _>>()?;
2101 this.update(&mut cx, |this, cx| {
2102 match this.opened_buffers.entry(buffer_id) {
2103 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
2104 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
2105 OpenBuffer::Complete { buffer, .. } => {
2106 if let Some(buffer) = buffer.upgrade() {
2107 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
2108 }
2109 }
2110 },
2111 hash_map::Entry::Vacant(e) => {
2112 e.insert(OpenBuffer::Operations(ops));
2113 }
2114 }
2115 Ok(proto::Ack {})
2116 })?
2117 }
2118
2119 pub fn register_shared_lsp_handle(
2120 &mut self,
2121 peer_id: proto::PeerId,
2122 buffer_id: BufferId,
2123 handle: OpenLspBufferHandle,
2124 ) {
2125 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2126 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2127 buffer.lsp_handle = Some(handle);
2128 return;
2129 }
2130 }
2131 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2132 }
2133
    /// Handles a guest's request to resynchronize its buffers after a
    /// reconnect.
    ///
    /// Re-registers each requested buffer as shared with the guest, replies
    /// with the host's current buffer versions, and pushes the guest any
    /// file/reload state plus the operations it is missing relative to the
    /// version it reported.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate for this guest; only the buffers it asks
        // about will be re-shared below.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        change_set: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest is missing.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations in chunks, off the main thread.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
2223
2224 pub fn handle_create_buffer_for_peer(
2225 &mut self,
2226 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2227 replica_id: u16,
2228 capability: Capability,
2229 cx: &mut Context<Self>,
2230 ) -> Result<()> {
2231 let Some(remote) = self.as_remote_mut() else {
2232 return Err(anyhow!("buffer store is not a remote"));
2233 };
2234
2235 if let Some(buffer) =
2236 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2237 {
2238 self.add_buffer(buffer, cx)?;
2239 }
2240
2241 Ok(())
2242 }
2243
    /// Handles a notification that a buffer's underlying file changed (e.g.
    /// a rename or metadata update), updating the local buffer's file and
    /// re-forwarding the message to any downstream collaborators.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            // Clone the payload so the original `envelope.payload.file` can
            // still be forwarded downstream after `file` is moved out here.
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // Swap in the new file, keeping the old one only when the path
                // actually changed (or there was no previous file).
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay the update to collaborators further downstream, even if we
            // don't have the buffer open ourselves.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
2291
    /// Handles a guest's request to save a buffer.
    ///
    /// Waits until this replica has caught up to the guest's buffer version,
    /// then saves — to a new path when `new_path` is set ("save as"),
    /// otherwise in place — and returns the resulting saved version and
    /// mtime. Fails if the buffer isn't open or the project isn't shared.
    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until this replica has seen the version the guest asked
        // to save.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        // Re-read the id from the buffer itself (expected to match `buffer_id`).
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
2332
2333 pub async fn handle_close_buffer(
2334 this: Entity<Self>,
2335 envelope: TypedEnvelope<proto::CloseBuffer>,
2336 mut cx: AsyncApp,
2337 ) -> Result<()> {
2338 let peer_id = envelope.sender_id;
2339 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2340 this.update(&mut cx, |this, _| {
2341 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2342 if shared.remove(&buffer_id).is_some() {
2343 if shared.is_empty() {
2344 this.shared_buffers.remove(&peer_id);
2345 }
2346 return;
2347 }
2348 }
2349 debug_panic!(
2350 "peer_id {} closed buffer_id {} which was either not open or already closed",
2351 peer_id,
2352 buffer_id
2353 )
2354 })
2355 }
2356
2357 pub async fn handle_buffer_saved(
2358 this: Entity<Self>,
2359 envelope: TypedEnvelope<proto::BufferSaved>,
2360 mut cx: AsyncApp,
2361 ) -> Result<()> {
2362 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2363 let version = deserialize_version(&envelope.payload.version);
2364 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2365 this.update(&mut cx, move |this, cx| {
2366 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2367 buffer.update(cx, |buffer, cx| {
2368 buffer.did_save(version, mtime, cx);
2369 });
2370 }
2371
2372 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2373 downstream_client
2374 .send(proto::BufferSaved {
2375 project_id: *project_id,
2376 buffer_id: buffer_id.into(),
2377 mtime: envelope.payload.mtime,
2378 version: envelope.payload.version,
2379 })
2380 .log_err();
2381 }
2382 })
2383 }
2384
2385 pub async fn handle_buffer_reloaded(
2386 this: Entity<Self>,
2387 envelope: TypedEnvelope<proto::BufferReloaded>,
2388 mut cx: AsyncApp,
2389 ) -> Result<()> {
2390 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2391 let version = deserialize_version(&envelope.payload.version);
2392 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2393 let line_ending = deserialize_line_ending(
2394 proto::LineEnding::from_i32(envelope.payload.line_ending)
2395 .ok_or_else(|| anyhow!("missing line ending"))?,
2396 );
2397 this.update(&mut cx, |this, cx| {
2398 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2399 buffer.update(cx, |buffer, cx| {
2400 buffer.did_reload(version, line_ending, mtime, cx);
2401 });
2402 }
2403
2404 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2405 downstream_client
2406 .send(proto::BufferReloaded {
2407 project_id: *project_id,
2408 buffer_id: buffer_id.into(),
2409 mtime: envelope.payload.mtime,
2410 version: envelope.payload.version,
2411 line_ending: envelope.payload.line_ending,
2412 })
2413 .log_err();
2414 }
2415 })
2416 }
2417
2418 pub async fn handle_blame_buffer(
2419 this: Entity<Self>,
2420 envelope: TypedEnvelope<proto::BlameBuffer>,
2421 mut cx: AsyncApp,
2422 ) -> Result<proto::BlameBufferResponse> {
2423 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2424 let version = deserialize_version(&envelope.payload.version);
2425 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2426 buffer
2427 .update(&mut cx, |buffer, _| {
2428 buffer.wait_for_version(version.clone())
2429 })?
2430 .await?;
2431 let blame = this
2432 .update(&mut cx, |this, cx| {
2433 this.blame_buffer(&buffer, Some(version), cx)
2434 })?
2435 .await?;
2436 Ok(serialize_blame_buffer_response(blame))
2437 }
2438
2439 pub async fn handle_get_permalink_to_line(
2440 this: Entity<Self>,
2441 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2442 mut cx: AsyncApp,
2443 ) -> Result<proto::GetPermalinkToLineResponse> {
2444 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2445 // let version = deserialize_version(&envelope.payload.version);
2446 let selection = {
2447 let proto_selection = envelope
2448 .payload
2449 .selection
2450 .context("no selection to get permalink for defined")?;
2451 proto_selection.start as u32..proto_selection.end as u32
2452 };
2453 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2454 let permalink = this
2455 .update(&mut cx, |this, cx| {
2456 this.get_permalink_to_line(&buffer, selection, cx)
2457 })?
2458 .await?;
2459 Ok(proto::GetPermalinkToLineResponse {
2460 permalink: permalink.to_string(),
2461 })
2462 }
2463
    /// Handles a guest's request to open the unstaged changes (buffer vs.
    /// git index) for a buffer, returning the staged text the diff was
    /// computed against (if any).
    pub async fn handle_open_unstaged_changes(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUnstagedChanges>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUnstagedChangesResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let change_set = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_unstaged_changes(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        // Attach the change set to the peer's shared-buffer entry so it stays
        // alive as long as the peer keeps the buffer open.
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.change_set = Some(change_set.clone());
            }
        })?;
        // `None` means there is no staged (index) text for this buffer.
        let staged_text = change_set.read_with(&cx, |change_set, _| {
            change_set.base_text.as_ref().map(|buffer| buffer.text())
        })?;
        Ok(proto::OpenUnstagedChangesResponse { staged_text })
    }
2492
    /// Handles a guest's request to open the uncommitted changes (buffer vs.
    /// git HEAD) for a buffer.
    ///
    /// The response's `mode` tells the guest how to interpret the two base
    /// texts: when the staged (index) snapshot is the very same snapshot as
    /// the committed (HEAD) one, the staged text is omitted and the mode is
    /// `IndexMatchesHead` so the same text isn't sent twice.
    pub async fn handle_open_uncommitted_changes(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUncommittedChanges>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUncommittedChangesResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let change_set = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_uncommitted_changes(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        // Attach the change set to the peer's shared-buffer entry so it stays
        // alive as long as the peer keeps the buffer open.
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.change_set = Some(change_set.clone());
            }
        })?;
        change_set.read_with(&cx, |change_set, cx| {
            use proto::open_uncommitted_changes_response::Mode;

            // The staged (index) snapshot, if an unstaged change set exists
            // for this buffer.
            let staged_buffer = change_set
                .unstaged_change_set
                .as_ref()
                .and_then(|change_set| change_set.read(cx).base_text.as_ref());

            let mode;
            let staged_text;
            let committed_text;
            if let Some(committed_buffer) = &change_set.base_text {
                committed_text = Some(committed_buffer.text());
                if let Some(staged_buffer) = staged_buffer {
                    if staged_buffer.remote_id() == committed_buffer.remote_id() {
                        // Index and HEAD share one snapshot — don't resend it.
                        mode = Mode::IndexMatchesHead;
                        staged_text = None;
                    } else {
                        mode = Mode::IndexAndHead;
                        staged_text = Some(staged_buffer.text());
                    }
                } else {
                    mode = Mode::IndexAndHead;
                    staged_text = None;
                }
            } else {
                mode = Mode::IndexAndHead;
                committed_text = None;
                staged_text = staged_buffer.as_ref().map(|buffer| buffer.text());
            }

            proto::OpenUncommittedChangesResponse {
                committed_text,
                staged_text,
                mode: mode.into(),
            }
        })
    }
2554
    /// Handles a notification that a buffer's git base texts changed,
    /// handing the new bases to the buffer's change-set state so diffs can
    /// be recalculated.
    pub async fn handle_update_diff_bases(
        this: Entity<Self>,
        request: TypedEnvelope<proto::UpdateDiffBases>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        this.update(&mut cx, |this, cx| {
            // Only complete buffers carry change-set state; anything still
            // buffering operations is skipped, as is a buffer that has been
            // dropped (failed upgrade).
            if let Some(OpenBuffer::Complete {
                change_set_state,
                buffer,
            }) = this.opened_buffers.get_mut(&buffer_id)
            {
                if let Some(buffer) = buffer.upgrade() {
                    let buffer = buffer.read(cx).text_snapshot();
                    change_set_state.update(cx, |change_set_state, cx| {
                        change_set_state.handle_base_texts_updated(buffer, request.payload, cx);
                    })
                }
            }
        })
    }
2576
2577 pub fn reload_buffers(
2578 &self,
2579 buffers: HashSet<Entity<Buffer>>,
2580 push_to_history: bool,
2581 cx: &mut Context<Self>,
2582 ) -> Task<Result<ProjectTransaction>> {
2583 if buffers.is_empty() {
2584 return Task::ready(Ok(ProjectTransaction::default()));
2585 }
2586 match &self.state {
2587 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2588 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2589 }
2590 }
2591
2592 async fn handle_reload_buffers(
2593 this: Entity<Self>,
2594 envelope: TypedEnvelope<proto::ReloadBuffers>,
2595 mut cx: AsyncApp,
2596 ) -> Result<proto::ReloadBuffersResponse> {
2597 let sender_id = envelope.original_sender_id().unwrap_or_default();
2598 let reload = this.update(&mut cx, |this, cx| {
2599 let mut buffers = HashSet::default();
2600 for buffer_id in &envelope.payload.buffer_ids {
2601 let buffer_id = BufferId::new(*buffer_id)?;
2602 buffers.insert(this.get_existing(buffer_id)?);
2603 }
2604 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2605 })??;
2606
2607 let project_transaction = reload.await?;
2608 let project_transaction = this.update(&mut cx, |this, cx| {
2609 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2610 })?;
2611 Ok(proto::ReloadBuffersResponse {
2612 transaction: Some(project_transaction),
2613 })
2614 }
2615
    /// Starts replicating `buffer` to `peer_id` if it isn't already shared
    /// with that peer.
    ///
    /// The buffer is registered as shared synchronously (so repeated calls
    /// become no-ops), then its full state is sent, followed by its
    /// serialized operation history streamed in chunks in the background.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            return Task::ready(Ok(()));
        }
        // Record the share before spawning so a second call for the same
        // buffer hits the early return above.
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                change_set: None,
                lsp_handle: None,
            },
        );

        // Nothing to send when the project isn't shared downstream.
        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been closed since this task was spawned.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream operations if the initial state was sent
            // successfully; the final chunk is flagged via `is_last`.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2682
    /// Stops tracking all buffers shared with collaborators.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2686
    /// Stops tracking the buffers shared with a single collaborator.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2690
2691 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2692 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2693 self.shared_buffers.insert(new_peer_id, buffers);
2694 }
2695 }
2696
    /// Whether any buffers are currently shared with collaborators.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2700
2701 pub fn create_local_buffer(
2702 &mut self,
2703 text: &str,
2704 language: Option<Arc<Language>>,
2705 cx: &mut Context<Self>,
2706 ) -> Entity<Buffer> {
2707 let buffer = cx.new(|cx| {
2708 Buffer::local(text, cx)
2709 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
2710 });
2711
2712 self.add_buffer(buffer.clone(), cx).log_err();
2713 let buffer_id = buffer.read(cx).remote_id();
2714
2715 let this = self
2716 .as_local_mut()
2717 .expect("local-only method called in a non-local context");
2718 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2719 this.local_buffer_ids_by_path.insert(
2720 ProjectPath {
2721 worktree_id: file.worktree_id(cx),
2722 path: file.path.clone(),
2723 },
2724 buffer_id,
2725 );
2726
2727 if let Some(entry_id) = file.entry_id {
2728 this.local_buffer_ids_by_entry_id
2729 .insert(entry_id, buffer_id);
2730 }
2731 }
2732 buffer
2733 }
2734
2735 pub fn deserialize_project_transaction(
2736 &mut self,
2737 message: proto::ProjectTransaction,
2738 push_to_history: bool,
2739 cx: &mut Context<Self>,
2740 ) -> Task<Result<ProjectTransaction>> {
2741 if let Some(this) = self.as_remote_mut() {
2742 this.deserialize_project_transaction(message, push_to_history, cx)
2743 } else {
2744 debug_panic!("not a remote buffer store");
2745 Task::ready(Err(anyhow!("not a remote buffer store")))
2746 }
2747 }
2748
2749 pub fn wait_for_remote_buffer(
2750 &mut self,
2751 id: BufferId,
2752 cx: &mut Context<BufferStore>,
2753 ) -> Task<Result<Entity<Buffer>>> {
2754 if let Some(this) = self.as_remote_mut() {
2755 this.wait_for_remote_buffer(id, cx)
2756 } else {
2757 debug_panic!("not a remote buffer store");
2758 Task::ready(Err(anyhow!("not a remote buffer store")))
2759 }
2760 }
2761
2762 pub fn serialize_project_transaction_for_peer(
2763 &mut self,
2764 project_transaction: ProjectTransaction,
2765 peer_id: proto::PeerId,
2766 cx: &mut Context<Self>,
2767 ) -> proto::ProjectTransaction {
2768 let mut serialized_transaction = proto::ProjectTransaction {
2769 buffer_ids: Default::default(),
2770 transactions: Default::default(),
2771 };
2772 for (buffer, transaction) in project_transaction.0 {
2773 self.create_buffer_for_peer(&buffer, peer_id, cx)
2774 .detach_and_log_err(cx);
2775 serialized_transaction
2776 .buffer_ids
2777 .push(buffer.read(cx).remote_id().into());
2778 serialized_transaction
2779 .transactions
2780 .push(language::proto::serialize_transaction(&transaction));
2781 }
2782 serialized_transaction
2783 }
2784}
2785
// `BufferChangeSet` emits `BufferChangeSetEvent`s (e.g. `DiffChanged`) through
// gpui's event system.
impl EventEmitter<BufferChangeSetEvent> for BufferChangeSet {}
2787
impl BufferChangeSet {
    /// Installs a new base text and diff for this change set, emitting
    /// [`BufferChangeSetEvent::DiffChanged`] for the affected range.
    ///
    /// If the new base text is a different snapshot than the previous one
    /// (compared by remote id), the entire buffer range is reported as
    /// changed; otherwise the changed range comes from comparing the old and
    /// new diffs. No event is emitted when `base_text` is `None`.
    fn set_state(
        &mut self,
        base_text: Option<language::BufferSnapshot>,
        diff: BufferDiff,
        buffer: &text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) {
        if let Some(base_text) = base_text.as_ref() {
            let changed_range = if Some(base_text.remote_id())
                != self.base_text.as_ref().map(|buffer| buffer.remote_id())
            {
                Some(text::Anchor::MIN..text::Anchor::MAX)
            } else {
                diff.compare(&self.diff_to_buffer, buffer)
            };
            if let Some(changed_range) = changed_range {
                cx.emit(BufferChangeSetEvent::DiffChanged { changed_range });
            }
        }
        self.base_text = base_text;
        self.diff_to_buffer = diff;
    }

    /// Returns the diff hunks intersecting `range`.
    pub fn diff_hunks_intersecting_range<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range(range, buffer_snapshot)
    }

    /// Returns the diff hunks intersecting `range`, iterated in reverse.
    pub fn diff_hunks_intersecting_range_rev<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range_rev(range, buffer_snapshot)
    }

    /// Used in cases where the change set isn't derived from git.
    ///
    /// Computes the diff between `buffer` and `base_buffer` on the
    /// background executor, then installs the result via `set_state`. The
    /// returned receiver fires once the new state has been applied (or once
    /// the change set has been dropped in the meantime).
    pub fn set_base_text(
        &mut self,
        base_buffer: Entity<language::Buffer>,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        let this = cx.weak_entity();
        let base_buffer = base_buffer.read(cx).snapshot();
        cx.spawn(|_, mut cx| async move {
            // Diffing can be expensive, so run it off the main thread.
            let diff = cx
                .background_executor()
                .spawn({
                    let base_buffer = base_buffer.clone();
                    let buffer = buffer.clone();
                    async move { BufferDiff::build(Some(&base_buffer.text()), &buffer) }
                })
                .await;
            let Some(this) = this.upgrade() else {
                // The change set was dropped; still signal completion.
                tx.send(()).ok();
                return;
            };
            this.update(&mut cx, |this, cx| {
                this.set_state(Some(base_buffer), diff, &buffer, cx);
            })
            .log_err();
            tx.send(()).ok();
        })
        .detach();
        rx
    }

    /// Returns the base text as a `String`, for test assertions.
    #[cfg(any(test, feature = "test-support"))]
    pub fn base_text_string(&self) -> Option<String> {
        self.base_text.as_ref().map(|buffer| buffer.text())
    }

    /// Creates a change set for `buffer` with no base text.
    pub fn new(buffer: &Entity<Buffer>, cx: &mut App) -> Self {
        BufferChangeSet {
            buffer_id: buffer.read(cx).remote_id(),
            base_text: None,
            diff_to_buffer: BufferDiff::new(&buffer.read(cx).text_snapshot()),
            unstaged_change_set: None,
        }
    }

    /// Creates a change set with the given base text, computing the diff
    /// synchronously (test-only).
    #[cfg(any(test, feature = "test-support"))]
    pub fn new_with_base_text(base_text: &str, buffer: &Entity<Buffer>, cx: &mut App) -> Self {
        let mut base_text = base_text.to_owned();
        // Normalize line endings so they don't pollute the diff.
        text::LineEnding::normalize(&mut base_text);
        let diff_to_buffer = BufferDiff::build(Some(&base_text), &buffer.read(cx).text_snapshot());
        let base_text = language::Buffer::build_snapshot_sync(base_text.into(), None, None, cx);
        BufferChangeSet {
            buffer_id: buffer.read(cx).remote_id(),
            base_text: Some(base_text),
            diff_to_buffer,
            unstaged_change_set: None,
        }
    }

    /// Recomputes the diff against the current base text synchronously
    /// (test-only), emitting `DiffChanged` as appropriate.
    #[cfg(any(test, feature = "test-support"))]
    pub fn recalculate_diff_sync(
        &mut self,
        snapshot: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) {
        let mut base_text = self.base_text.as_ref().map(|buffer| buffer.text());
        if let Some(base_text) = base_text.as_mut() {
            text::LineEnding::normalize(base_text);
        }
        let diff_to_buffer = BufferDiff::build(base_text.as_deref(), &snapshot);
        self.set_state(self.base_text.clone(), diff_to_buffer, &snapshot, cx);
    }
}
2905
2906impl OpenBuffer {
2907 fn upgrade(&self) -> Option<Entity<Buffer>> {
2908 match self {
2909 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2910 OpenBuffer::Operations(_) => None,
2911 }
2912 }
2913}
2914
2915fn is_not_found_error(error: &anyhow::Error) -> bool {
2916 error
2917 .root_cause()
2918 .downcast_ref::<io::Error>()
2919 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2920}
2921
2922fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2923 let Some(blame) = blame else {
2924 return proto::BlameBufferResponse {
2925 blame_response: None,
2926 };
2927 };
2928
2929 let entries = blame
2930 .entries
2931 .into_iter()
2932 .map(|entry| proto::BlameEntry {
2933 sha: entry.sha.as_bytes().into(),
2934 start_line: entry.range.start,
2935 end_line: entry.range.end,
2936 original_line_number: entry.original_line_number,
2937 author: entry.author.clone(),
2938 author_mail: entry.author_mail.clone(),
2939 author_time: entry.author_time,
2940 author_tz: entry.author_tz.clone(),
2941 committer: entry.committer.clone(),
2942 committer_mail: entry.committer_mail.clone(),
2943 committer_time: entry.committer_time,
2944 committer_tz: entry.committer_tz.clone(),
2945 summary: entry.summary.clone(),
2946 previous: entry.previous.clone(),
2947 filename: entry.filename.clone(),
2948 })
2949 .collect::<Vec<_>>();
2950
2951 let messages = blame
2952 .messages
2953 .into_iter()
2954 .map(|(oid, message)| proto::CommitMessage {
2955 oid: oid.as_bytes().into(),
2956 message,
2957 })
2958 .collect::<Vec<_>>();
2959
2960 let permalinks = blame
2961 .permalinks
2962 .into_iter()
2963 .map(|(oid, url)| proto::CommitPermalink {
2964 oid: oid.as_bytes().into(),
2965 permalink: url.to_string(),
2966 })
2967 .collect::<Vec<_>>();
2968
2969 proto::BlameBufferResponse {
2970 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2971 entries,
2972 messages,
2973 permalinks,
2974 remote_url: blame.remote_url,
2975 }),
2976 }
2977}
2978
2979fn deserialize_blame_buffer_response(
2980 response: proto::BlameBufferResponse,
2981) -> Option<git::blame::Blame> {
2982 let response = response.blame_response?;
2983 let entries = response
2984 .entries
2985 .into_iter()
2986 .filter_map(|entry| {
2987 Some(git::blame::BlameEntry {
2988 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2989 range: entry.start_line..entry.end_line,
2990 original_line_number: entry.original_line_number,
2991 committer: entry.committer,
2992 committer_time: entry.committer_time,
2993 committer_tz: entry.committer_tz,
2994 committer_mail: entry.committer_mail,
2995 author: entry.author,
2996 author_mail: entry.author_mail,
2997 author_time: entry.author_time,
2998 author_tz: entry.author_tz,
2999 summary: entry.summary,
3000 previous: entry.previous,
3001 filename: entry.filename,
3002 })
3003 })
3004 .collect::<Vec<_>>();
3005
3006 let messages = response
3007 .messages
3008 .into_iter()
3009 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
3010 .collect::<HashMap<_, _>>();
3011
3012 let permalinks = response
3013 .permalinks
3014 .into_iter()
3015 .filter_map(|permalink| {
3016 Some((
3017 git::Oid::from_bytes(&permalink.oid).ok()?,
3018 Url::from_str(&permalink.permalink).ok()?,
3019 ))
3020 })
3021 .collect::<HashMap<_, _>>();
3022
3023 Some(Blame {
3024 entries,
3025 permalinks,
3026 messages,
3027 remote_url: response.remote_url,
3028 })
3029}
3030
/// Builds a permalink for a file inside a Cargo registry source checkout by
/// recovering the upstream repository and commit from crate metadata.
///
/// Walks up from `path` looking for a `.cargo_vcs_info.json` (written when a
/// crate is packaged, recording the commit it was built from), reads the
/// sibling `Cargo.toml` for the `package.repository` URL, and asks the
/// matching git hosting provider to build a permalink highlighting
/// `selection`.
///
/// # Errors
/// Fails when no `.cargo_vcs_info.json` exists in any ancestor directory,
/// when either metadata file can't be read or parsed, or when the repository
/// URL isn't recognized by any registered hosting provider.
fn get_permalink_in_rust_registry_src(
    provider_registry: Arc<GitHostingProviderRegistry>,
    path: PathBuf,
    selection: Range<u32>,
) -> Result<url::Url> {
    // Minimal deserialization targets; fields not listed here are ignored.
    #[derive(Deserialize)]
    struct CargoVcsGit {
        sha1: String,
    }

    #[derive(Deserialize)]
    struct CargoVcsInfo {
        git: CargoVcsGit,
        path_in_vcs: String,
    }

    #[derive(Deserialize)]
    struct CargoPackage {
        repository: String,
    }

    #[derive(Deserialize)]
    struct CargoToml {
        package: CargoPackage,
    }

    // `ancestors().skip(1)` starts the search at the parent directory,
    // skipping `path` itself.
    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
        Some((dir, json))
    }) else {
        bail!("No .cargo_vcs_info.json found in parent directories")
    };
    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
        .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
    // Re-root the file path relative to the repository rather than the
    // registry directory. `strip_prefix` can't fail: `dir` was found among
    // `path`'s ancestors above.
    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
    let permalink = provider.build_permalink(
        remote,
        BuildPermalinkParams {
            sha: &cargo_vcs_info.git.sha1,
            path: &path.to_string_lossy(),
            selection: Some(selection),
        },
    );
    Ok(permalink)
}