1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use client::Client;
10use collections::{hash_map, HashMap, HashSet};
11use fs::Fs;
12use futures::{
13 channel::oneshot,
14 future::{OptionFuture, Shared},
15 Future, FutureExt as _, StreamExt,
16};
17use git::{blame::Blame, diff::BufferDiff, repository::RepoPath};
18use gpui::{
19 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
20};
21use http_client::Url;
22use language::{
23 proto::{
24 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
25 split_operations,
26 },
27 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
28};
29use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
30use serde::Deserialize;
31use smol::channel::Receiver;
32use std::{
33 io,
34 ops::Range,
35 path::{Path, PathBuf},
36 pin::pin,
37 str::FromStr as _,
38 sync::Arc,
39 time::Instant,
40};
41use text::{BufferId, Rope};
42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
44
/// Which diff base a change set is computed against: the index (staged
/// text) or HEAD (committed text).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum ChangeSetKind {
    /// Diff between the index and the working-copy buffer.
    Unstaged,
    /// Diff between HEAD and the working-copy buffer.
    Uncommitted,
}
50
/// A set of open buffers.
pub struct BufferStore {
    // Whether this store is backed by the local filesystem or a remote peer.
    state: BufferStoreState,
    // In-flight buffer loads, keyed by path, shared so concurrent open
    // requests for the same path resolve to one buffer.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    // In-flight change-set loads, keyed by buffer and diff-base kind.
    #[allow(clippy::type_complexity)]
    loading_change_sets: HashMap<
        (BufferId, ChangeSetKind),
        Shared<Task<Result<Entity<BufferChangeSet>, Arc<anyhow::Error>>>>,
    >,
    worktree_store: Entity<WorktreeStore>,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    // Client + project id used to mirror buffer state to a downstream peer,
    // when this project is shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // Buffers that have been sent to each peer, retained so they outlive
    // the peer's usage of them.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
66
/// A buffer shared with a remote peer, together with the handles that must
/// be kept alive on the peer's behalf.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    // Change set opened for the peer, if any.
    change_set: Option<Entity<BufferChangeSet>>,
    // Keeps the buffer registered with the language servers for the peer.
    lsp_handle: Option<OpenLspBufferHandle>,
}
73
/// Per-buffer bookkeeping for git diff state: the cached index/HEAD base
/// texts and weak handles to the change sets computed from them.
#[derive(Default)]
struct BufferChangeSetState {
    // Weak so that dropping all external handles releases the change sets.
    unstaged_changes: Option<WeakEntity<BufferChangeSet>>,
    uncommitted_changes: Option<WeakEntity<BufferChangeSet>>,
    // Replacing this task cancels any in-flight recalculation.
    recalculate_diff_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    // Senders resolved when the next diff recalculation finishes.
    diff_updated_futures: Vec<oneshot::Sender<()>>,
    buffer_subscription: Option<Subscription>,

    // Cached base texts. When index == HEAD these share one Arc, which is
    // detected later via pointer equality.
    head_text: Option<Arc<String>>,
    index_text: Option<Arc<String>>,
    // Dirty flags consumed (and cleared) by the recalculation task.
    head_changed: bool,
    index_changed: bool,
}
89
/// Describes which diff base texts changed and their new contents
/// (`None` means the file is absent from that base).
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Only the index (staged) text changed.
    SetIndex(Option<String>),
    /// Only the HEAD (committed) text changed.
    SetHead(Option<String>),
    /// Index and HEAD both changed, to different contents.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Index and HEAD both changed and are identical.
    SetBoth(Option<String>),
}
100
impl BufferChangeSetState {
    /// Updates the cached language and rebuilds the base-text snapshots so
    /// they are re-parsed with the buffer's new language.
    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
        self.language = buffer.read(cx).language().cloned();
        // Only mark a base as changed when there is text to re-parse.
        self.index_changed = self.index_text.is_some();
        self.head_changed = self.head_text.is_some();
        // The completion receiver is dropped on purpose; nothing awaits it here.
        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
    }

    /// Returns the unstaged change set, if some handle still keeps it alive.
    fn unstaged_changes(&self) -> Option<Entity<BufferChangeSet>> {
        self.unstaged_changes.as_ref().and_then(|set| set.upgrade())
    }

    /// Returns the uncommitted change set, if some handle still keeps it alive.
    fn uncommitted_changes(&self) -> Option<Entity<BufferChangeSet>> {
        self.uncommitted_changes
            .as_ref()
            .and_then(|set| set.upgrade())
    }

    /// Applies an `UpdateDiffBases` message from the upstream peer by
    /// translating its mode into a [`DiffBasesChange`].
    fn handle_base_texts_updated(
        &mut self,
        buffer: text::BufferSnapshot,
        message: proto::UpdateDiffBases,
        cx: &mut Context<Self>,
    ) {
        use proto::update_diff_bases::Mode;

        // Silently ignore messages with an unrecognized mode value.
        let Some(mode) = Mode::from_i32(message.mode) else {
            return;
        };

        let diff_bases_change = match mode {
            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.staged_text),
            Mode::IndexAndHead => DiffBasesChange::SetEach {
                index: message.staged_text,
                head: message.committed_text,
            },
        };

        let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
    }

    /// Records new index and/or HEAD base texts (normalizing line endings),
    /// sets the corresponding dirty flags, and triggers a recalculation.
    /// Returns a receiver that fires when the diffs have been updated.
    fn diff_bases_changed(
        &mut self,
        buffer: text::BufferSnapshot,
        diff_bases_change: DiffBasesChange,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        match diff_bases_change {
            DiffBasesChange::SetIndex(index) => {
                self.index_text = index.map(|mut text| {
                    text::LineEnding::normalize(&mut text);
                    Arc::new(text)
                });
                self.index_changed = true;
            }
            DiffBasesChange::SetHead(head) => {
                self.head_text = head.map(|mut text| {
                    text::LineEnding::normalize(&mut text);
                    Arc::new(text)
                });
                self.head_changed = true;
            }
            DiffBasesChange::SetBoth(mut text) => {
                if let Some(text) = text.as_mut() {
                    text::LineEnding::normalize(text);
                }
                self.head_text = text.map(Arc::new);
                // Share one Arc so `recalculate_diffs` can detect
                // index == HEAD via pointer equality.
                self.index_text = self.head_text.clone();
                self.head_changed = true;
                self.index_changed = true;
            }
            DiffBasesChange::SetEach { index, head } => {
                self.index_text = index.map(|mut text| {
                    text::LineEnding::normalize(&mut text);
                    Arc::new(text)
                });
                self.head_text = head.map(|mut text| {
                    text::LineEnding::normalize(&mut text);
                    Arc::new(text)
                });
                self.head_changed = true;
                self.index_changed = true;
            }
        }

        self.recalculate_diffs(buffer, cx)
    }

    /// Spawns a task that rebuilds the unstaged and/or uncommitted diffs
    /// against the given buffer snapshot. Returns a receiver that resolves
    /// when the recalculation completes. Storing the new task in
    /// `recalculate_diff_task` drops (cancels) any in-flight recalculation.
    fn recalculate_diffs(
        &mut self,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);

        let language = self.language.clone();
        let language_registry = self.language_registry.clone();
        let unstaged_changes = self.unstaged_changes();
        let uncommitted_changes = self.uncommitted_changes();
        let head = self.head_text.clone();
        let index = self.index_text.clone();
        let index_changed = self.index_changed;
        let head_changed = self.head_changed;
        // `SetBoth` stores one shared Arc in both fields, so pointer
        // equality identifies the index == HEAD case without comparing text.
        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
            (None, None) => true,
            _ => false,
        };
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            // Obtain the snapshot of the index text: rebuild it if the index
            // changed, otherwise reuse the one cached on a live change set.
            let snapshot = if index_changed {
                let snapshot = cx.update(|cx| {
                    index.as_ref().map(|head| {
                        language::Buffer::build_snapshot(
                            Rope::from(head.as_str()),
                            language.clone(),
                            language_registry.clone(),
                            cx,
                        )
                    })
                })?;
                cx.background_executor()
                    .spawn(OptionFuture::from(snapshot))
                    .await
            } else if let Some(unstaged_changes) = &unstaged_changes {
                unstaged_changes.read_with(&cx, |change_set, _| change_set.base_text.clone())?
            } else if let Some(uncommitted_changes) = &uncommitted_changes {
                uncommitted_changes
                    .read_with(&cx, |change_set, _| change_set.staged_text.clone())?
            } else {
                // Neither change set is alive; there is nothing to update.
                return Ok(());
            };

            if let Some(unstaged_changes) = &unstaged_changes {
                // Diff the index text against the current buffer contents.
                let diff = cx
                    .background_executor()
                    .spawn({
                        let buffer = buffer.clone();
                        async move {
                            BufferDiff::build(index.as_ref().map(|index| index.as_str()), &buffer)
                        }
                    })
                    .await;

                unstaged_changes.update(&mut cx, |unstaged_changes, cx| {
                    unstaged_changes.set_state(snapshot.clone(), diff, &buffer, cx);
                })?;

                // Keep the uncommitted change set's view of the staged text
                // in sync with the newly-built snapshot.
                if let Some(uncommitted_changes) = &uncommitted_changes {
                    uncommitted_changes.update(&mut cx, |uncommitted_changes, _| {
                        uncommitted_changes.staged_text = snapshot;
                    })?;
                }
            }

            if let Some(uncommitted_changes) = &uncommitted_changes {
                // When index == HEAD, the unstaged state can be reused
                // verbatim; otherwise diff HEAD against the buffer.
                let (snapshot, diff) = if let (Some(unstaged_changes), true) =
                    (&unstaged_changes, index_matches_head)
                {
                    unstaged_changes.read_with(&cx, |change_set, _| {
                        (
                            change_set.base_text.clone(),
                            change_set.diff_to_buffer.clone(),
                        )
                    })?
                } else {
                    let snapshot = cx.update(|cx| {
                        head.as_deref().map(|head| {
                            language::Buffer::build_snapshot(
                                Rope::from(head.as_str()),
                                language.clone(),
                                language_registry.clone(),
                                cx,
                            )
                        })
                    })?;
                    let snapshot = cx.background_executor().spawn(OptionFuture::from(snapshot));
                    let diff = cx.background_executor().spawn({
                        let buffer = buffer.clone();
                        let head = head.clone();
                        async move {
                            BufferDiff::build(head.as_ref().map(|head| head.as_str()), &buffer)
                        }
                    });
                    futures::join!(snapshot, diff)
                };

                uncommitted_changes.update(&mut cx, |change_set, cx| {
                    change_set.set_state(snapshot, diff, &buffer, cx);
                })?;

                // Rebuild the HEAD-vs-index diff when either base moved.
                if index_changed || head_changed {
                    let staged_text = uncommitted_changes
                        .read_with(&cx, |change_set, _| change_set.staged_text.clone())?;

                    let diff = if index_matches_head {
                        // Identical bases produce an empty diff.
                        staged_text.as_ref().map(|buffer| BufferDiff::new(buffer))
                    } else if let Some(staged_text) = staged_text {
                        Some(
                            cx.background_executor()
                                .spawn(async move {
                                    BufferDiff::build(
                                        head.as_ref().map(|head| head.as_str()),
                                        &staged_text,
                                    )
                                })
                                .await,
                        )
                    } else {
                        None
                    };

                    uncommitted_changes.update(&mut cx, |change_set, _| {
                        change_set.staged_diff = diff;
                    })?;
                }
            }

            // Clear the dirty flags and notify everyone awaiting this
            // recalculation.
            if let Some(this) = this.upgrade() {
                this.update(&mut cx, |this, _| {
                    this.index_changed = false;
                    this.head_changed = false;
                    for tx in this.diff_updated_futures.drain(..) {
                        tx.send(()).ok();
                    }
                })?;
            }

            Ok(())
        }));

        rx
    }
}
337
/// The diff of a buffer against one of its git bases, plus (for an
/// uncommitted change set) the auxiliary HEAD-vs-index state.
pub struct BufferChangeSet {
    pub buffer_id: BufferId,
    // Snapshot of the base text the diff was computed against.
    pub base_text: Option<language::BufferSnapshot>,
    pub diff_to_buffer: BufferDiff,
    // For an uncommitted change set, the snapshot of the index text.
    pub staged_text: Option<language::BufferSnapshot>,
    // For an uncommitted changeset, this is the diff between HEAD and the index.
    pub staged_diff: Option<BufferDiff>,
}
346
impl std::fmt::Debug for BufferChangeSet {
    // Hand-written so the snapshot fields print their text rather than the
    // (non-Debug) snapshot internals.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BufferChangeSet")
            .field("buffer_id", &self.buffer_id)
            .field("base_text", &self.base_text.as_ref().map(|s| s.text()))
            .field("diff_to_buffer", &self.diff_to_buffer)
            .field("staged_text", &self.staged_text.as_ref().map(|s| s.text()))
            .field("staged_diff", &self.staged_diff)
            .finish()
    }
}
358
/// Events emitted by a [`BufferChangeSet`].
pub enum BufferChangeSetEvent {
    /// The diff changed within the given buffer range.
    DiffChanged { changed_range: Range<text::Anchor> },
}
362
/// Backend of a [`BufferStore`]: local filesystem or a remote peer.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
367
/// Buffer-store backend that replicates buffers from an upstream peer over
/// RPC.
struct RemoteBufferStore {
    // Buffers pushed to us by peers, retained to avoid dropping them while
    // the peer still references them.
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    // Buffers whose initial state arrived but whose operation chunks are
    // still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    // Channels resolved when the corresponding buffer finishes loading.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
377
/// Buffer-store backend for buffers backed by the local filesystem.
struct LocalBufferStore {
    // Reverse indices used to find an open buffer when its file changes.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    // Keeps the worktree-store subscription alive for this store's lifetime.
    _subscription: Subscription,
}
384
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully-open buffer along with its git diff state.
    Complete {
        buffer: WeakEntity<Buffer>,
        change_set_state: Entity<BufferChangeSetState>,
    },
    /// Operations received for a buffer that is not open yet; they are
    /// buffered here until the buffer arrives.
    Operations(Vec<Operation>),
}
392
/// Events emitted by a [`BufferStore`].
pub enum BufferStoreEvent {
    BufferAdded(Entity<Buffer>),
    BufferDropped(BufferId),
    /// A buffer's file moved; `old_file` is its previous file, if any.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
401
/// A set of edit transactions grouped per buffer, e.g. the result of a
/// project-wide operation.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
404
405impl EventEmitter<BufferStoreEvent> for BufferStore {}
406
407impl RemoteBufferStore {
408 fn open_unstaged_changes(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
409 let project_id = self.project_id;
410 let client = self.upstream_client.clone();
411 cx.background_executor().spawn(async move {
412 let response = client
413 .request(proto::OpenUnstagedChanges {
414 project_id,
415 buffer_id: buffer_id.to_proto(),
416 })
417 .await?;
418 Ok(response.staged_text)
419 })
420 }
421
422 fn open_uncommitted_changes(
423 &self,
424 buffer_id: BufferId,
425 cx: &App,
426 ) -> Task<Result<DiffBasesChange>> {
427 use proto::open_uncommitted_changes_response::Mode;
428
429 let project_id = self.project_id;
430 let client = self.upstream_client.clone();
431 cx.background_executor().spawn(async move {
432 let response = client
433 .request(proto::OpenUncommittedChanges {
434 project_id,
435 buffer_id: buffer_id.to_proto(),
436 })
437 .await?;
438 let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
439 let bases = match mode {
440 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.staged_text),
441 Mode::IndexAndHead => DiffBasesChange::SetEach {
442 head: response.committed_text,
443 index: response.staged_text,
444 },
445 };
446 Ok(bases)
447 })
448 }
449
450 pub fn wait_for_remote_buffer(
451 &mut self,
452 id: BufferId,
453 cx: &mut Context<BufferStore>,
454 ) -> Task<Result<Entity<Buffer>>> {
455 let (tx, rx) = oneshot::channel();
456 self.remote_buffer_listeners.entry(id).or_default().push(tx);
457
458 cx.spawn(|this, cx| async move {
459 if let Some(buffer) = this
460 .read_with(&cx, |buffer_store, _| buffer_store.get(id))
461 .ok()
462 .flatten()
463 {
464 return Ok(buffer);
465 }
466
467 cx.background_executor()
468 .spawn(async move { rx.await? })
469 .await
470 })
471 }
472
473 fn save_remote_buffer(
474 &self,
475 buffer_handle: Entity<Buffer>,
476 new_path: Option<proto::ProjectPath>,
477 cx: &Context<BufferStore>,
478 ) -> Task<Result<()>> {
479 let buffer = buffer_handle.read(cx);
480 let buffer_id = buffer.remote_id().into();
481 let version = buffer.version();
482 let rpc = self.upstream_client.clone();
483 let project_id = self.project_id;
484 cx.spawn(move |_, mut cx| async move {
485 let response = rpc
486 .request(proto::SaveBuffer {
487 project_id,
488 buffer_id,
489 new_path,
490 version: serialize_version(&version),
491 })
492 .await?;
493 let version = deserialize_version(&response.version);
494 let mtime = response.mtime.map(|mtime| mtime.into());
495
496 buffer_handle.update(&mut cx, |buffer, cx| {
497 buffer.did_save(version.clone(), mtime, cx);
498 })?;
499
500 Ok(())
501 })
502 }
503
504 pub fn handle_create_buffer_for_peer(
505 &mut self,
506 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
507 replica_id: u16,
508 capability: Capability,
509 cx: &mut Context<BufferStore>,
510 ) -> Result<Option<Entity<Buffer>>> {
511 match envelope
512 .payload
513 .variant
514 .ok_or_else(|| anyhow!("missing variant"))?
515 {
516 proto::create_buffer_for_peer::Variant::State(mut state) => {
517 let buffer_id = BufferId::new(state.id)?;
518
519 let buffer_result = maybe!({
520 let mut buffer_file = None;
521 if let Some(file) = state.file.take() {
522 let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
523 let worktree = self
524 .worktree_store
525 .read(cx)
526 .worktree_for_id(worktree_id, cx)
527 .ok_or_else(|| {
528 anyhow!("no worktree found for id {}", file.worktree_id)
529 })?;
530 buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
531 as Arc<dyn language::File>);
532 }
533 Buffer::from_proto(replica_id, capability, state, buffer_file)
534 });
535
536 match buffer_result {
537 Ok(buffer) => {
538 let buffer = cx.new(|_| buffer);
539 self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
540 }
541 Err(error) => {
542 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
543 for listener in listeners {
544 listener.send(Err(anyhow!(error.cloned()))).ok();
545 }
546 }
547 }
548 }
549 }
550 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
551 let buffer_id = BufferId::new(chunk.buffer_id)?;
552 let buffer = self
553 .loading_remote_buffers_by_id
554 .get(&buffer_id)
555 .cloned()
556 .ok_or_else(|| {
557 anyhow!(
558 "received chunk for buffer {} without initial state",
559 chunk.buffer_id
560 )
561 })?;
562
563 let result = maybe!({
564 let operations = chunk
565 .operations
566 .into_iter()
567 .map(language::proto::deserialize_operation)
568 .collect::<Result<Vec<_>>>()?;
569 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
570 anyhow::Ok(())
571 });
572
573 if let Err(error) = result {
574 self.loading_remote_buffers_by_id.remove(&buffer_id);
575 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
576 for listener in listeners {
577 listener.send(Err(error.cloned())).ok();
578 }
579 }
580 } else if chunk.is_last {
581 self.loading_remote_buffers_by_id.remove(&buffer_id);
582 if self.upstream_client.is_via_collab() {
583 // retain buffers sent by peers to avoid races.
584 self.shared_with_me.insert(buffer.clone());
585 }
586
587 if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
588 for sender in senders {
589 sender.send(Ok(buffer.clone())).ok();
590 }
591 }
592 return Ok(Some(buffer));
593 }
594 }
595 }
596 return Ok(None);
597 }
598
599 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
600 self.loading_remote_buffers_by_id
601 .keys()
602 .copied()
603 .collect::<Vec<_>>()
604 }
605
606 pub fn deserialize_project_transaction(
607 &self,
608 message: proto::ProjectTransaction,
609 push_to_history: bool,
610 cx: &mut Context<BufferStore>,
611 ) -> Task<Result<ProjectTransaction>> {
612 cx.spawn(|this, mut cx| async move {
613 let mut project_transaction = ProjectTransaction::default();
614 for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
615 {
616 let buffer_id = BufferId::new(buffer_id)?;
617 let buffer = this
618 .update(&mut cx, |this, cx| {
619 this.wait_for_remote_buffer(buffer_id, cx)
620 })?
621 .await?;
622 let transaction = language::proto::deserialize_transaction(transaction)?;
623 project_transaction.0.insert(buffer, transaction);
624 }
625
626 for (buffer, transaction) in &project_transaction.0 {
627 buffer
628 .update(&mut cx, |buffer, _| {
629 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
630 })?
631 .await?;
632
633 if push_to_history {
634 buffer.update(&mut cx, |buffer, _| {
635 buffer.push_transaction(transaction.clone(), Instant::now());
636 })?;
637 }
638 }
639
640 Ok(project_transaction)
641 })
642 }
643
644 fn open_buffer(
645 &self,
646 path: Arc<Path>,
647 worktree: Entity<Worktree>,
648 cx: &mut Context<BufferStore>,
649 ) -> Task<Result<Entity<Buffer>>> {
650 let worktree_id = worktree.read(cx).id().to_proto();
651 let project_id = self.project_id;
652 let client = self.upstream_client.clone();
653 let path_string = path.clone().to_string_lossy().to_string();
654 cx.spawn(move |this, mut cx| async move {
655 let response = client
656 .request(proto::OpenBufferByPath {
657 project_id,
658 worktree_id,
659 path: path_string,
660 })
661 .await?;
662 let buffer_id = BufferId::new(response.buffer_id)?;
663
664 let buffer = this
665 .update(&mut cx, {
666 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
667 })?
668 .await?;
669
670 Ok(buffer)
671 })
672 }
673
674 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
675 let create = self.upstream_client.request(proto::OpenNewBuffer {
676 project_id: self.project_id,
677 });
678 cx.spawn(|this, mut cx| async move {
679 let response = create.await?;
680 let buffer_id = BufferId::new(response.buffer_id)?;
681
682 this.update(&mut cx, |this, cx| {
683 this.wait_for_remote_buffer(buffer_id, cx)
684 })?
685 .await
686 })
687 }
688
689 fn reload_buffers(
690 &self,
691 buffers: HashSet<Entity<Buffer>>,
692 push_to_history: bool,
693 cx: &mut Context<BufferStore>,
694 ) -> Task<Result<ProjectTransaction>> {
695 let request = self.upstream_client.request(proto::ReloadBuffers {
696 project_id: self.project_id,
697 buffer_ids: buffers
698 .iter()
699 .map(|buffer| buffer.read(cx).remote_id().to_proto())
700 .collect(),
701 });
702
703 cx.spawn(|this, mut cx| async move {
704 let response = request
705 .await?
706 .transaction
707 .ok_or_else(|| anyhow!("missing transaction"))?;
708 this.update(&mut cx, |this, cx| {
709 this.deserialize_project_transaction(response, push_to_history, cx)
710 })?
711 .await
712 })
713 }
714}
715
716impl LocalBufferStore {
717 fn worktree_for_buffer(
718 &self,
719 buffer: &Entity<Buffer>,
720 cx: &App,
721 ) -> Option<(Entity<Worktree>, Arc<Path>)> {
722 let file = buffer.read(cx).file()?;
723 let worktree_id = file.worktree_id(cx);
724 let path = file.path().clone();
725 let worktree = self
726 .worktree_store
727 .read(cx)
728 .worktree_for_id(worktree_id, cx)?;
729 Some((worktree, path))
730 }
731
732 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
733 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
734 worktree.read(cx).load_staged_file(path.as_ref(), cx)
735 } else {
736 return Task::ready(Err(anyhow!("no such worktree")));
737 }
738 }
739
740 fn load_committed_text(
741 &self,
742 buffer: &Entity<Buffer>,
743 cx: &App,
744 ) -> Task<Result<Option<String>>> {
745 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
746 worktree.read(cx).load_committed_file(path.as_ref(), cx)
747 } else {
748 Task::ready(Err(anyhow!("no such worktree")))
749 }
750 }
751
    /// Writes the buffer's contents to `path` within `worktree`, then
    /// notifies the downstream collaborator (if any) and the buffer itself
    /// that the save completed.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        // A buffer that has never existed on disk always counts as a file
        // change, so peers receive its new file metadata.
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Tell the downstream client about the save (and the file change,
            // when there was one) before updating the local buffer.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
809
    /// Subscribes the buffer store to a worktree's events so that entry and
    /// git-repository changes are reflected in the open buffers. The
    /// subscription is detached: it lives as long as the worktree entity.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            // Only local worktrees are handled here; remote worktrees are
            // synchronized through the RPC protocol instead.
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        Self::local_worktree_git_repos_changed(
                            this,
                            worktree.clone(),
                            updated_repos,
                            cx,
                        )
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }
835
836 fn local_worktree_entries_changed(
837 this: &mut BufferStore,
838 worktree_handle: &Entity<Worktree>,
839 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
840 cx: &mut Context<BufferStore>,
841 ) {
842 let snapshot = worktree_handle.read(cx).snapshot();
843 for (path, entry_id, _) in changes {
844 Self::local_worktree_entry_changed(
845 this,
846 *entry_id,
847 path,
848 worktree_handle,
849 &snapshot,
850 cx,
851 );
852 }
853 }
854
    /// Reacts to git repository changes in a local worktree: for every open
    /// buffer under a changed repository, reloads the staged/committed base
    /// texts its change sets need, forwards them downstream, and triggers a
    /// diff recalculation.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Phase 1 (foreground): collect, for each affected buffer, its
        // snapshot, path, and which base texts are actually in use.
        let mut change_set_state_updates = Vec::new();
        for buffer in this.opened_buffers.values() {
            let OpenBuffer::Complete {
                buffer,
                change_set_state,
            } = buffer
            else {
                continue;
            };
            let Some(buffer) = buffer.upgrade() else {
                continue;
            };
            let buffer = buffer.read(cx);
            let Some(file) = File::from_dyn(buffer.file()) else {
                continue;
            };
            if file.worktree != worktree_handle {
                continue;
            }
            let change_set_state = change_set_state.read(cx);
            // Only buffers inside one of the changed repositories' working
            // directories are affected.
            if changed_repos
                .iter()
                .any(|(work_dir, _)| file.path.starts_with(work_dir))
            {
                let snapshot = buffer.text_snapshot();
                change_set_state_updates.push((
                    snapshot.clone(),
                    file.path.clone(),
                    // Whether each change set is still alive, i.e. whether
                    // its base text needs reloading at all.
                    change_set_state
                        .unstaged_changes
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                    change_set_state
                        .uncommitted_changes
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                ))
            }
        }

        if change_set_state_updates.is_empty() {
            return;
        }

        // Phase 2 (background): read the new base texts from git for each
        // affected buffer.
        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_changes_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    change_set_state_updates
                        .into_iter()
                        .filter_map(
                            |(buffer_snapshot, path, needs_staged_text, needs_committed_text)| {
                                let local_repo = snapshot.local_repo_for_path(&path)?;
                                let relative_path = local_repo.relativize(&path).ok()?;
                                let staged_text = if needs_staged_text {
                                    local_repo.repo().load_index_text(&relative_path)
                                } else {
                                    None
                                };
                                let committed_text = if needs_committed_text {
                                    local_repo.repo().load_committed_text(&relative_path)
                                } else {
                                    None
                                };
                                // Collapse the loaded texts into the
                                // narrowest DiffBasesChange variant.
                                let diff_bases_change =
                                    match (needs_staged_text, needs_committed_text) {
                                        (true, true) => Some(if staged_text == committed_text {
                                            DiffBasesChange::SetBoth(staged_text)
                                        } else {
                                            DiffBasesChange::SetEach {
                                                index: staged_text,
                                                head: committed_text,
                                            }
                                        }),
                                        (true, false) => {
                                            Some(DiffBasesChange::SetIndex(staged_text))
                                        }
                                        (false, true) => {
                                            Some(DiffBasesChange::SetHead(committed_text))
                                        }
                                        (false, false) => None,
                                    };
                                Some((buffer_snapshot, diff_bases_change))
                            },
                        )
                        .collect::<Vec<_>>()
                })
                .await;

            // Phase 3 (foreground): forward the new bases downstream and
            // apply them to each buffer's change-set state.
            this.update(&mut cx, |this, cx| {
                for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
                    let Some(OpenBuffer::Complete {
                        change_set_state, ..
                    }) = this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
                    else {
                        continue;
                    };
                    let Some(diff_bases_change) = diff_bases_change else {
                        continue;
                    };

                    change_set_state.update(cx, |change_set_state, cx| {
                        use proto::update_diff_bases::Mode;

                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
                            let buffer_id = buffer_snapshot.remote_id().to_proto();
                            let (staged_text, committed_text, mode) = match diff_bases_change
                                .clone()
                            {
                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                DiffBasesChange::SetEach { index, head } => {
                                    (index, head, Mode::IndexAndHead)
                                }
                                DiffBasesChange::SetBoth(text) => {
                                    (text, None, Mode::IndexMatchesHead)
                                }
                            };
                            let message = proto::UpdateDiffBases {
                                project_id: *project_id,
                                buffer_id,
                                staged_text,
                                committed_text,
                                mode: mode as i32,
                            };

                            client.send(message).log_err();
                        }

                        // The completion receiver is dropped on purpose;
                        // nothing awaits this update.
                        let _ = change_set_state.diff_bases_changed(
                            buffer_snapshot,
                            diff_bases_change,
                            cx,
                        );
                    });
                }
            })
        })
        .detach_and_log_err(cx);
    }
1007
    /// Reconciles one open buffer with a changed worktree entry: updates the
    /// buffer's `File` (path, entry id, disk state), maintains the two
    /// reverse indices, notifies the downstream client, and emits
    /// `BufferChangedFilePath` when the path moved. Always returns `None`;
    /// the `Option` return type exists only to allow `?` on lookups.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Find the affected buffer: first by entry id, then by path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        // A dead weak handle means the buffer was dropped; evict its entry.
        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // The buffer is gone: drop its reverse-index entries too.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Locate the entry in the new snapshot, by id first and then by
            // the old path, to detect renames and deletions.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        // No mtime in the snapshot: keep the previous state.
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // The entry vanished from the snapshot: the file was deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index and record the rename event.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Mirror the file change to the downstream client, if any.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }
1135
    /// Registers a buffer's file in the local lookup tables after its file
    /// handle changed. Returns `None` (early exit) when the buffer has no
    /// local file, or when its entry id is already registered.
    fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    // Already registered under this entry id; nothing to do.
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        // Also index the buffer by its project path so it can be found by path.
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }
1161
1162 fn save_buffer(
1163 &self,
1164 buffer: Entity<Buffer>,
1165 cx: &mut Context<BufferStore>,
1166 ) -> Task<Result<()>> {
1167 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1168 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1169 };
1170 let worktree = file.worktree.clone();
1171 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1172 }
1173
1174 fn save_buffer_as(
1175 &self,
1176 buffer: Entity<Buffer>,
1177 path: ProjectPath,
1178 cx: &mut Context<BufferStore>,
1179 ) -> Task<Result<()>> {
1180 let Some(worktree) = self
1181 .worktree_store
1182 .read(cx)
1183 .worktree_for_id(path.worktree_id, cx)
1184 else {
1185 return Task::ready(Err(anyhow!("no such worktree")));
1186 };
1187 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1188 }
1189
    /// Opens the file at `path` within `worktree`, creating a new buffer for
    /// its contents. If the file does not exist on disk, an empty buffer with
    /// `DiskState::New` is created instead of failing.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity slot up front so the buffer id (derived from
            // the entity id) is known before the file finishes loading.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Constructing the text buffer can be expensive for large
                // files, so do it on the background executor.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // A missing file is not an error: open an empty, unsaved
                // buffer at that path instead.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                // Register the new buffer in the local path/entry indices.
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
1258
1259 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1260 cx.spawn(|buffer_store, mut cx| async move {
1261 let buffer =
1262 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1263 buffer_store.update(&mut cx, |buffer_store, cx| {
1264 buffer_store.add_buffer(buffer.clone(), cx).log_err();
1265 })?;
1266 Ok(buffer)
1267 })
1268 }
1269
    /// Reloads each buffer's contents from disk, collecting the resulting
    /// undo transactions into a single `ProjectTransaction`.
    ///
    /// When `push_to_history` is false, each reload transaction is forgotten
    /// so it cannot be undone from the buffer's history.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            // Reloads run sequentially, one buffer at a time.
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.entity(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
1295}
1296
1297impl BufferStore {
    /// Registers all RPC message and request handlers that `BufferStore`
    /// responds to on the given client.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_open_unstaged_changes);
        client.add_entity_request_handler(Self::handle_open_uncommitted_changes);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
    }
1310
    /// Creates a buffer store for a local project, subscribing to the
    /// worktree store so that newly-added worktrees are watched for file
    /// changes affecting open buffers.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                // Forward each newly-added worktree to the local store so it
                // can subscribe to that worktree's events.
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            worktree_store,
        }
    }
1333
    /// Creates a buffer store that mirrors buffers owned by a remote host,
    /// communicating over `upstream_client` using `remote_id` as the
    /// project id.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
1357
1358 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1359 match &mut self.state {
1360 BufferStoreState::Local(state) => Some(state),
1361 _ => None,
1362 }
1363 }
1364
1365 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1366 match &mut self.state {
1367 BufferStoreState::Remote(state) => Some(state),
1368 _ => None,
1369 }
1370 }
1371
1372 fn as_remote(&self) -> Option<&RemoteBufferStore> {
1373 match &self.state {
1374 BufferStoreState::Remote(state) => Some(state),
1375 _ => None,
1376 }
1377 }
1378
    /// Opens (or returns the already-open) buffer for `project_path`.
    ///
    /// Concurrent requests for the same path share a single load via the
    /// `loading_buffers` map, so the file is only read once.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // Another caller is already loading this path; await its task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // The shared task yields `Arc<anyhow::Error>`; flatten it back into a
        // plain `anyhow::Error` for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1424
    /// Returns (or begins loading) the diff between `buffer` and its staged
    /// (git index) text. Concurrent requests for the same buffer share a
    /// single load via `loading_change_sets`.
    pub fn open_unstaged_changes(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(change_set) = self.get_unstaged_changes(buffer_id, cx) {
            return Task::ready(Ok(change_set));
        }

        let task = match self
            .loading_change_sets
            .entry((buffer_id, ChangeSetKind::Unstaged))
        {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                // Locally the index text is read from git; remotely it is
                // requested from the host.
                let staged_text = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.open_unstaged_changes(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_change_set_internal(
                                this,
                                ChangeSetKind::Unstaged,
                                staged_text.await.map(DiffBasesChange::SetIndex),
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1468
    /// Returns (or begins loading) the diff between `buffer` and its HEAD
    /// commit, loading the staged text as well so both diff bases are
    /// available. Concurrent requests share a single load.
    pub fn open_uncommitted_changes(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(change_set) = self.get_uncommitted_changes(buffer_id, cx) {
            return Task::ready(Ok(change_set));
        }

        let task = match self
            .loading_change_sets
            .entry((buffer_id, ChangeSetKind::Uncommitted))
        {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let changes = match &self.state {
                    BufferStoreState::Local(this) => {
                        let committed_text = this.load_committed_text(&buffer, cx);
                        let staged_text = this.load_staged_text(&buffer, cx);
                        cx.background_executor().spawn(async move {
                            let committed_text = committed_text.await?;
                            let staged_text = staged_text.await?;
                            // When HEAD and index agree, store a single shared
                            // base text instead of two copies.
                            let diff_bases_change = if committed_text == staged_text {
                                DiffBasesChange::SetBoth(committed_text)
                            } else {
                                DiffBasesChange::SetEach {
                                    index: staged_text,
                                    head: committed_text,
                                }
                            };
                            Ok(diff_bases_change)
                        })
                    }
                    BufferStoreState::Remote(this) => this.open_uncommitted_changes(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_change_set_internal(
                                this,
                                ChangeSetKind::Uncommitted,
                                changes.await,
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1528
    /// Test-only helper: installs a pre-built change set as the unstaged
    /// changes for `buffer_id`, bypassing the normal git loading path.
    #[cfg(any(test, feature = "test-support"))]
    pub fn set_unstaged_change_set(
        &mut self,
        buffer_id: BufferId,
        change_set: Entity<BufferChangeSet>,
    ) {
        self.loading_change_sets.insert(
            (buffer_id, ChangeSetKind::Unstaged),
            Task::ready(Ok(change_set)).shared(),
        );
    }
1540
    /// Shared completion path for both change-set kinds: clears the in-flight
    /// entry in `loading_change_sets`, then builds the change set, registers
    /// it on the buffer's change-set state, and waits for the initial diff.
    async fn open_change_set_internal(
        this: WeakEntity<Self>,
        kind: ChangeSetKind,
        texts: Result<DiffBasesChange>,
        buffer: Entity<Buffer>,
        mut cx: AsyncApp,
    ) -> Result<Entity<BufferChangeSet>> {
        let diff_bases_change = match texts {
            Err(e) => {
                // Loading the base text failed; clear the in-flight marker so
                // a later call can retry instead of reusing the failed task.
                this.update(&mut cx, |this, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    this.loading_change_sets.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(&mut cx, |this, cx| {
            let buffer_id = buffer.read(cx).remote_id();
            this.loading_change_sets.remove(&(buffer_id, kind));

            if let Some(OpenBuffer::Complete {
                change_set_state, ..
            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                change_set_state.update(cx, |change_set_state, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    // Lazily subscribe to buffer events (once per buffer) so
                    // language changes can be reflected in the diff state.
                    change_set_state.buffer_subscription.get_or_insert_with(|| {
                        cx.subscribe(&buffer, |this, buffer, event, cx| match event {
                            BufferEvent::LanguageChanged => {
                                this.buffer_language_changed(buffer, cx)
                            }
                            _ => {}
                        })
                    });

                    let change_set = cx.new(|cx| BufferChangeSet {
                        buffer_id,
                        base_text: None,
                        diff_to_buffer: BufferDiff::new(&buffer.read(cx).text_snapshot()),
                        staged_text: None,
                        staged_diff: None,
                    });
                    // Only a weak handle is retained; the change set lives as
                    // long as its external holders keep it alive.
                    match kind {
                        ChangeSetKind::Unstaged => {
                            change_set_state.unstaged_changes = Some(change_set.downgrade())
                        }
                        ChangeSetKind::Uncommitted => {
                            change_set_state.uncommitted_changes = Some(change_set.downgrade())
                        }
                    };

                    let buffer = buffer.read(cx).text_snapshot();
                    let rx = change_set_state.diff_bases_changed(buffer, diff_bases_change, cx);

                    Ok(async move {
                        // Wait for the initial diff computation to complete.
                        rx.await.ok();
                        Ok(change_set)
                    })
                })
            } else {
                Err(anyhow!("buffer was closed"))
            }
        })??
        .await
    }
1608
1609 pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
1610 match &self.state {
1611 BufferStoreState::Local(this) => this.create_buffer(cx),
1612 BufferStoreState::Remote(this) => this.create_buffer(cx),
1613 }
1614 }
1615
1616 pub fn save_buffer(
1617 &mut self,
1618 buffer: Entity<Buffer>,
1619 cx: &mut Context<Self>,
1620 ) -> Task<Result<()>> {
1621 match &mut self.state {
1622 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1623 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1624 }
1625 }
1626
    /// Saves `buffer` under a new `path` (locally or via the host), then
    /// emits `BufferChangedFilePath` carrying the file it had before the save.
    pub fn save_buffer_as(
        &mut self,
        buffer: Entity<Buffer>,
        path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        // Capture the pre-save file so observers can see what changed.
        let old_file = buffer.read(cx).file().cloned();
        let task = match &self.state {
            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
            BufferStoreState::Remote(this) => {
                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }
1647
1648 pub fn blame_buffer(
1649 &self,
1650 buffer: &Entity<Buffer>,
1651 version: Option<clock::Global>,
1652 cx: &App,
1653 ) -> Task<Result<Option<Blame>>> {
1654 let buffer = buffer.read(cx);
1655 let Some(file) = File::from_dyn(buffer.file()) else {
1656 return Task::ready(Err(anyhow!("buffer has no file")));
1657 };
1658
1659 match file.worktree.clone().read(cx) {
1660 Worktree::Local(worktree) => {
1661 let worktree = worktree.snapshot();
1662 let blame_params = maybe!({
1663 let local_repo = match worktree.local_repo_for_path(&file.path) {
1664 Some(repo_for_path) => repo_for_path,
1665 None => return Ok(None),
1666 };
1667
1668 let relative_path = local_repo
1669 .relativize(&file.path)
1670 .context("failed to relativize buffer path")?;
1671
1672 let repo = local_repo.repo().clone();
1673
1674 let content = match version {
1675 Some(version) => buffer.rope_for_version(&version).clone(),
1676 None => buffer.as_rope().clone(),
1677 };
1678
1679 anyhow::Ok(Some((repo, relative_path, content)))
1680 });
1681
1682 cx.background_executor().spawn(async move {
1683 let Some((repo, relative_path, content)) = blame_params? else {
1684 return Ok(None);
1685 };
1686 repo.blame(&relative_path, content)
1687 .with_context(|| format!("Failed to blame {:?}", relative_path.0))
1688 .map(Some)
1689 })
1690 }
1691 Worktree::Remote(worktree) => {
1692 let buffer_id = buffer.remote_id();
1693 let version = buffer.version();
1694 let project_id = worktree.project_id();
1695 let client = worktree.client();
1696 cx.spawn(|_| async move {
1697 let response = client
1698 .request(proto::BlameBuffer {
1699 project_id,
1700 buffer_id: buffer_id.into(),
1701 version: serialize_version(&version),
1702 })
1703 .await?;
1704 Ok(deserialize_blame_buffer_response(response))
1705 })
1706 }
1707 }
1708 }
1709
    /// Builds a permalink URL to the given line `selection` of `buffer` on
    /// its git hosting provider (based on the `origin` remote and HEAD SHA).
    ///
    /// Falls back to crate-registry permalinks for Rust sources opened from
    /// the Cargo registry; remote worktrees forward the request to the host.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // Permalinks use repository-relative paths, not worktree paths.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                // The host has git access; ask it for the permalink.
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1806
    /// Registers a newly-created or newly-received buffer with the store.
    ///
    /// Sets up its change-set state, installs a release hook that emits
    /// `BufferDropped`, and applies any operations that arrived over RPC
    /// before the buffer existed.
    fn add_buffer(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        // Replica id 0 is the host; anything else means the buffer originated
        // from a remote peer.
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer.downgrade(),
            change_set_state: cx.new(|_| BufferChangeSetState::default()),
        };

        let handle = cx.entity().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    // Operations arrived before the buffer; apply them now.
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    // Duplicate registration: tolerated for remote buffers,
                    // a bug for local ones.
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }
1850
1851 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1852 self.opened_buffers
1853 .values()
1854 .filter_map(|buffer| buffer.upgrade())
1855 }
1856
1857 pub fn loading_buffers(
1858 &self,
1859 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1860 self.loading_buffers.iter().map(|(path, task)| {
1861 let task = task.clone();
1862 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1863 })
1864 }
1865
1866 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1867 self.buffers().find_map(|buffer| {
1868 let file = File::from_dyn(buffer.read(cx).file())?;
1869 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1870 Some(buffer)
1871 } else {
1872 None
1873 }
1874 })
1875 }
1876
1877 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1878 self.opened_buffers.get(&buffer_id)?.upgrade()
1879 }
1880
1881 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1882 self.get(buffer_id)
1883 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1884 }
1885
1886 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1887 self.get(buffer_id).or_else(|| {
1888 self.as_remote()
1889 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1890 })
1891 }
1892
1893 pub fn get_unstaged_changes(
1894 &self,
1895 buffer_id: BufferId,
1896 cx: &App,
1897 ) -> Option<Entity<BufferChangeSet>> {
1898 if let OpenBuffer::Complete {
1899 change_set_state, ..
1900 } = self.opened_buffers.get(&buffer_id)?
1901 {
1902 change_set_state
1903 .read(cx)
1904 .unstaged_changes
1905 .as_ref()?
1906 .upgrade()
1907 } else {
1908 None
1909 }
1910 }
1911
1912 pub fn get_uncommitted_changes(
1913 &self,
1914 buffer_id: BufferId,
1915 cx: &App,
1916 ) -> Option<Entity<BufferChangeSet>> {
1917 if let OpenBuffer::Complete {
1918 change_set_state, ..
1919 } = self.opened_buffers.get(&buffer_id)?
1920 {
1921 change_set_state
1922 .read(cx)
1923 .uncommitted_changes
1924 .as_ref()?
1925 .upgrade()
1926 } else {
1927 None
1928 }
1929 }
1930
1931 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1932 let buffers = self
1933 .buffers()
1934 .map(|buffer| {
1935 let buffer = buffer.read(cx);
1936 proto::BufferVersion {
1937 id: buffer.remote_id().into(),
1938 version: language::proto::serialize_version(&buffer.version),
1939 }
1940 })
1941 .collect();
1942 let incomplete_buffer_ids = self
1943 .as_remote()
1944 .map(|remote| remote.incomplete_buffer_ids())
1945 .unwrap_or_default();
1946 (buffers, incomplete_buffer_ids)
1947 }
1948
    /// Called when the connection to the remote host is lost: unblocks any
    /// pending waits on buffers and makes every buffer read-only.
    pub fn disconnected_from_host(&mut self, cx: &mut App) {
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }

        // Without a host, edits can no longer be synchronized.
        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        if let Some(remote) = self.as_remote_mut() {
            // Wake up all futures currently waiting on a buffer to get opened,
            // to give them a chance to fail now that we've disconnected.
            remote.remote_buffer_listeners.clear()
        }
    }
1968
    /// Begins sharing this store's buffers with `downstream_client`, using
    /// `remote_id` as the project id on outgoing messages.
    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1972
    /// Stops sharing: drops the downstream client and forgets which buffers
    /// were shared with peers.
    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }
1977
    /// Drops queued operations for buffers that never finished opening.
    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }
1982
    /// Streams buffers that may contain matches for `query`.
    ///
    /// Buffers without a file entry ("unnamed") are sent first and count
    /// against `limit`; remaining candidates are found by searching worktree
    /// files and opened on demand, in chunks to bound concurrent file opens.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                // Already open: the worktree search can use the in-memory text.
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A closed receiver means the search was abandoned.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
2037
2038 pub fn recalculate_buffer_diffs(
2039 &mut self,
2040 buffers: Vec<Entity<Buffer>>,
2041 cx: &mut Context<Self>,
2042 ) -> impl Future<Output = ()> {
2043 let mut futures = Vec::new();
2044 for buffer in buffers {
2045 if let Some(OpenBuffer::Complete {
2046 change_set_state, ..
2047 }) = self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
2048 {
2049 let buffer = buffer.read(cx).text_snapshot();
2050 futures.push(change_set_state.update(cx, |change_set_state, cx| {
2051 change_set_state.recalculate_diffs(buffer, cx)
2052 }));
2053 }
2054 }
2055 async move {
2056 futures::future::join_all(futures).await;
2057 }
2058 }
2059
    /// Reacts to events emitted by an open buffer, keeping local indices up
    /// to date and forwarding reloads downstream when the project is shared.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Only relevant when sharing with downstream collaborators.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }
2090
2091 pub async fn handle_update_buffer(
2092 this: Entity<Self>,
2093 envelope: TypedEnvelope<proto::UpdateBuffer>,
2094 mut cx: AsyncApp,
2095 ) -> Result<proto::Ack> {
2096 let payload = envelope.payload.clone();
2097 let buffer_id = BufferId::new(payload.buffer_id)?;
2098 let ops = payload
2099 .operations
2100 .into_iter()
2101 .map(language::proto::deserialize_operation)
2102 .collect::<Result<Vec<_>, _>>()?;
2103 this.update(&mut cx, |this, cx| {
2104 match this.opened_buffers.entry(buffer_id) {
2105 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
2106 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
2107 OpenBuffer::Complete { buffer, .. } => {
2108 if let Some(buffer) = buffer.upgrade() {
2109 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
2110 }
2111 }
2112 },
2113 hash_map::Entry::Vacant(e) => {
2114 e.insert(OpenBuffer::Operations(ops));
2115 }
2116 }
2117 Ok(proto::Ack {})
2118 })?
2119 }
2120
2121 pub fn register_shared_lsp_handle(
2122 &mut self,
2123 peer_id: proto::PeerId,
2124 buffer_id: BufferId,
2125 handle: OpenLspBufferHandle,
2126 ) {
2127 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2128 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2129 buffer.lsp_handle = Some(handle);
2130 return;
2131 }
2132 }
2133 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2134 }
2135
    /// Handles a guest's request to resynchronize buffer state after a
    /// reconnect: re-registers the guest's shared buffers, reports current
    /// versions, and resends file/reload metadata plus any operations the
    /// guest is missing.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Rebuild the guest's shared-buffer set from scratch.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        change_set: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest has not yet seen.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Ship the missing operations in the background, chunked to
                // respect message size limits.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
2225
2226 pub fn handle_create_buffer_for_peer(
2227 &mut self,
2228 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2229 replica_id: u16,
2230 capability: Capability,
2231 cx: &mut Context<Self>,
2232 ) -> Result<()> {
2233 let Some(remote) = self.as_remote_mut() else {
2234 return Err(anyhow!("buffer store is not a remote"));
2235 };
2236
2237 if let Some(buffer) =
2238 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2239 {
2240 self.add_buffer(buffer, cx)?;
2241 }
2242
2243 Ok(())
2244 }
2245
2246 pub async fn handle_update_buffer_file(
2247 this: Entity<Self>,
2248 envelope: TypedEnvelope<proto::UpdateBufferFile>,
2249 mut cx: AsyncApp,
2250 ) -> Result<()> {
2251 let buffer_id = envelope.payload.buffer_id;
2252 let buffer_id = BufferId::new(buffer_id)?;
2253
2254 this.update(&mut cx, |this, cx| {
2255 let payload = envelope.payload.clone();
2256 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2257 let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
2258 let worktree = this
2259 .worktree_store
2260 .read(cx)
2261 .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
2262 .ok_or_else(|| anyhow!("no such worktree"))?;
2263 let file = File::from_proto(file, worktree, cx)?;
2264 let old_file = buffer.update(cx, |buffer, cx| {
2265 let old_file = buffer.file().cloned();
2266 let new_path = file.path.clone();
2267 buffer.file_updated(Arc::new(file), cx);
2268 if old_file
2269 .as_ref()
2270 .map_or(true, |old| *old.path() != new_path)
2271 {
2272 Some(old_file)
2273 } else {
2274 None
2275 }
2276 });
2277 if let Some(old_file) = old_file {
2278 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
2279 }
2280 }
2281 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2282 downstream_client
2283 .send(proto::UpdateBufferFile {
2284 project_id: *project_id,
2285 buffer_id: buffer_id.into(),
2286 file: envelope.payload.file,
2287 })
2288 .log_err();
2289 }
2290 Ok(())
2291 })?
2292 }
2293
    /// Handles a `SaveBuffer` RPC from a peer: waits until the local buffer
    /// has caught up to the requester's version, saves it (optionally under a
    /// new path), and replies with the saved version and mtime.
    ///
    /// # Errors
    /// Fails when the buffer doesn't exist, the project is not shared, or the
    /// save itself fails.
    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until we've applied every edit the requester had seen
        // when it sent the request.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        // A `new_path` in the payload means "save as"; otherwise save in place.
        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
2334
2335 pub async fn handle_close_buffer(
2336 this: Entity<Self>,
2337 envelope: TypedEnvelope<proto::CloseBuffer>,
2338 mut cx: AsyncApp,
2339 ) -> Result<()> {
2340 let peer_id = envelope.sender_id;
2341 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2342 this.update(&mut cx, |this, _| {
2343 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2344 if shared.remove(&buffer_id).is_some() {
2345 if shared.is_empty() {
2346 this.shared_buffers.remove(&peer_id);
2347 }
2348 return;
2349 }
2350 }
2351 debug_panic!(
2352 "peer_id {} closed buffer_id {} which was either not open or already closed",
2353 peer_id,
2354 buffer_id
2355 )
2356 })
2357 }
2358
2359 pub async fn handle_buffer_saved(
2360 this: Entity<Self>,
2361 envelope: TypedEnvelope<proto::BufferSaved>,
2362 mut cx: AsyncApp,
2363 ) -> Result<()> {
2364 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2365 let version = deserialize_version(&envelope.payload.version);
2366 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2367 this.update(&mut cx, move |this, cx| {
2368 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2369 buffer.update(cx, |buffer, cx| {
2370 buffer.did_save(version, mtime, cx);
2371 });
2372 }
2373
2374 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2375 downstream_client
2376 .send(proto::BufferSaved {
2377 project_id: *project_id,
2378 buffer_id: buffer_id.into(),
2379 mtime: envelope.payload.mtime,
2380 version: envelope.payload.version,
2381 })
2382 .log_err();
2383 }
2384 })
2385 }
2386
2387 pub async fn handle_buffer_reloaded(
2388 this: Entity<Self>,
2389 envelope: TypedEnvelope<proto::BufferReloaded>,
2390 mut cx: AsyncApp,
2391 ) -> Result<()> {
2392 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2393 let version = deserialize_version(&envelope.payload.version);
2394 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2395 let line_ending = deserialize_line_ending(
2396 proto::LineEnding::from_i32(envelope.payload.line_ending)
2397 .ok_or_else(|| anyhow!("missing line ending"))?,
2398 );
2399 this.update(&mut cx, |this, cx| {
2400 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2401 buffer.update(cx, |buffer, cx| {
2402 buffer.did_reload(version, line_ending, mtime, cx);
2403 });
2404 }
2405
2406 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2407 downstream_client
2408 .send(proto::BufferReloaded {
2409 project_id: *project_id,
2410 buffer_id: buffer_id.into(),
2411 mtime: envelope.payload.mtime,
2412 version: envelope.payload.version,
2413 line_ending: envelope.payload.line_ending,
2414 })
2415 .log_err();
2416 }
2417 })
2418 }
2419
    /// Handles a `BlameBuffer` RPC: waits for the buffer to reach the
    /// requested version, computes the blame, and serializes it for the wire.
    pub async fn handle_blame_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        // Apply all edits the requester could see before blaming, so the
        // blame's line numbers line up on both sides.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }
2440
2441 pub async fn handle_get_permalink_to_line(
2442 this: Entity<Self>,
2443 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2444 mut cx: AsyncApp,
2445 ) -> Result<proto::GetPermalinkToLineResponse> {
2446 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2447 // let version = deserialize_version(&envelope.payload.version);
2448 let selection = {
2449 let proto_selection = envelope
2450 .payload
2451 .selection
2452 .context("no selection to get permalink for defined")?;
2453 proto_selection.start as u32..proto_selection.end as u32
2454 };
2455 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2456 let permalink = this
2457 .update(&mut cx, |this, cx| {
2458 this.get_permalink_to_line(&buffer, selection, cx)
2459 })?
2460 .await?;
2461 Ok(proto::GetPermalinkToLineResponse {
2462 permalink: permalink.to_string(),
2463 })
2464 }
2465
2466 pub async fn handle_open_unstaged_changes(
2467 this: Entity<Self>,
2468 request: TypedEnvelope<proto::OpenUnstagedChanges>,
2469 mut cx: AsyncApp,
2470 ) -> Result<proto::OpenUnstagedChangesResponse> {
2471 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2472 let change_set = this
2473 .update(&mut cx, |this, cx| {
2474 let buffer = this.get(buffer_id)?;
2475 Some(this.open_unstaged_changes(buffer, cx))
2476 })?
2477 .ok_or_else(|| anyhow!("no such buffer"))?
2478 .await?;
2479 this.update(&mut cx, |this, _| {
2480 let shared_buffers = this
2481 .shared_buffers
2482 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2483 .or_default();
2484 debug_assert!(shared_buffers.contains_key(&buffer_id));
2485 if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2486 shared.change_set = Some(change_set.clone());
2487 }
2488 })?;
2489 let staged_text = change_set.read_with(&cx, |change_set, _| {
2490 change_set.base_text.as_ref().map(|buffer| buffer.text())
2491 })?;
2492 Ok(proto::OpenUnstagedChangesResponse { staged_text })
2493 }
2494
    /// Handles a peer's request to open the uncommitted changes for a buffer.
    ///
    /// Attaches the resulting change set to the peer's shared-buffer entry so
    /// it stays alive for the duration of the share, then replies with the
    /// base texts. The `mode` field lets the guest avoid holding a second
    /// copy when the index matches HEAD.
    pub async fn handle_open_uncommitted_changes(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUncommittedChanges>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUncommittedChangesResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let change_set = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_uncommitted_changes(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        this.update(&mut cx, |this, _| {
            // The buffer should already be shared with this peer; keep the
            // change set alive on its entry.
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.change_set = Some(change_set.clone());
            }
        })?;
        change_set.read_with(&cx, |change_set, _| {
            use proto::open_uncommitted_changes_response::Mode;

            // Decide how much text to ship: `base_text` is the committed
            // text, `staged_text` the index's text. Equal buffer ids mean
            // the index matches HEAD, so the staged copy can be omitted.
            let mode;
            let staged_text;
            let committed_text;
            if let Some(committed_buffer) = &change_set.base_text {
                committed_text = Some(committed_buffer.text());
                if let Some(staged_buffer) = &change_set.staged_text {
                    if staged_buffer.remote_id() == committed_buffer.remote_id() {
                        mode = Mode::IndexMatchesHead;
                        staged_text = None;
                    } else {
                        mode = Mode::IndexAndHead;
                        staged_text = Some(staged_buffer.text());
                    }
                } else {
                    mode = Mode::IndexAndHead;
                    staged_text = None;
                }
            } else {
                mode = Mode::IndexAndHead;
                committed_text = None;
                staged_text = change_set.staged_text.as_ref().map(|buffer| buffer.text());
            }

            proto::OpenUncommittedChangesResponse {
                committed_text,
                staged_text,
                mode: mode.into(),
            }
        })
    }
2551
2552 pub async fn handle_update_diff_bases(
2553 this: Entity<Self>,
2554 request: TypedEnvelope<proto::UpdateDiffBases>,
2555 mut cx: AsyncApp,
2556 ) -> Result<()> {
2557 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2558 this.update(&mut cx, |this, cx| {
2559 if let Some(OpenBuffer::Complete {
2560 change_set_state,
2561 buffer,
2562 }) = this.opened_buffers.get_mut(&buffer_id)
2563 {
2564 if let Some(buffer) = buffer.upgrade() {
2565 let buffer = buffer.read(cx).text_snapshot();
2566 change_set_state.update(cx, |change_set_state, cx| {
2567 change_set_state.handle_base_texts_updated(buffer, request.payload, cx);
2568 })
2569 }
2570 }
2571 })
2572 }
2573
2574 pub fn reload_buffers(
2575 &self,
2576 buffers: HashSet<Entity<Buffer>>,
2577 push_to_history: bool,
2578 cx: &mut Context<Self>,
2579 ) -> Task<Result<ProjectTransaction>> {
2580 if buffers.is_empty() {
2581 return Task::ready(Ok(ProjectTransaction::default()));
2582 }
2583 match &self.state {
2584 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2585 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2586 }
2587 }
2588
2589 async fn handle_reload_buffers(
2590 this: Entity<Self>,
2591 envelope: TypedEnvelope<proto::ReloadBuffers>,
2592 mut cx: AsyncApp,
2593 ) -> Result<proto::ReloadBuffersResponse> {
2594 let sender_id = envelope.original_sender_id().unwrap_or_default();
2595 let reload = this.update(&mut cx, |this, cx| {
2596 let mut buffers = HashSet::default();
2597 for buffer_id in &envelope.payload.buffer_ids {
2598 let buffer_id = BufferId::new(*buffer_id)?;
2599 buffers.insert(this.get_existing(buffer_id)?);
2600 }
2601 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2602 })??;
2603
2604 let project_transaction = reload.await?;
2605 let project_transaction = this.update(&mut cx, |this, cx| {
2606 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2607 })?;
2608 Ok(proto::ReloadBuffersResponse {
2609 transaction: Some(project_transaction),
2610 })
2611 }
2612
    /// Starts sharing `buffer` with `peer_id`, if it isn't already shared.
    ///
    /// Registers the buffer in `shared_buffers` synchronously (so repeated
    /// calls are idempotent), then asynchronously sends the buffer's current
    /// state followed by its operation history, chunked across multiple
    /// `CreateBufferForPeer` messages.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            return Task::ready(Ok(()));
        }
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                change_set: None,
                lsp_handle: None,
            },
        );

        // Nothing to send if we're not sharing downstream.
        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before the task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream the history if the initial state was sent
            // successfully.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            // Mark the final chunk so the receiver knows when
                            // the buffer is complete.
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2679
    /// Drops all records of buffers shared with remote peers.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2683
    /// Drops the record of all buffers shared with a single peer.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2687
2688 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2689 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2690 self.shared_buffers.insert(new_peer_id, buffers);
2691 }
2692 }
2693
    /// Whether any buffers are currently shared with any peer.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2697
    /// Creates a new buffer with the given text and language (plain text when
    /// `None`) and registers it with the store.
    ///
    /// Panics when called on a non-local buffer store.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut Context<Self>,
    ) -> Entity<Buffer> {
        let buffer = cx.new(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        // If the buffer is backed by a file, index it by path (and entry id,
        // when available) so later lookups can find it.
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }
2731
2732 pub fn deserialize_project_transaction(
2733 &mut self,
2734 message: proto::ProjectTransaction,
2735 push_to_history: bool,
2736 cx: &mut Context<Self>,
2737 ) -> Task<Result<ProjectTransaction>> {
2738 if let Some(this) = self.as_remote_mut() {
2739 this.deserialize_project_transaction(message, push_to_history, cx)
2740 } else {
2741 debug_panic!("not a remote buffer store");
2742 Task::ready(Err(anyhow!("not a remote buffer store")))
2743 }
2744 }
2745
2746 pub fn wait_for_remote_buffer(
2747 &mut self,
2748 id: BufferId,
2749 cx: &mut Context<BufferStore>,
2750 ) -> Task<Result<Entity<Buffer>>> {
2751 if let Some(this) = self.as_remote_mut() {
2752 this.wait_for_remote_buffer(id, cx)
2753 } else {
2754 debug_panic!("not a remote buffer store");
2755 Task::ready(Err(anyhow!("not a remote buffer store")))
2756 }
2757 }
2758
2759 pub fn serialize_project_transaction_for_peer(
2760 &mut self,
2761 project_transaction: ProjectTransaction,
2762 peer_id: proto::PeerId,
2763 cx: &mut Context<Self>,
2764 ) -> proto::ProjectTransaction {
2765 let mut serialized_transaction = proto::ProjectTransaction {
2766 buffer_ids: Default::default(),
2767 transactions: Default::default(),
2768 };
2769 for (buffer, transaction) in project_transaction.0 {
2770 self.create_buffer_for_peer(&buffer, peer_id, cx)
2771 .detach_and_log_err(cx);
2772 serialized_transaction
2773 .buffer_ids
2774 .push(buffer.read(cx).remote_id().into());
2775 serialized_transaction
2776 .transactions
2777 .push(language::proto::serialize_transaction(&transaction));
2778 }
2779 serialized_transaction
2780 }
2781}
2782
// `BufferChangeSet` notifies observers via `BufferChangeSetEvent` values
// emitted from `set_state`.
impl EventEmitter<BufferChangeSetEvent> for BufferChangeSet {}
2784
impl BufferChangeSet {
    /// Replaces the change set's base text and diff, emitting `DiffChanged`
    /// when the visible diff may have changed.
    ///
    /// The changed range covers the whole buffer when the new base text's
    /// buffer id differs from the current one; otherwise it is computed by
    /// comparing the new diff against the old one.
    // NOTE(review): when `base_text` is `None`, no event is emitted even
    // though `diff_to_buffer` is still replaced — confirm callers rely on
    // that (e.g. clearing state without notifying observers).
    fn set_state(
        &mut self,
        base_text: Option<language::BufferSnapshot>,
        diff: BufferDiff,
        buffer: &text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) {
        if let Some(base_text) = base_text.as_ref() {
            // A different base buffer id means anything may have changed.
            let changed_range = if Some(base_text.remote_id())
                != self.base_text.as_ref().map(|buffer| buffer.remote_id())
            {
                Some(text::Anchor::MIN..text::Anchor::MAX)
            } else {
                diff.compare(&self.diff_to_buffer, buffer)
            };
            if let Some(changed_range) = changed_range {
                cx.emit(BufferChangeSetEvent::DiffChanged { changed_range });
            }
        }
        self.base_text = base_text;
        self.diff_to_buffer = diff;
    }

    /// Iterates over diff hunks intersecting `range`, in buffer order.
    pub fn diff_hunks_intersecting_range<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range(range, buffer_snapshot)
    }

    /// Like [`Self::diff_hunks_intersecting_range`], but in reverse order.
    pub fn diff_hunks_intersecting_range_rev<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range_rev(range, buffer_snapshot)
    }

    /// Used in cases where the change set isn't derived from git.
    ///
    /// Rebuilds the diff against `base_buffer` on the background executor
    /// and applies it via [`Self::set_state`]. The returned receiver fires
    /// once the new state has been applied (or the change set was dropped
    /// in the meantime).
    pub fn set_base_text(
        &mut self,
        base_buffer: Entity<language::Buffer>,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        let this = cx.weak_entity();
        let base_buffer = base_buffer.read(cx).snapshot();
        cx.spawn(|_, mut cx| async move {
            // Diffing can be expensive; run it off the main thread.
            let diff = cx
                .background_executor()
                .spawn({
                    let base_buffer = base_buffer.clone();
                    let buffer = buffer.clone();
                    async move { BufferDiff::build(Some(&base_buffer.text()), &buffer) }
                })
                .await;
            let Some(this) = this.upgrade() else {
                // The change set was dropped; still resolve the receiver.
                tx.send(()).ok();
                return;
            };
            this.update(&mut cx, |this, cx| {
                this.set_state(Some(base_buffer), diff, &buffer, cx);
            })
            .log_err();
            tx.send(()).ok();
        })
        .detach();
        rx
    }

    /// Test-only: the base text as a plain `String`, if any.
    #[cfg(any(test, feature = "test-support"))]
    pub fn base_text_string(&self) -> Option<String> {
        self.base_text.as_ref().map(|buffer| buffer.text())
    }

    /// Creates an empty change set (no base text, empty diff) for `buffer`.
    pub fn new(buffer: &Entity<Buffer>, cx: &mut App) -> Self {
        BufferChangeSet {
            buffer_id: buffer.read(cx).remote_id(),
            base_text: None,
            diff_to_buffer: BufferDiff::new(&buffer.read(cx).text_snapshot()),
            staged_text: None,
            staged_diff: None,
        }
    }

    /// Test-only: creates a change set with the given base text, diffed
    /// synchronously against `buffer`.
    #[cfg(any(test, feature = "test-support"))]
    pub fn new_with_base_text(base_text: &str, buffer: &Entity<Buffer>, cx: &mut App) -> Self {
        let mut base_text = base_text.to_owned();
        text::LineEnding::normalize(&mut base_text);
        let diff_to_buffer = BufferDiff::build(Some(&base_text), &buffer.read(cx).text_snapshot());
        let base_text = language::Buffer::build_snapshot_sync(base_text.into(), None, None, cx);
        BufferChangeSet {
            buffer_id: buffer.read(cx).remote_id(),
            base_text: Some(base_text),
            diff_to_buffer,
            staged_text: None,
            staged_diff: None,
        }
    }

    /// Test-only: recomputes the diff against `snapshot` on the current
    /// thread instead of the background executor.
    #[cfg(any(test, feature = "test-support"))]
    pub fn recalculate_diff_sync(
        &mut self,
        snapshot: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) {
        let mut base_text = self.base_text.as_ref().map(|buffer| buffer.text());
        if let Some(base_text) = base_text.as_mut() {
            text::LineEnding::normalize(base_text);
        }
        let diff_to_buffer = BufferDiff::build(base_text.as_deref(), &snapshot);
        self.set_state(self.base_text.clone(), diff_to_buffer, &snapshot, cx);
    }
}
2904
2905impl OpenBuffer {
2906 fn upgrade(&self) -> Option<Entity<Buffer>> {
2907 match self {
2908 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2909 OpenBuffer::Operations(_) => None,
2910 }
2911 }
2912}
2913
2914fn is_not_found_error(error: &anyhow::Error) -> bool {
2915 error
2916 .root_cause()
2917 .downcast_ref::<io::Error>()
2918 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2919}
2920
2921fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2922 let Some(blame) = blame else {
2923 return proto::BlameBufferResponse {
2924 blame_response: None,
2925 };
2926 };
2927
2928 let entries = blame
2929 .entries
2930 .into_iter()
2931 .map(|entry| proto::BlameEntry {
2932 sha: entry.sha.as_bytes().into(),
2933 start_line: entry.range.start,
2934 end_line: entry.range.end,
2935 original_line_number: entry.original_line_number,
2936 author: entry.author.clone(),
2937 author_mail: entry.author_mail.clone(),
2938 author_time: entry.author_time,
2939 author_tz: entry.author_tz.clone(),
2940 committer: entry.committer.clone(),
2941 committer_mail: entry.committer_mail.clone(),
2942 committer_time: entry.committer_time,
2943 committer_tz: entry.committer_tz.clone(),
2944 summary: entry.summary.clone(),
2945 previous: entry.previous.clone(),
2946 filename: entry.filename.clone(),
2947 })
2948 .collect::<Vec<_>>();
2949
2950 let messages = blame
2951 .messages
2952 .into_iter()
2953 .map(|(oid, message)| proto::CommitMessage {
2954 oid: oid.as_bytes().into(),
2955 message,
2956 })
2957 .collect::<Vec<_>>();
2958
2959 let permalinks = blame
2960 .permalinks
2961 .into_iter()
2962 .map(|(oid, url)| proto::CommitPermalink {
2963 oid: oid.as_bytes().into(),
2964 permalink: url.to_string(),
2965 })
2966 .collect::<Vec<_>>();
2967
2968 proto::BlameBufferResponse {
2969 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2970 entries,
2971 messages,
2972 permalinks,
2973 remote_url: blame.remote_url,
2974 }),
2975 }
2976}
2977
2978fn deserialize_blame_buffer_response(
2979 response: proto::BlameBufferResponse,
2980) -> Option<git::blame::Blame> {
2981 let response = response.blame_response?;
2982 let entries = response
2983 .entries
2984 .into_iter()
2985 .filter_map(|entry| {
2986 Some(git::blame::BlameEntry {
2987 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2988 range: entry.start_line..entry.end_line,
2989 original_line_number: entry.original_line_number,
2990 committer: entry.committer,
2991 committer_time: entry.committer_time,
2992 committer_tz: entry.committer_tz,
2993 committer_mail: entry.committer_mail,
2994 author: entry.author,
2995 author_mail: entry.author_mail,
2996 author_time: entry.author_time,
2997 author_tz: entry.author_tz,
2998 summary: entry.summary,
2999 previous: entry.previous,
3000 filename: entry.filename,
3001 })
3002 })
3003 .collect::<Vec<_>>();
3004
3005 let messages = response
3006 .messages
3007 .into_iter()
3008 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
3009 .collect::<HashMap<_, _>>();
3010
3011 let permalinks = response
3012 .permalinks
3013 .into_iter()
3014 .filter_map(|permalink| {
3015 Some((
3016 git::Oid::from_bytes(&permalink.oid).ok()?,
3017 Url::from_str(&permalink.permalink).ok()?,
3018 ))
3019 })
3020 .collect::<HashMap<_, _>>();
3021
3022 Some(Blame {
3023 entries,
3024 permalinks,
3025 messages,
3026 remote_url: response.remote_url,
3027 })
3028}
3029
/// Builds a web permalink to a line range in a crate's upstream repository,
/// for files that carry a cargo-generated `.cargo_vcs_info.json` (as found in
/// cargo's extracted registry sources — TODO confirm the caller's context).
///
/// Walks up from `path` to find `.cargo_vcs_info.json`, reads the repository
/// URL from the adjacent `Cargo.toml`, and combines them with the recorded
/// sha to build the permalink.
///
/// # Errors
/// Fails when no `.cargo_vcs_info.json` is found in any ancestor directory,
/// when either file cannot be parsed, or when `package.repository` is not
/// recognized by any registered git hosting provider.
fn get_permalink_in_rust_registry_src(
    provider_registry: Arc<GitHostingProviderRegistry>,
    path: PathBuf,
    selection: Range<u32>,
) -> Result<url::Url> {
    // Minimal mirrors of the fields we need from `.cargo_vcs_info.json`…
    #[derive(Deserialize)]
    struct CargoVcsGit {
        sha1: String,
    }

    #[derive(Deserialize)]
    struct CargoVcsInfo {
        git: CargoVcsGit,
        path_in_vcs: String,
    }

    // …and from `Cargo.toml`.
    #[derive(Deserialize)]
    struct CargoPackage {
        repository: String,
    }

    #[derive(Deserialize)]
    struct CargoToml {
        package: CargoPackage,
    }

    // `ancestors()` yields `path` itself first; `skip(1)` starts the search
    // at the parent directory.
    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
        Some((dir, json))
    }) else {
        bail!("No .cargo_vcs_info.json found in parent directories")
    };
    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
        .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
    // `unwrap` is safe here: `dir` is an ancestor of `path` by construction.
    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
    let permalink = provider.build_permalink(
        remote,
        BuildPermalinkParams {
            sha: &cargo_vcs_info.git.sha1,
            path: &path.to_string_lossy(),
            selection: Some(selection),
        },
    );
    Ok(permalink)
}