1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use client::Client;
10use collections::{hash_map, HashMap, HashSet};
11use fs::Fs;
12use futures::{
13 channel::oneshot,
14 future::{OptionFuture, Shared},
15 Future, FutureExt as _, StreamExt,
16};
17use git::{blame::Blame, diff::BufferDiff, repository::RepoPath};
18use gpui::{
19 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
20};
21use http_client::Url;
22use language::{
23 proto::{
24 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
25 split_operations,
26 },
27 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
28};
29use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
30use serde::Deserialize;
31use smol::channel::Receiver;
32use std::{
33 io,
34 ops::Range,
35 path::{Path, PathBuf},
36 pin::pin,
37 str::FromStr as _,
38 sync::Arc,
39 time::Instant,
40};
41use text::{BufferId, Rope};
42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
44
/// Which git base text a change set is computed against.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum ChangeSetKind {
    /// Buffer diffed against the git index (staged) text.
    Unstaged,
    /// Buffer diffed against the committed (HEAD) text.
    Uncommitted,
}
50
/// A set of open buffers.
pub struct BufferStore {
    // Local vs. remote behavior for this store.
    state: BufferStoreState,
    // In-flight buffer loads keyed by project path; the `Shared` task lets
    // concurrent opens of the same path await a single load.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    // In-flight change-set loads, keyed by buffer id and diff kind.
    #[allow(clippy::type_complexity)]
    loading_change_sets: HashMap<
        (BufferId, ChangeSetKind),
        Shared<Task<Result<Entity<BufferChangeSet>, Arc<anyhow::Error>>>>,
    >,
    worktree_store: Entity<WorktreeStore>,
    // All buffers that have been opened, keyed by their remote id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    // Client and project id used to forward updates downstream when the
    // project is shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // Buffers that have been shared with each remote peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
66
/// A buffer shared with a remote peer, together with the resources retained
/// on its behalf.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    // Change set shared alongside the buffer, if any.
    change_set: Option<Entity<BufferChangeSet>>,
    // NOTE(review): presumably keeps the buffer registered with the LSP store
    // while it is shared — confirm against `lsp_store`.
    lsp_handle: Option<OpenLspBufferHandle>,
}
73
/// Per-buffer bookkeeping for computing diffs against the git index and HEAD.
#[derive(Default)]
struct BufferChangeSetState {
    // Weak handle to the change set tracking buffer vs. index (staged) text.
    unstaged_changes: Option<WeakEntity<BufferChangeSet>>,
    // Weak handle to the change set tracking buffer vs. HEAD (committed) text.
    uncommitted_changes: Option<WeakEntity<BufferChangeSet>>,
    // Most recent diff recalculation; replaced whenever a new one starts.
    recalculate_diff_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    // Senders resolved when the in-flight diff recalculation completes.
    diff_updated_futures: Vec<oneshot::Sender<()>>,
    buffer_subscription: Option<Subscription>,

    // Text of the file at HEAD. When the index matches HEAD, this `Arc` is
    // shared with `index_text` (see `DiffBasesChange::SetBoth`).
    head_text: Option<Arc<String>>,
    // Text of the file in the git index.
    index_text: Option<Arc<String>>,
    // Whether `head_text` changed since diffs were last recalculated.
    head_changed: bool,
    // Whether `index_text` changed since diffs were last recalculated.
    index_changed: bool,
}
89
/// Describes which git base texts changed, and their new contents.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Only the index (staged) text changed.
    SetIndex(Option<String>),
    /// Only the HEAD (committed) text changed.
    SetHead(Option<String>),
    /// Both changed, to distinct values.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Both changed and the index matches HEAD; one text serves as both bases.
    SetBoth(Option<String>),
}
100
101impl BufferChangeSetState {
102 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
103 self.language = buffer.read(cx).language().cloned();
104 self.index_changed = self.index_text.is_some();
105 self.head_changed = self.head_text.is_some();
106 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
107 }
108
109 fn unstaged_changes(&self) -> Option<Entity<BufferChangeSet>> {
110 self.unstaged_changes.as_ref().and_then(|set| set.upgrade())
111 }
112
113 fn uncommitted_changes(&self) -> Option<Entity<BufferChangeSet>> {
114 self.uncommitted_changes
115 .as_ref()
116 .and_then(|set| set.upgrade())
117 }
118
119 fn handle_base_texts_updated(
120 &mut self,
121 buffer: text::BufferSnapshot,
122 message: proto::UpdateDiffBases,
123 cx: &mut Context<Self>,
124 ) {
125 use proto::update_diff_bases::Mode;
126
127 let Some(mode) = Mode::from_i32(message.mode) else {
128 return;
129 };
130
131 let diff_bases_change = match mode {
132 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
133 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
134 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.staged_text),
135 Mode::IndexAndHead => DiffBasesChange::SetEach {
136 index: message.staged_text,
137 head: message.committed_text,
138 },
139 };
140
141 let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
142 }
143
144 fn diff_bases_changed(
145 &mut self,
146 buffer: text::BufferSnapshot,
147 diff_bases_change: DiffBasesChange,
148 cx: &mut Context<Self>,
149 ) -> oneshot::Receiver<()> {
150 match diff_bases_change {
151 DiffBasesChange::SetIndex(index) => {
152 self.index_text = index.map(|mut text| {
153 text::LineEnding::normalize(&mut text);
154 Arc::new(text)
155 });
156 self.index_changed = true;
157 }
158 DiffBasesChange::SetHead(head) => {
159 self.head_text = head.map(|mut text| {
160 text::LineEnding::normalize(&mut text);
161 Arc::new(text)
162 });
163 self.head_changed = true;
164 }
165 DiffBasesChange::SetBoth(mut text) => {
166 if let Some(text) = text.as_mut() {
167 text::LineEnding::normalize(text);
168 }
169 self.head_text = text.map(Arc::new);
170 self.index_text = self.head_text.clone();
171 self.head_changed = true;
172 self.index_changed = true;
173 }
174 DiffBasesChange::SetEach { index, head } => {
175 self.index_text = index.map(|mut text| {
176 text::LineEnding::normalize(&mut text);
177 Arc::new(text)
178 });
179 self.head_text = head.map(|mut text| {
180 text::LineEnding::normalize(&mut text);
181 Arc::new(text)
182 });
183 self.head_changed = true;
184 self.index_changed = true;
185 }
186 }
187
188 self.recalculate_diffs(buffer, cx)
189 }
190
191 fn recalculate_diffs(
192 &mut self,
193 buffer: text::BufferSnapshot,
194 cx: &mut Context<Self>,
195 ) -> oneshot::Receiver<()> {
196 let (tx, rx) = oneshot::channel();
197 self.diff_updated_futures.push(tx);
198
199 let language = self.language.clone();
200 let language_registry = self.language_registry.clone();
201 let unstaged_changes = self.unstaged_changes();
202 let uncommitted_changes = self.uncommitted_changes();
203 let head = self.head_text.clone();
204 let index = self.index_text.clone();
205 let index_changed = self.index_changed;
206 let head_changed = self.head_changed;
207 let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
208 (Some(index), Some(head)) => Arc::ptr_eq(index, head),
209 (None, None) => true,
210 _ => false,
211 };
212 self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
213 if let Some(unstaged_changes) = &unstaged_changes {
214 let staged_snapshot = if index_changed {
215 let staged_snapshot = cx.update(|cx| {
216 index.as_ref().map(|head| {
217 language::Buffer::build_snapshot(
218 Rope::from(head.as_str()),
219 language.clone(),
220 language_registry.clone(),
221 cx,
222 )
223 })
224 })?;
225 cx.background_executor()
226 .spawn(OptionFuture::from(staged_snapshot))
227 } else {
228 Task::ready(
229 unstaged_changes
230 .read_with(&cx, |change_set, _| change_set.base_text.clone())?,
231 )
232 };
233
234 let diff =
235 cx.background_executor().spawn({
236 let buffer = buffer.clone();
237 async move {
238 BufferDiff::build(index.as_ref().map(|index| index.as_str()), &buffer)
239 }
240 });
241
242 let (staged_snapshot, diff) = futures::join!(staged_snapshot, diff);
243
244 unstaged_changes.update(&mut cx, |unstaged_changes, cx| {
245 unstaged_changes.set_state(staged_snapshot.clone(), diff, &buffer, cx);
246 })?;
247 }
248
249 if let Some(uncommitted_changes) = &uncommitted_changes {
250 let (snapshot, diff) = if let (Some(unstaged_changes), true) =
251 (&unstaged_changes, index_matches_head)
252 {
253 unstaged_changes.read_with(&cx, |change_set, _| {
254 (
255 change_set.base_text.clone(),
256 change_set.diff_to_buffer.clone(),
257 )
258 })?
259 } else {
260 let committed_snapshot = if head_changed {
261 let committed_snapshot = cx.update(|cx| {
262 head.as_ref().map(|head| {
263 language::Buffer::build_snapshot(
264 Rope::from(head.as_str()),
265 language.clone(),
266 language_registry.clone(),
267 cx,
268 )
269 })
270 })?;
271 cx.background_executor()
272 .spawn(OptionFuture::from(committed_snapshot))
273 } else {
274 Task::ready(
275 uncommitted_changes
276 .read_with(&cx, |change_set, _| change_set.base_text.clone())?,
277 )
278 };
279
280 let diff = cx.background_executor().spawn({
281 let buffer = buffer.clone();
282 let head = head.clone();
283 async move {
284 BufferDiff::build(head.as_ref().map(|head| head.as_str()), &buffer)
285 }
286 });
287 futures::join!(committed_snapshot, diff)
288 };
289
290 uncommitted_changes.update(&mut cx, |change_set, cx| {
291 change_set.set_state(snapshot, diff, &buffer, cx);
292 })?;
293 }
294
295 if let Some(this) = this.upgrade() {
296 this.update(&mut cx, |this, _| {
297 this.index_changed = false;
298 this.head_changed = false;
299 for tx in this.diff_updated_futures.drain(..) {
300 tx.send(()).ok();
301 }
302 })?;
303 }
304
305 Ok(())
306 }));
307
308 rx
309 }
310}
311
/// The diff between a buffer's current contents and one of its git base texts.
pub struct BufferChangeSet {
    pub buffer_id: BufferId,
    // Snapshot of the base text being diffed against; `None` when there is no
    // base text for this buffer.
    pub base_text: Option<language::BufferSnapshot>,
    pub diff_to_buffer: BufferDiff,
    // For an uncommitted change set, the corresponding unstaged change set,
    // if one is open.
    pub unstaged_change_set: Option<Entity<BufferChangeSet>>,
}
318
319impl std::fmt::Debug for BufferChangeSet {
320 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321 f.debug_struct("BufferChangeSet")
322 .field("buffer_id", &self.buffer_id)
323 .field("base_text", &self.base_text.as_ref().map(|s| s.text()))
324 .field("diff_to_buffer", &self.diff_to_buffer)
325 .finish()
326 }
327}
328
/// Events emitted by a `BufferChangeSet`.
pub enum BufferChangeSetEvent {
    /// The diff was recomputed; `changed_range` covers the affected portion
    /// of the buffer.
    DiffChanged { changed_range: Range<text::Anchor> },
}
332
/// Whether the buffer store operates on local files or proxies to a remote
/// project over RPC.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
337
/// Buffer-store behavior for a project hosted elsewhere, reached through an
/// upstream RPC client.
struct RemoteBufferStore {
    // Buffers sent to us by peers; retained when connected via collab to
    // avoid races (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    // Id of the project on the remote host.
    project_id: u64,
    // Buffers whose initial state has arrived but whose operation chunks are
    // still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    // Channels waiting for a remote buffer with a given id to finish loading.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
347
/// Buffer-store behavior for a project whose worktrees live on this machine.
struct LocalBufferStore {
    // Maps project paths to the ids of open local buffers.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    // Maps worktree entry ids to the ids of open local buffers.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    // Keeps the worktree-store subscription alive for this store's lifetime.
    _subscription: Subscription,
}
354
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully-loaded buffer along with its git diff state.
    Complete {
        buffer: WeakEntity<Buffer>,
        change_set_state: Entity<BufferChangeSetState>,
    },
    /// Operations received for a buffer that isn't complete yet.
    /// NOTE(review): presumably applied once the buffer finishes loading —
    /// confirm against the code that drains this variant.
    Operations(Vec<Operation>),
}
362
/// Events emitted by the `BufferStore`.
pub enum BufferStoreEvent {
    /// A buffer was added to the store.
    BufferAdded(Entity<Buffer>),
    /// A buffer was removed from the store.
    BufferDropped(BufferId),
    /// A buffer's on-disk file changed path; `old_file` is the file it had
    /// before the change.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
371
/// The buffer transactions produced by one project-wide operation, keyed by
/// the buffer each transaction applies to.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
374
// `BufferStore` notifies observers through `BufferStoreEvent`s.
impl EventEmitter<BufferStoreEvent> for BufferStore {}
376
377impl RemoteBufferStore {
378 fn open_unstaged_changes(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
379 let project_id = self.project_id;
380 let client = self.upstream_client.clone();
381 cx.background_executor().spawn(async move {
382 let response = client
383 .request(proto::OpenUnstagedChanges {
384 project_id,
385 buffer_id: buffer_id.to_proto(),
386 })
387 .await?;
388 Ok(response.staged_text)
389 })
390 }
391
392 fn open_uncommitted_changes(
393 &self,
394 buffer_id: BufferId,
395 cx: &App,
396 ) -> Task<Result<DiffBasesChange>> {
397 use proto::open_uncommitted_changes_response::Mode;
398
399 let project_id = self.project_id;
400 let client = self.upstream_client.clone();
401 cx.background_executor().spawn(async move {
402 let response = client
403 .request(proto::OpenUncommittedChanges {
404 project_id,
405 buffer_id: buffer_id.to_proto(),
406 })
407 .await?;
408 let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
409 let bases = match mode {
410 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.staged_text),
411 Mode::IndexAndHead => DiffBasesChange::SetEach {
412 head: response.committed_text,
413 index: response.staged_text,
414 },
415 };
416 Ok(bases)
417 })
418 }
419
420 pub fn wait_for_remote_buffer(
421 &mut self,
422 id: BufferId,
423 cx: &mut Context<BufferStore>,
424 ) -> Task<Result<Entity<Buffer>>> {
425 let (tx, rx) = oneshot::channel();
426 self.remote_buffer_listeners.entry(id).or_default().push(tx);
427
428 cx.spawn(|this, cx| async move {
429 if let Some(buffer) = this
430 .read_with(&cx, |buffer_store, _| buffer_store.get(id))
431 .ok()
432 .flatten()
433 {
434 return Ok(buffer);
435 }
436
437 cx.background_executor()
438 .spawn(async move { rx.await? })
439 .await
440 })
441 }
442
443 fn save_remote_buffer(
444 &self,
445 buffer_handle: Entity<Buffer>,
446 new_path: Option<proto::ProjectPath>,
447 cx: &Context<BufferStore>,
448 ) -> Task<Result<()>> {
449 let buffer = buffer_handle.read(cx);
450 let buffer_id = buffer.remote_id().into();
451 let version = buffer.version();
452 let rpc = self.upstream_client.clone();
453 let project_id = self.project_id;
454 cx.spawn(move |_, mut cx| async move {
455 let response = rpc
456 .request(proto::SaveBuffer {
457 project_id,
458 buffer_id,
459 new_path,
460 version: serialize_version(&version),
461 })
462 .await?;
463 let version = deserialize_version(&response.version);
464 let mtime = response.mtime.map(|mtime| mtime.into());
465
466 buffer_handle.update(&mut cx, |buffer, cx| {
467 buffer.did_save(version.clone(), mtime, cx);
468 })?;
469
470 Ok(())
471 })
472 }
473
474 pub fn handle_create_buffer_for_peer(
475 &mut self,
476 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
477 replica_id: u16,
478 capability: Capability,
479 cx: &mut Context<BufferStore>,
480 ) -> Result<Option<Entity<Buffer>>> {
481 match envelope
482 .payload
483 .variant
484 .ok_or_else(|| anyhow!("missing variant"))?
485 {
486 proto::create_buffer_for_peer::Variant::State(mut state) => {
487 let buffer_id = BufferId::new(state.id)?;
488
489 let buffer_result = maybe!({
490 let mut buffer_file = None;
491 if let Some(file) = state.file.take() {
492 let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
493 let worktree = self
494 .worktree_store
495 .read(cx)
496 .worktree_for_id(worktree_id, cx)
497 .ok_or_else(|| {
498 anyhow!("no worktree found for id {}", file.worktree_id)
499 })?;
500 buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
501 as Arc<dyn language::File>);
502 }
503 Buffer::from_proto(replica_id, capability, state, buffer_file)
504 });
505
506 match buffer_result {
507 Ok(buffer) => {
508 let buffer = cx.new(|_| buffer);
509 self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
510 }
511 Err(error) => {
512 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
513 for listener in listeners {
514 listener.send(Err(anyhow!(error.cloned()))).ok();
515 }
516 }
517 }
518 }
519 }
520 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
521 let buffer_id = BufferId::new(chunk.buffer_id)?;
522 let buffer = self
523 .loading_remote_buffers_by_id
524 .get(&buffer_id)
525 .cloned()
526 .ok_or_else(|| {
527 anyhow!(
528 "received chunk for buffer {} without initial state",
529 chunk.buffer_id
530 )
531 })?;
532
533 let result = maybe!({
534 let operations = chunk
535 .operations
536 .into_iter()
537 .map(language::proto::deserialize_operation)
538 .collect::<Result<Vec<_>>>()?;
539 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
540 anyhow::Ok(())
541 });
542
543 if let Err(error) = result {
544 self.loading_remote_buffers_by_id.remove(&buffer_id);
545 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
546 for listener in listeners {
547 listener.send(Err(error.cloned())).ok();
548 }
549 }
550 } else if chunk.is_last {
551 self.loading_remote_buffers_by_id.remove(&buffer_id);
552 if self.upstream_client.is_via_collab() {
553 // retain buffers sent by peers to avoid races.
554 self.shared_with_me.insert(buffer.clone());
555 }
556
557 if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
558 for sender in senders {
559 sender.send(Ok(buffer.clone())).ok();
560 }
561 }
562 return Ok(Some(buffer));
563 }
564 }
565 }
566 return Ok(None);
567 }
568
569 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
570 self.loading_remote_buffers_by_id
571 .keys()
572 .copied()
573 .collect::<Vec<_>>()
574 }
575
576 pub fn deserialize_project_transaction(
577 &self,
578 message: proto::ProjectTransaction,
579 push_to_history: bool,
580 cx: &mut Context<BufferStore>,
581 ) -> Task<Result<ProjectTransaction>> {
582 cx.spawn(|this, mut cx| async move {
583 let mut project_transaction = ProjectTransaction::default();
584 for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
585 {
586 let buffer_id = BufferId::new(buffer_id)?;
587 let buffer = this
588 .update(&mut cx, |this, cx| {
589 this.wait_for_remote_buffer(buffer_id, cx)
590 })?
591 .await?;
592 let transaction = language::proto::deserialize_transaction(transaction)?;
593 project_transaction.0.insert(buffer, transaction);
594 }
595
596 for (buffer, transaction) in &project_transaction.0 {
597 buffer
598 .update(&mut cx, |buffer, _| {
599 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
600 })?
601 .await?;
602
603 if push_to_history {
604 buffer.update(&mut cx, |buffer, _| {
605 buffer.push_transaction(transaction.clone(), Instant::now());
606 })?;
607 }
608 }
609
610 Ok(project_transaction)
611 })
612 }
613
614 fn open_buffer(
615 &self,
616 path: Arc<Path>,
617 worktree: Entity<Worktree>,
618 cx: &mut Context<BufferStore>,
619 ) -> Task<Result<Entity<Buffer>>> {
620 let worktree_id = worktree.read(cx).id().to_proto();
621 let project_id = self.project_id;
622 let client = self.upstream_client.clone();
623 let path_string = path.clone().to_string_lossy().to_string();
624 cx.spawn(move |this, mut cx| async move {
625 let response = client
626 .request(proto::OpenBufferByPath {
627 project_id,
628 worktree_id,
629 path: path_string,
630 })
631 .await?;
632 let buffer_id = BufferId::new(response.buffer_id)?;
633
634 let buffer = this
635 .update(&mut cx, {
636 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
637 })?
638 .await?;
639
640 Ok(buffer)
641 })
642 }
643
644 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
645 let create = self.upstream_client.request(proto::OpenNewBuffer {
646 project_id: self.project_id,
647 });
648 cx.spawn(|this, mut cx| async move {
649 let response = create.await?;
650 let buffer_id = BufferId::new(response.buffer_id)?;
651
652 this.update(&mut cx, |this, cx| {
653 this.wait_for_remote_buffer(buffer_id, cx)
654 })?
655 .await
656 })
657 }
658
659 fn reload_buffers(
660 &self,
661 buffers: HashSet<Entity<Buffer>>,
662 push_to_history: bool,
663 cx: &mut Context<BufferStore>,
664 ) -> Task<Result<ProjectTransaction>> {
665 let request = self.upstream_client.request(proto::ReloadBuffers {
666 project_id: self.project_id,
667 buffer_ids: buffers
668 .iter()
669 .map(|buffer| buffer.read(cx).remote_id().to_proto())
670 .collect(),
671 });
672
673 cx.spawn(|this, mut cx| async move {
674 let response = request
675 .await?
676 .transaction
677 .ok_or_else(|| anyhow!("missing transaction"))?;
678 this.update(&mut cx, |this, cx| {
679 this.deserialize_project_transaction(response, push_to_history, cx)
680 })?
681 .await
682 })
683 }
684}
685
686impl LocalBufferStore {
687 fn worktree_for_buffer(
688 &self,
689 buffer: &Entity<Buffer>,
690 cx: &App,
691 ) -> Option<(Entity<Worktree>, Arc<Path>)> {
692 let file = buffer.read(cx).file()?;
693 let worktree_id = file.worktree_id(cx);
694 let path = file.path().clone();
695 let worktree = self
696 .worktree_store
697 .read(cx)
698 .worktree_for_id(worktree_id, cx)?;
699 Some((worktree, path))
700 }
701
702 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
703 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
704 worktree.read(cx).load_staged_file(path.as_ref(), cx)
705 } else {
706 return Task::ready(Err(anyhow!("no such worktree")));
707 }
708 }
709
710 fn load_committed_text(
711 &self,
712 buffer: &Entity<Buffer>,
713 cx: &App,
714 ) -> Task<Result<Option<String>>> {
715 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
716 worktree.read(cx).load_committed_file(path.as_ref(), cx)
717 } else {
718 Task::ready(Err(anyhow!("no such worktree")))
719 }
720 }
721
    /// Writes `buffer_handle`'s contents to `path` within `worktree`, then
    /// notifies any downstream collaborators and the buffer itself that the
    /// save completed. `has_changed_file` indicates the buffer's file record
    /// changed (e.g. save-as); it is forced on for buffers new to disk.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Capture everything needed from the buffer before going async.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        // A buffer that has never existed on disk always gets a new file
        // record when saved.
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Forward the (optional) file change and the save itself to any
            // downstream client before updating the buffer locally.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
779
    /// Watches `worktree` for entry and git-repository changes so local
    /// buffer bookkeeping and diff bases stay up to date. The subscription is
    /// detached; it lives as long as the worktree and store do.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            // Events from non-local worktrees are ignored here.
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        Self::local_worktree_git_repos_changed(
                            this,
                            worktree.clone(),
                            updated_repos,
                            cx,
                        )
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }
805
806 fn local_worktree_entries_changed(
807 this: &mut BufferStore,
808 worktree_handle: &Entity<Worktree>,
809 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
810 cx: &mut Context<BufferStore>,
811 ) {
812 let snapshot = worktree_handle.read(cx).snapshot();
813 for (path, entry_id, _) in changes {
814 Self::local_worktree_entry_changed(
815 this,
816 *entry_id,
817 path,
818 worktree_handle,
819 &snapshot,
820 cx,
821 );
822 }
823 }
824
    /// Reacts to git repositories changing within a local worktree: reloads
    /// the staged/committed base texts for every affected open buffer that
    /// has a live change set, forwards the new bases downstream, and
    /// recomputes the diffs.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // On the main thread: collect, for each affected buffer, its snapshot,
        // path, and which base texts (staged / committed) actually need
        // reloading — i.e. which change sets are still alive.
        let mut change_set_state_updates = Vec::new();
        for buffer in this.opened_buffers.values() {
            let OpenBuffer::Complete {
                buffer,
                change_set_state,
            } = buffer
            else {
                continue;
            };
            let Some(buffer) = buffer.upgrade() else {
                continue;
            };
            let buffer = buffer.read(cx);
            let Some(file) = File::from_dyn(buffer.file()) else {
                continue;
            };
            if file.worktree != worktree_handle {
                continue;
            }
            let change_set_state = change_set_state.read(cx);
            // Only buffers under one of the changed repositories' work
            // directories are affected.
            if changed_repos
                .iter()
                .any(|(work_dir, _)| file.path.starts_with(work_dir))
            {
                let snapshot = buffer.text_snapshot();
                change_set_state_updates.push((
                    snapshot.clone(),
                    file.path.clone(),
                    change_set_state
                        .unstaged_changes
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                    change_set_state
                        .uncommitted_changes
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                ))
            }
        }

        if change_set_state_updates.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Load the base texts from git on a background thread.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_changes_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    change_set_state_updates
                        .into_iter()
                        .filter_map(
                            |(buffer_snapshot, path, needs_staged_text, needs_committed_text)| {
                                let local_repo = snapshot.local_repo_for_path(&path)?;
                                let relative_path = local_repo.relativize(&path).ok()?;
                                let staged_text = if needs_staged_text {
                                    local_repo.repo().load_index_text(&relative_path)
                                } else {
                                    None
                                };
                                let committed_text = if needs_committed_text {
                                    local_repo.repo().load_committed_text(&relative_path)
                                } else {
                                    None
                                };
                                // Collapse the two loads into a single change
                                // description, detecting index == HEAD.
                                let diff_bases_change =
                                    match (needs_staged_text, needs_committed_text) {
                                        (true, true) => Some(if staged_text == committed_text {
                                            DiffBasesChange::SetBoth(staged_text)
                                        } else {
                                            DiffBasesChange::SetEach {
                                                index: staged_text,
                                                head: committed_text,
                                            }
                                        }),
                                        (true, false) => {
                                            Some(DiffBasesChange::SetIndex(staged_text))
                                        }
                                        (false, true) => {
                                            Some(DiffBasesChange::SetHead(committed_text))
                                        }
                                        (false, false) => None,
                                    };
                                Some((buffer_snapshot, diff_bases_change))
                            },
                        )
                        .collect::<Vec<_>>()
                })
                .await;

            // Back on the main thread: apply each change to its buffer's
            // change-set state and forward it downstream if shared.
            this.update(&mut cx, |this, cx| {
                for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
                    let Some(OpenBuffer::Complete {
                        change_set_state, ..
                    }) = this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
                    else {
                        continue;
                    };
                    let Some(diff_bases_change) = diff_bases_change else {
                        continue;
                    };

                    change_set_state.update(cx, |change_set_state, cx| {
                        use proto::update_diff_bases::Mode;

                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
                            let buffer_id = buffer_snapshot.remote_id().to_proto();
                            // Translate the change back into the wire format.
                            let (staged_text, committed_text, mode) = match diff_bases_change
                                .clone()
                            {
                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                DiffBasesChange::SetEach { index, head } => {
                                    (index, head, Mode::IndexAndHead)
                                }
                                DiffBasesChange::SetBoth(text) => {
                                    (text, None, Mode::IndexMatchesHead)
                                }
                            };
                            let message = proto::UpdateDiffBases {
                                project_id: *project_id,
                                buffer_id,
                                staged_text,
                                committed_text,
                                mode: mode as i32,
                            };

                            client.send(message).log_err();
                        }

                        let _ = change_set_state.diff_bases_changed(
                            buffer_snapshot,
                            diff_bases_change,
                            cx,
                        );
                    });
                }
            })
        })
        .detach_and_log_err(cx);
    }
977
    /// Reconciles an open buffer with a changed worktree entry: updates the
    /// buffer's `File` (rename, deletion, new mtime), keeps the path/entry-id
    /// lookup maps in sync, and notifies downstream clients and observers.
    /// Always returns `None`; the `Option` return type just enables `?`.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Resolve the affected buffer: first by entry id, then by path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer was dropped; discard its entry.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer: clean up the stale lookup entries and stop.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Find the entry in the new snapshot, by id first and then by
            // path as a fallback.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        // No mtime available: keep the previous disk state.
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // The entry is gone from the snapshot: the file was deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            // The file moved: re-key the path lookup map and queue an event.
            if new_file.path != old_file.path {
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    // Captured before `file_updated` below, so this is the
                    // pre-change file.
                    old_file: buffer.file().cloned(),
                });
            }

            // The entry id changed: re-key the entry-id lookup map.
            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Forward the new file record downstream if the project is shared.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }
1105
1106 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1107 let file = File::from_dyn(buffer.read(cx).file())?;
1108
1109 let remote_id = buffer.read(cx).remote_id();
1110 if let Some(entry_id) = file.entry_id {
1111 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1112 Some(_) => {
1113 return None;
1114 }
1115 None => {
1116 self.local_buffer_ids_by_entry_id
1117 .insert(entry_id, remote_id);
1118 }
1119 }
1120 };
1121 self.local_buffer_ids_by_path.insert(
1122 ProjectPath {
1123 worktree_id: file.worktree_id(cx),
1124 path: file.path.clone(),
1125 },
1126 remote_id,
1127 );
1128
1129 Some(())
1130 }
1131
1132 fn save_buffer(
1133 &self,
1134 buffer: Entity<Buffer>,
1135 cx: &mut Context<BufferStore>,
1136 ) -> Task<Result<()>> {
1137 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1138 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1139 };
1140 let worktree = file.worktree.clone();
1141 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1142 }
1143
1144 fn save_buffer_as(
1145 &self,
1146 buffer: Entity<Buffer>,
1147 path: ProjectPath,
1148 cx: &mut Context<BufferStore>,
1149 ) -> Task<Result<()>> {
1150 let Some(worktree) = self
1151 .worktree_store
1152 .read(cx)
1153 .worktree_for_id(path.worktree_id, cx)
1154 else {
1155 return Task::ready(Err(anyhow!("no such worktree")));
1156 };
1157 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1158 }
1159
    /// Loads the file at `path` in `worktree` and builds a buffer for it.
    ///
    /// If the file does not exist on disk, an empty buffer with
    /// `DiskState::New` is created instead, so the file can be created on
    /// save. On success the buffer is registered with the store and indexed
    /// by path and (when present) entry id.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity id up front so the buffer id can be derived
            // from it before the entity itself exists.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Build the text buffer off the main thread; file contents
                // may be large.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // A missing file is not an error: open an empty buffer that
                // will create the file when saved.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    // Index the buffer so later opens and file-change events
                    // can find it by path or entry id.
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
1228
1229 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1230 cx.spawn(|buffer_store, mut cx| async move {
1231 let buffer =
1232 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1233 buffer_store.update(&mut cx, |buffer_store, cx| {
1234 buffer_store.add_buffer(buffer.clone(), cx).log_err();
1235 })?;
1236 Ok(buffer)
1237 })
1238 }
1239
    /// Reloads each buffer from disk in sequence, collecting the resulting
    /// transactions into one [`ProjectTransaction`].
    ///
    /// When `push_to_history` is false, each reload transaction is forgotten
    /// from the buffer's undo history (but still reported in the result).
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    // A reload only yields a transaction when the buffer's
                    // contents actually changed.
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.entity(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
1265}
1266
1267impl BufferStore {
    /// Registers this store's RPC message and request handlers on `client`.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_open_unstaged_changes);
        client.add_entity_request_handler(Self::handle_open_uncommitted_changes);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
    }
1280
    /// Creates a buffer store for buffers backed by local files.
    ///
    /// Subscribes to the worktree store so that worktrees added later are
    /// also watched for file changes.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            worktree_store,
        }
    }
1303
    /// Creates a buffer store that mirrors buffers owned by a remote host
    /// (project `remote_id`), communicating over `upstream_client`.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
1327
1328 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1329 match &mut self.state {
1330 BufferStoreState::Local(state) => Some(state),
1331 _ => None,
1332 }
1333 }
1334
1335 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1336 match &mut self.state {
1337 BufferStoreState::Remote(state) => Some(state),
1338 _ => None,
1339 }
1340 }
1341
1342 fn as_remote(&self) -> Option<&RemoteBufferStore> {
1343 match &self.state {
1344 BufferStoreState::Remote(state) => Some(state),
1345 _ => None,
1346 }
1347 }
1348
    /// Opens (or returns an already-open) buffer for `project_path`.
    ///
    /// Concurrent calls for the same path share a single load task via
    /// `loading_buffers`; the entry is removed when the load finishes.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                // Dispatch to the local or remote implementation.
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // The shared task yields an `Arc<anyhow::Error>`; re-wrap it as a
        // plain `anyhow::Error` for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1394
    /// Returns the unstaged (index vs. buffer) change set for `buffer`,
    /// opening it if necessary.
    ///
    /// Concurrent calls for the same buffer share one load task, keyed by
    /// `(buffer_id, ChangeSetKind::Unstaged)` in `loading_change_sets`.
    pub fn open_unstaged_changes(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(change_set) = self.get_unstaged_changes(buffer_id, cx) {
            return Task::ready(Ok(change_set));
        }

        let task = match self
            .loading_change_sets
            .entry((buffer_id, ChangeSetKind::Unstaged))
        {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                // Locally, read the staged text from the git index; remotely,
                // request it from the host.
                let staged_text = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.open_unstaged_changes(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_change_set_internal(
                                this,
                                ChangeSetKind::Unstaged,
                                staged_text.await.map(DiffBasesChange::SetIndex),
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1438
    /// Returns the uncommitted (HEAD vs. buffer) change set for `buffer`,
    /// opening it if necessary.
    ///
    /// Locally this loads both the committed (HEAD) and staged (index) texts
    /// so the unstaged change set can be derived at the same time; remotely
    /// the host computes the diff bases.
    pub fn open_uncommitted_changes(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(change_set) = self.get_uncommitted_changes(buffer_id, cx) {
            return Task::ready(Ok(change_set));
        }

        let task = match self
            .loading_change_sets
            .entry((buffer_id, ChangeSetKind::Uncommitted))
        {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let changes = match &self.state {
                    BufferStoreState::Local(this) => {
                        let committed_text = this.load_committed_text(&buffer, cx);
                        let staged_text = this.load_staged_text(&buffer, cx);
                        cx.background_executor().spawn(async move {
                            let committed_text = committed_text.await?;
                            let staged_text = staged_text.await?;
                            // When index and HEAD agree, a single shared base
                            // text suffices for both change sets.
                            let diff_bases_change = if committed_text == staged_text {
                                DiffBasesChange::SetBoth(committed_text)
                            } else {
                                DiffBasesChange::SetEach {
                                    index: staged_text,
                                    head: committed_text,
                                }
                            };
                            Ok(diff_bases_change)
                        })
                    }
                    BufferStoreState::Remote(this) => this.open_uncommitted_changes(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_change_set_internal(
                                this,
                                ChangeSetKind::Uncommitted,
                                changes.await,
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1498
    #[cfg(any(test, feature = "test-support"))]
    /// Test helper: installs a pre-built unstaged change set for `buffer_id`
    /// by seeding `loading_change_sets` with an already-resolved task.
    pub fn set_unstaged_change_set(
        &mut self,
        buffer_id: BufferId,
        change_set: Entity<BufferChangeSet>,
    ) {
        self.loading_change_sets.insert(
            (buffer_id, ChangeSetKind::Unstaged),
            Task::ready(Ok(change_set)).shared(),
        );
    }
1510
    /// Shared tail of `open_unstaged_changes` / `open_uncommitted_changes`.
    ///
    /// Removes the `loading_change_sets` entry (on both success and failure),
    /// creates the change-set entity, links an uncommitted change set to its
    /// unstaged counterpart, and waits for the initial diff to be computed.
    ///
    /// Fails if the buffer was closed before the base texts arrived.
    async fn open_change_set_internal(
        this: WeakEntity<Self>,
        kind: ChangeSetKind,
        texts: Result<DiffBasesChange>,
        buffer: Entity<Buffer>,
        mut cx: AsyncApp,
    ) -> Result<Entity<BufferChangeSet>> {
        let diff_bases_change = match texts {
            Err(e) => {
                // Clear the loading entry so a later call can retry.
                this.update(&mut cx, |this, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    this.loading_change_sets.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(&mut cx, |this, cx| {
            let buffer_id = buffer.read(cx).remote_id();
            this.loading_change_sets.remove(&(buffer_id, kind));

            if let Some(OpenBuffer::Complete {
                change_set_state, ..
            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                change_set_state.update(cx, |change_set_state, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    // Lazily subscribe to language changes on the buffer, once
                    // per change-set state.
                    change_set_state.buffer_subscription.get_or_insert_with(|| {
                        cx.subscribe(&buffer, |this, buffer, event, cx| match event {
                            BufferEvent::LanguageChanged => {
                                this.buffer_language_changed(buffer, cx)
                            }
                            _ => {}
                        })
                    });

                    let change_set = cx.new(|cx| BufferChangeSet {
                        buffer_id,
                        base_text: None,
                        diff_to_buffer: BufferDiff::new(&buffer.read(cx).text_snapshot()),
                        unstaged_change_set: None,
                    });
                    match kind {
                        ChangeSetKind::Unstaged => {
                            change_set_state.unstaged_changes = Some(change_set.downgrade())
                        }
                        ChangeSetKind::Uncommitted => {
                            // An uncommitted change set always references an
                            // unstaged one; create it here if none is alive.
                            let unstaged_change_set =
                                if let Some(change_set) = change_set_state.unstaged_changes() {
                                    change_set
                                } else {
                                    let unstaged_change_set = cx.new(|cx| BufferChangeSet {
                                        buffer_id,
                                        base_text: None,
                                        diff_to_buffer: BufferDiff::new(
                                            &buffer.read(cx).text_snapshot(),
                                        ),
                                        unstaged_change_set: None,
                                    });
                                    change_set_state.unstaged_changes =
                                        Some(unstaged_change_set.downgrade());
                                    unstaged_change_set
                                };

                            change_set.update(cx, |change_set, _| {
                                change_set.unstaged_change_set = Some(unstaged_change_set);
                            });
                            change_set_state.uncommitted_changes = Some(change_set.downgrade())
                        }
                    };

                    let buffer = buffer.read(cx).text_snapshot();
                    let rx = change_set_state.diff_bases_changed(buffer, diff_bases_change, cx);

                    // Wait for the initial diff computation before handing the
                    // change set to the caller.
                    Ok(async move {
                        rx.await.ok();
                        Ok(change_set)
                    })
                })
            } else {
                Err(anyhow!("buffer was closed"))
            }
        })??
        .await
    }
1597
    /// Creates a new, empty buffer (locally or via the remote host).
    pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
        match &self.state {
            BufferStoreState::Local(this) => this.create_buffer(cx),
            BufferStoreState::Remote(this) => this.create_buffer(cx),
        }
    }
1604
    /// Saves `buffer` to its current file, dispatching to the local or
    /// remote implementation.
    pub fn save_buffer(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        match &mut self.state {
            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
        }
    }
1615
    /// Saves `buffer` under `path` and emits `BufferChangedFilePath` with the
    /// buffer's previous file once the save completes.
    pub fn save_buffer_as(
        &mut self,
        buffer: Entity<Buffer>,
        path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        // Capture the pre-save file so listeners can see what changed.
        let old_file = buffer.read(cx).file().cloned();
        let task = match &self.state {
            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
            BufferStoreState::Remote(this) => {
                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }
1636
    /// Computes git blame for `buffer`, optionally at a historical `version`
    /// of its contents.
    ///
    /// Returns `Ok(None)` when the file is not inside a git repository.
    /// For remote worktrees, the request is forwarded to the host.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything that needs the UI thread (repo lookup,
                // path relativization, buffer text) before going async.
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                // Run the actual blame off the main thread.
                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1698
    /// Builds a web permalink to `selection` (a row range) in `buffer`,
    /// using the repository's `origin` remote and current HEAD SHA.
    ///
    /// For remote worktrees the request is forwarded to the host.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // Path of the file relative to the repository root.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    // Determine the hosting provider (GitHub, GitLab, ...)
                    // from the remote URL.
                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1795
1796 fn add_buffer(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
1797 let remote_id = buffer.read(cx).remote_id();
1798 let is_remote = buffer.read(cx).replica_id() != 0;
1799 let open_buffer = OpenBuffer::Complete {
1800 buffer: buffer.downgrade(),
1801 change_set_state: cx.new(|_| BufferChangeSetState::default()),
1802 };
1803
1804 let handle = cx.entity().downgrade();
1805 buffer.update(cx, move |_, cx| {
1806 cx.on_release(move |buffer, cx| {
1807 handle
1808 .update(cx, |_, cx| {
1809 cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1810 })
1811 .ok();
1812 })
1813 .detach()
1814 });
1815
1816 match self.opened_buffers.entry(remote_id) {
1817 hash_map::Entry::Vacant(entry) => {
1818 entry.insert(open_buffer);
1819 }
1820 hash_map::Entry::Occupied(mut entry) => {
1821 if let OpenBuffer::Operations(operations) = entry.get_mut() {
1822 buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1823 } else if entry.get().upgrade().is_some() {
1824 if is_remote {
1825 return Ok(());
1826 } else {
1827 debug_panic!("buffer {} was already registered", remote_id);
1828 Err(anyhow!("buffer {} was already registered", remote_id))?;
1829 }
1830 }
1831 entry.insert(open_buffer);
1832 }
1833 }
1834
1835 cx.subscribe(&buffer, Self::on_buffer_event).detach();
1836 cx.emit(BufferStoreEvent::BufferAdded(buffer));
1837 Ok(())
1838 }
1839
1840 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1841 self.opened_buffers
1842 .values()
1843 .filter_map(|buffer| buffer.upgrade())
1844 }
1845
1846 pub fn loading_buffers(
1847 &self,
1848 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1849 self.loading_buffers.iter().map(|(path, task)| {
1850 let task = task.clone();
1851 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1852 })
1853 }
1854
1855 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1856 self.buffers().find_map(|buffer| {
1857 let file = File::from_dyn(buffer.read(cx).file())?;
1858 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1859 Some(buffer)
1860 } else {
1861 None
1862 }
1863 })
1864 }
1865
1866 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1867 self.opened_buffers.get(&buffer_id)?.upgrade()
1868 }
1869
1870 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1871 self.get(buffer_id)
1872 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1873 }
1874
1875 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1876 self.get(buffer_id).or_else(|| {
1877 self.as_remote()
1878 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1879 })
1880 }
1881
1882 pub fn get_unstaged_changes(
1883 &self,
1884 buffer_id: BufferId,
1885 cx: &App,
1886 ) -> Option<Entity<BufferChangeSet>> {
1887 if let OpenBuffer::Complete {
1888 change_set_state, ..
1889 } = self.opened_buffers.get(&buffer_id)?
1890 {
1891 change_set_state
1892 .read(cx)
1893 .unstaged_changes
1894 .as_ref()?
1895 .upgrade()
1896 } else {
1897 None
1898 }
1899 }
1900
1901 pub fn get_uncommitted_changes(
1902 &self,
1903 buffer_id: BufferId,
1904 cx: &App,
1905 ) -> Option<Entity<BufferChangeSet>> {
1906 if let OpenBuffer::Complete {
1907 change_set_state, ..
1908 } = self.opened_buffers.get(&buffer_id)?
1909 {
1910 change_set_state
1911 .read(cx)
1912 .uncommitted_changes
1913 .as_ref()?
1914 .upgrade()
1915 } else {
1916 None
1917 }
1918 }
1919
1920 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1921 let buffers = self
1922 .buffers()
1923 .map(|buffer| {
1924 let buffer = buffer.read(cx);
1925 proto::BufferVersion {
1926 id: buffer.remote_id().into(),
1927 version: language::proto::serialize_version(&buffer.version),
1928 }
1929 })
1930 .collect();
1931 let incomplete_buffer_ids = self
1932 .as_remote()
1933 .map(|remote| remote.incomplete_buffer_ids())
1934 .unwrap_or_default();
1935 (buffers, incomplete_buffer_ids)
1936 }
1937
1938 pub fn disconnected_from_host(&mut self, cx: &mut App) {
1939 for open_buffer in self.opened_buffers.values_mut() {
1940 if let Some(buffer) = open_buffer.upgrade() {
1941 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1942 }
1943 }
1944
1945 for buffer in self.buffers() {
1946 buffer.update(cx, |buffer, cx| {
1947 buffer.set_capability(Capability::ReadOnly, cx)
1948 });
1949 }
1950
1951 if let Some(remote) = self.as_remote_mut() {
1952 // Wake up all futures currently waiting on a buffer to get opened,
1953 // to give them a chance to fail now that we've disconnected.
1954 remote.remote_buffer_listeners.clear()
1955 }
1956 }
1957
    /// Begins sharing this project's buffers downstream under `remote_id`.
    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1961
    /// Stops sharing: drops the downstream client and forgets which buffers
    /// were shared with peers.
    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }
1966
1967 pub fn discard_incomplete(&mut self) {
1968 self.opened_buffers
1969 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1970 }
1971
    /// Streams buffers that may match `query`.
    ///
    /// Unnamed (dirty, file-less) buffers are sent first and count against
    /// `limit`. The worktree store then yields candidate paths, which are
    /// opened in batches of `MAX_CONCURRENT_BUFFER_OPENS`; each successfully
    /// opened buffer is sent on the returned channel.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // File-less buffers can't be found by the worktree scan, so
                // include them directly and reduce the remaining limit.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                // Kick off the whole batch of opens before awaiting any of
                // them.
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A closed receiver means the search was dropped.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
2026
    /// Recomputes the diffs of all change sets attached to `buffers`,
    /// returning a future that resolves once every recalculation finishes.
    pub fn recalculate_buffer_diffs(
        &mut self,
        buffers: Vec<Entity<Buffer>>,
        cx: &mut Context<Self>,
    ) -> impl Future<Output = ()> {
        let mut futures = Vec::new();
        for buffer in buffers {
            // Buffers without a complete entry (or without change sets) are
            // silently skipped.
            if let Some(OpenBuffer::Complete {
                change_set_state, ..
            }) = self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                let buffer = buffer.read(cx).text_snapshot();
                futures.push(change_set_state.update(cx, |change_set_state, cx| {
                    change_set_state.recalculate_diffs(buffer, cx)
                }));
            }
        }
        async move {
            futures::future::join_all(futures).await;
        }
    }
2048
    /// Reacts to events emitted by an individual buffer.
    ///
    /// File-handle changes update the local path/entry indices; reloads are
    /// forwarded to the downstream client (if any). All other events are
    /// ignored here.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }
2079
2080 pub async fn handle_update_buffer(
2081 this: Entity<Self>,
2082 envelope: TypedEnvelope<proto::UpdateBuffer>,
2083 mut cx: AsyncApp,
2084 ) -> Result<proto::Ack> {
2085 let payload = envelope.payload.clone();
2086 let buffer_id = BufferId::new(payload.buffer_id)?;
2087 let ops = payload
2088 .operations
2089 .into_iter()
2090 .map(language::proto::deserialize_operation)
2091 .collect::<Result<Vec<_>, _>>()?;
2092 this.update(&mut cx, |this, cx| {
2093 match this.opened_buffers.entry(buffer_id) {
2094 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
2095 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
2096 OpenBuffer::Complete { buffer, .. } => {
2097 if let Some(buffer) = buffer.upgrade() {
2098 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
2099 }
2100 }
2101 },
2102 hash_map::Entry::Vacant(e) => {
2103 e.insert(OpenBuffer::Operations(ops));
2104 }
2105 }
2106 Ok(proto::Ack {})
2107 })?
2108 }
2109
2110 pub fn register_shared_lsp_handle(
2111 &mut self,
2112 peer_id: proto::PeerId,
2113 buffer_id: BufferId,
2114 handle: OpenLspBufferHandle,
2115 ) {
2116 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2117 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2118 buffer.lsp_handle = Some(handle);
2119 return;
2120 }
2121 }
2122 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2123 }
2124
    /// Handles a guest's request to resynchronize its buffers after a
    /// reconnect.
    ///
    /// Resets the shared-buffer set for the guest, reports the host's version
    /// of each requested buffer, re-sends file and reload metadata, and
    /// streams any operations the guest is missing (relative to the version
    /// it reported) in the background.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate: only buffers the guest asks about are
        // considered shared after a sync.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        change_set: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest is missing.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Send the missing operations in chunks, off the main thread.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
2214
2215 pub fn handle_create_buffer_for_peer(
2216 &mut self,
2217 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2218 replica_id: u16,
2219 capability: Capability,
2220 cx: &mut Context<Self>,
2221 ) -> Result<()> {
2222 let Some(remote) = self.as_remote_mut() else {
2223 return Err(anyhow!("buffer store is not a remote"));
2224 };
2225
2226 if let Some(buffer) =
2227 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2228 {
2229 self.add_buffer(buffer, cx)?;
2230 }
2231
2232 Ok(())
2233 }
2234
2235 pub async fn handle_update_buffer_file(
2236 this: Entity<Self>,
2237 envelope: TypedEnvelope<proto::UpdateBufferFile>,
2238 mut cx: AsyncApp,
2239 ) -> Result<()> {
2240 let buffer_id = envelope.payload.buffer_id;
2241 let buffer_id = BufferId::new(buffer_id)?;
2242
2243 this.update(&mut cx, |this, cx| {
2244 let payload = envelope.payload.clone();
2245 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2246 let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
2247 let worktree = this
2248 .worktree_store
2249 .read(cx)
2250 .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
2251 .ok_or_else(|| anyhow!("no such worktree"))?;
2252 let file = File::from_proto(file, worktree, cx)?;
2253 let old_file = buffer.update(cx, |buffer, cx| {
2254 let old_file = buffer.file().cloned();
2255 let new_path = file.path.clone();
2256 buffer.file_updated(Arc::new(file), cx);
2257 if old_file
2258 .as_ref()
2259 .map_or(true, |old| *old.path() != new_path)
2260 {
2261 Some(old_file)
2262 } else {
2263 None
2264 }
2265 });
2266 if let Some(old_file) = old_file {
2267 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
2268 }
2269 }
2270 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2271 downstream_client
2272 .send(proto::UpdateBufferFile {
2273 project_id: *project_id,
2274 buffer_id: buffer_id.into(),
2275 file: envelope.payload.file,
2276 })
2277 .log_err();
2278 }
2279 Ok(())
2280 })?
2281 }
2282
2283 pub async fn handle_save_buffer(
2284 this: Entity<Self>,
2285 envelope: TypedEnvelope<proto::SaveBuffer>,
2286 mut cx: AsyncApp,
2287 ) -> Result<proto::BufferSaved> {
2288 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2289 let (buffer, project_id) = this.update(&mut cx, |this, _| {
2290 anyhow::Ok((
2291 this.get_existing(buffer_id)?,
2292 this.downstream_client
2293 .as_ref()
2294 .map(|(_, project_id)| *project_id)
2295 .context("project is not shared")?,
2296 ))
2297 })??;
2298 buffer
2299 .update(&mut cx, |buffer, _| {
2300 buffer.wait_for_version(deserialize_version(&envelope.payload.version))
2301 })?
2302 .await?;
2303 let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
2304
2305 if let Some(new_path) = envelope.payload.new_path {
2306 let new_path = ProjectPath::from_proto(new_path);
2307 this.update(&mut cx, |this, cx| {
2308 this.save_buffer_as(buffer.clone(), new_path, cx)
2309 })?
2310 .await?;
2311 } else {
2312 this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
2313 .await?;
2314 }
2315
2316 buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
2317 project_id,
2318 buffer_id: buffer_id.into(),
2319 version: serialize_version(buffer.saved_version()),
2320 mtime: buffer.saved_mtime().map(|time| time.into()),
2321 })
2322 }
2323
2324 pub async fn handle_close_buffer(
2325 this: Entity<Self>,
2326 envelope: TypedEnvelope<proto::CloseBuffer>,
2327 mut cx: AsyncApp,
2328 ) -> Result<()> {
2329 let peer_id = envelope.sender_id;
2330 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2331 this.update(&mut cx, |this, _| {
2332 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2333 if shared.remove(&buffer_id).is_some() {
2334 if shared.is_empty() {
2335 this.shared_buffers.remove(&peer_id);
2336 }
2337 return;
2338 }
2339 }
2340 debug_panic!(
2341 "peer_id {} closed buffer_id {} which was either not open or already closed",
2342 peer_id,
2343 buffer_id
2344 )
2345 })
2346 }
2347
2348 pub async fn handle_buffer_saved(
2349 this: Entity<Self>,
2350 envelope: TypedEnvelope<proto::BufferSaved>,
2351 mut cx: AsyncApp,
2352 ) -> Result<()> {
2353 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2354 let version = deserialize_version(&envelope.payload.version);
2355 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2356 this.update(&mut cx, move |this, cx| {
2357 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2358 buffer.update(cx, |buffer, cx| {
2359 buffer.did_save(version, mtime, cx);
2360 });
2361 }
2362
2363 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2364 downstream_client
2365 .send(proto::BufferSaved {
2366 project_id: *project_id,
2367 buffer_id: buffer_id.into(),
2368 mtime: envelope.payload.mtime,
2369 version: envelope.payload.version,
2370 })
2371 .log_err();
2372 }
2373 })
2374 }
2375
2376 pub async fn handle_buffer_reloaded(
2377 this: Entity<Self>,
2378 envelope: TypedEnvelope<proto::BufferReloaded>,
2379 mut cx: AsyncApp,
2380 ) -> Result<()> {
2381 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2382 let version = deserialize_version(&envelope.payload.version);
2383 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2384 let line_ending = deserialize_line_ending(
2385 proto::LineEnding::from_i32(envelope.payload.line_ending)
2386 .ok_or_else(|| anyhow!("missing line ending"))?,
2387 );
2388 this.update(&mut cx, |this, cx| {
2389 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2390 buffer.update(cx, |buffer, cx| {
2391 buffer.did_reload(version, line_ending, mtime, cx);
2392 });
2393 }
2394
2395 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2396 downstream_client
2397 .send(proto::BufferReloaded {
2398 project_id: *project_id,
2399 buffer_id: buffer_id.into(),
2400 mtime: envelope.payload.mtime,
2401 version: envelope.payload.version,
2402 line_ending: envelope.payload.line_ending,
2403 })
2404 .log_err();
2405 }
2406 })
2407 }
2408
2409 pub async fn handle_blame_buffer(
2410 this: Entity<Self>,
2411 envelope: TypedEnvelope<proto::BlameBuffer>,
2412 mut cx: AsyncApp,
2413 ) -> Result<proto::BlameBufferResponse> {
2414 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2415 let version = deserialize_version(&envelope.payload.version);
2416 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2417 buffer
2418 .update(&mut cx, |buffer, _| {
2419 buffer.wait_for_version(version.clone())
2420 })?
2421 .await?;
2422 let blame = this
2423 .update(&mut cx, |this, cx| {
2424 this.blame_buffer(&buffer, Some(version), cx)
2425 })?
2426 .await?;
2427 Ok(serialize_blame_buffer_response(blame))
2428 }
2429
2430 pub async fn handle_get_permalink_to_line(
2431 this: Entity<Self>,
2432 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2433 mut cx: AsyncApp,
2434 ) -> Result<proto::GetPermalinkToLineResponse> {
2435 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2436 // let version = deserialize_version(&envelope.payload.version);
2437 let selection = {
2438 let proto_selection = envelope
2439 .payload
2440 .selection
2441 .context("no selection to get permalink for defined")?;
2442 proto_selection.start as u32..proto_selection.end as u32
2443 };
2444 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2445 let permalink = this
2446 .update(&mut cx, |this, cx| {
2447 this.get_permalink_to_line(&buffer, selection, cx)
2448 })?
2449 .await?;
2450 Ok(proto::GetPermalinkToLineResponse {
2451 permalink: permalink.to_string(),
2452 })
2453 }
2454
2455 pub async fn handle_open_unstaged_changes(
2456 this: Entity<Self>,
2457 request: TypedEnvelope<proto::OpenUnstagedChanges>,
2458 mut cx: AsyncApp,
2459 ) -> Result<proto::OpenUnstagedChangesResponse> {
2460 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2461 let change_set = this
2462 .update(&mut cx, |this, cx| {
2463 let buffer = this.get(buffer_id)?;
2464 Some(this.open_unstaged_changes(buffer, cx))
2465 })?
2466 .ok_or_else(|| anyhow!("no such buffer"))?
2467 .await?;
2468 this.update(&mut cx, |this, _| {
2469 let shared_buffers = this
2470 .shared_buffers
2471 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2472 .or_default();
2473 debug_assert!(shared_buffers.contains_key(&buffer_id));
2474 if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2475 shared.change_set = Some(change_set.clone());
2476 }
2477 })?;
2478 let staged_text = change_set.read_with(&cx, |change_set, _| {
2479 change_set.base_text.as_ref().map(|buffer| buffer.text())
2480 })?;
2481 Ok(proto::OpenUnstagedChangesResponse { staged_text })
2482 }
2483
2484 pub async fn handle_open_uncommitted_changes(
2485 this: Entity<Self>,
2486 request: TypedEnvelope<proto::OpenUncommittedChanges>,
2487 mut cx: AsyncApp,
2488 ) -> Result<proto::OpenUncommittedChangesResponse> {
2489 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2490 let change_set = this
2491 .update(&mut cx, |this, cx| {
2492 let buffer = this.get(buffer_id)?;
2493 Some(this.open_uncommitted_changes(buffer, cx))
2494 })?
2495 .ok_or_else(|| anyhow!("no such buffer"))?
2496 .await?;
2497 this.update(&mut cx, |this, _| {
2498 let shared_buffers = this
2499 .shared_buffers
2500 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2501 .or_default();
2502 debug_assert!(shared_buffers.contains_key(&buffer_id));
2503 if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2504 shared.change_set = Some(change_set.clone());
2505 }
2506 })?;
2507 change_set.read_with(&cx, |change_set, cx| {
2508 use proto::open_uncommitted_changes_response::Mode;
2509
2510 let staged_buffer = change_set
2511 .unstaged_change_set
2512 .as_ref()
2513 .and_then(|change_set| change_set.read(cx).base_text.as_ref());
2514
2515 let mode;
2516 let staged_text;
2517 let committed_text;
2518 if let Some(committed_buffer) = &change_set.base_text {
2519 committed_text = Some(committed_buffer.text());
2520 if let Some(staged_buffer) = staged_buffer {
2521 if staged_buffer.remote_id() == committed_buffer.remote_id() {
2522 mode = Mode::IndexMatchesHead;
2523 staged_text = None;
2524 } else {
2525 mode = Mode::IndexAndHead;
2526 staged_text = Some(staged_buffer.text());
2527 }
2528 } else {
2529 mode = Mode::IndexAndHead;
2530 staged_text = None;
2531 }
2532 } else {
2533 mode = Mode::IndexAndHead;
2534 committed_text = None;
2535 staged_text = staged_buffer.as_ref().map(|buffer| buffer.text());
2536 }
2537
2538 proto::OpenUncommittedChangesResponse {
2539 committed_text,
2540 staged_text,
2541 mode: mode.into(),
2542 }
2543 })
2544 }
2545
2546 pub async fn handle_update_diff_bases(
2547 this: Entity<Self>,
2548 request: TypedEnvelope<proto::UpdateDiffBases>,
2549 mut cx: AsyncApp,
2550 ) -> Result<()> {
2551 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2552 this.update(&mut cx, |this, cx| {
2553 if let Some(OpenBuffer::Complete {
2554 change_set_state,
2555 buffer,
2556 }) = this.opened_buffers.get_mut(&buffer_id)
2557 {
2558 if let Some(buffer) = buffer.upgrade() {
2559 let buffer = buffer.read(cx).text_snapshot();
2560 change_set_state.update(cx, |change_set_state, cx| {
2561 change_set_state.handle_base_texts_updated(buffer, request.payload, cx);
2562 })
2563 }
2564 }
2565 })
2566 }
2567
2568 pub fn reload_buffers(
2569 &self,
2570 buffers: HashSet<Entity<Buffer>>,
2571 push_to_history: bool,
2572 cx: &mut Context<Self>,
2573 ) -> Task<Result<ProjectTransaction>> {
2574 if buffers.is_empty() {
2575 return Task::ready(Ok(ProjectTransaction::default()));
2576 }
2577 match &self.state {
2578 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2579 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2580 }
2581 }
2582
2583 async fn handle_reload_buffers(
2584 this: Entity<Self>,
2585 envelope: TypedEnvelope<proto::ReloadBuffers>,
2586 mut cx: AsyncApp,
2587 ) -> Result<proto::ReloadBuffersResponse> {
2588 let sender_id = envelope.original_sender_id().unwrap_or_default();
2589 let reload = this.update(&mut cx, |this, cx| {
2590 let mut buffers = HashSet::default();
2591 for buffer_id in &envelope.payload.buffer_ids {
2592 let buffer_id = BufferId::new(*buffer_id)?;
2593 buffers.insert(this.get_existing(buffer_id)?);
2594 }
2595 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2596 })??;
2597
2598 let project_transaction = reload.await?;
2599 let project_transaction = this.update(&mut cx, |this, cx| {
2600 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2601 })?;
2602 Ok(proto::ReloadBuffersResponse {
2603 transaction: Some(project_transaction),
2604 })
2605 }
2606
2607 pub fn create_buffer_for_peer(
2608 &mut self,
2609 buffer: &Entity<Buffer>,
2610 peer_id: proto::PeerId,
2611 cx: &mut Context<Self>,
2612 ) -> Task<Result<()>> {
2613 let buffer_id = buffer.read(cx).remote_id();
2614 let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
2615 if shared_buffers.contains_key(&buffer_id) {
2616 return Task::ready(Ok(()));
2617 }
2618 shared_buffers.insert(
2619 buffer_id,
2620 SharedBuffer {
2621 buffer: buffer.clone(),
2622 change_set: None,
2623 lsp_handle: None,
2624 },
2625 );
2626
2627 let Some((client, project_id)) = self.downstream_client.clone() else {
2628 return Task::ready(Ok(()));
2629 };
2630
2631 cx.spawn(|this, mut cx| async move {
2632 let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
2633 return anyhow::Ok(());
2634 };
2635
2636 let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
2637 let operations = operations.await;
2638 let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;
2639
2640 let initial_state = proto::CreateBufferForPeer {
2641 project_id,
2642 peer_id: Some(peer_id),
2643 variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
2644 };
2645
2646 if client.send(initial_state).log_err().is_some() {
2647 let client = client.clone();
2648 cx.background_executor()
2649 .spawn(async move {
2650 let mut chunks = split_operations(operations).peekable();
2651 while let Some(chunk) = chunks.next() {
2652 let is_last = chunks.peek().is_none();
2653 client.send(proto::CreateBufferForPeer {
2654 project_id,
2655 peer_id: Some(peer_id),
2656 variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
2657 proto::BufferChunk {
2658 buffer_id: buffer_id.into(),
2659 operations: chunk,
2660 is_last,
2661 },
2662 )),
2663 })?;
2664 }
2665 anyhow::Ok(())
2666 })
2667 .await
2668 .log_err();
2669 }
2670 Ok(())
2671 })
2672 }
2673
    /// Clears all per-peer shared-buffer records.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2677
    /// Drops the shared-buffer records for a single peer.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2681
2682 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2683 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2684 self.shared_buffers.insert(new_peer_id, buffers);
2685 }
2686 }
2687
    /// Reports whether any buffers are currently shared with any peer.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2691
2692 pub fn create_local_buffer(
2693 &mut self,
2694 text: &str,
2695 language: Option<Arc<Language>>,
2696 cx: &mut Context<Self>,
2697 ) -> Entity<Buffer> {
2698 let buffer = cx.new(|cx| {
2699 Buffer::local(text, cx)
2700 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
2701 });
2702
2703 self.add_buffer(buffer.clone(), cx).log_err();
2704 let buffer_id = buffer.read(cx).remote_id();
2705
2706 let this = self
2707 .as_local_mut()
2708 .expect("local-only method called in a non-local context");
2709 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2710 this.local_buffer_ids_by_path.insert(
2711 ProjectPath {
2712 worktree_id: file.worktree_id(cx),
2713 path: file.path.clone(),
2714 },
2715 buffer_id,
2716 );
2717
2718 if let Some(entry_id) = file.entry_id {
2719 this.local_buffer_ids_by_entry_id
2720 .insert(entry_id, buffer_id);
2721 }
2722 }
2723 buffer
2724 }
2725
2726 pub fn deserialize_project_transaction(
2727 &mut self,
2728 message: proto::ProjectTransaction,
2729 push_to_history: bool,
2730 cx: &mut Context<Self>,
2731 ) -> Task<Result<ProjectTransaction>> {
2732 if let Some(this) = self.as_remote_mut() {
2733 this.deserialize_project_transaction(message, push_to_history, cx)
2734 } else {
2735 debug_panic!("not a remote buffer store");
2736 Task::ready(Err(anyhow!("not a remote buffer store")))
2737 }
2738 }
2739
2740 pub fn wait_for_remote_buffer(
2741 &mut self,
2742 id: BufferId,
2743 cx: &mut Context<BufferStore>,
2744 ) -> Task<Result<Entity<Buffer>>> {
2745 if let Some(this) = self.as_remote_mut() {
2746 this.wait_for_remote_buffer(id, cx)
2747 } else {
2748 debug_panic!("not a remote buffer store");
2749 Task::ready(Err(anyhow!("not a remote buffer store")))
2750 }
2751 }
2752
2753 pub fn serialize_project_transaction_for_peer(
2754 &mut self,
2755 project_transaction: ProjectTransaction,
2756 peer_id: proto::PeerId,
2757 cx: &mut Context<Self>,
2758 ) -> proto::ProjectTransaction {
2759 let mut serialized_transaction = proto::ProjectTransaction {
2760 buffer_ids: Default::default(),
2761 transactions: Default::default(),
2762 };
2763 for (buffer, transaction) in project_transaction.0 {
2764 self.create_buffer_for_peer(&buffer, peer_id, cx)
2765 .detach_and_log_err(cx);
2766 serialized_transaction
2767 .buffer_ids
2768 .push(buffer.read(cx).remote_id().into());
2769 serialized_transaction
2770 .transactions
2771 .push(language::proto::serialize_transaction(&transaction));
2772 }
2773 serialized_transaction
2774 }
2775}
2776
// Enables `BufferChangeSet` entities to emit `BufferChangeSetEvent`s via `cx.emit`.
impl EventEmitter<BufferChangeSetEvent> for BufferChangeSet {}
2778
impl BufferChangeSet {
    /// Installs a new base text and diff, emitting
    /// `BufferChangeSetEvent::DiffChanged` with the range of the buffer whose
    /// diff status changed.
    fn set_state(
        &mut self,
        base_text: Option<language::BufferSnapshot>,
        diff: BufferDiff,
        buffer: &text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) {
        if let Some(base_text) = base_text.as_ref() {
            // If the base text comes from a different buffer than before,
            // treat the whole buffer as changed; otherwise ask the diffs to
            // compute the precise changed range.
            let changed_range = if Some(base_text.remote_id())
                != self.base_text.as_ref().map(|buffer| buffer.remote_id())
            {
                Some(text::Anchor::MIN..text::Anchor::MAX)
            } else {
                diff.compare(&self.diff_to_buffer, buffer)
            };
            if let Some(changed_range) = changed_range {
                cx.emit(BufferChangeSetEvent::DiffChanged { changed_range });
            }
        }
        // NOTE(review): when `base_text` is `None`, no event is emitted even
        // if the previous base text was `Some` — confirm this is intended.
        self.base_text = base_text;
        self.diff_to_buffer = diff;
    }

    /// Returns the diff hunks that intersect `range`, in buffer order.
    pub fn diff_hunks_intersecting_range<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range(range, buffer_snapshot)
    }

    /// Returns the diff hunks that intersect `range`, in reverse buffer order.
    pub fn diff_hunks_intersecting_range_rev<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range_rev(range, buffer_snapshot)
    }

    /// Used in cases where the change set isn't derived from git.
    ///
    /// Recomputes the diff against `base_buffer` on a background task and then
    /// applies it. The returned receiver fires once the new state has been
    /// applied (or once the change set has been dropped).
    pub fn set_base_text(
        &mut self,
        base_buffer: Entity<language::Buffer>,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        let this = cx.weak_entity();
        let base_buffer = base_buffer.read(cx).snapshot();
        cx.spawn(|_, mut cx| async move {
            // Diff computation can be expensive, so run it off the main thread.
            let diff = cx
                .background_executor()
                .spawn({
                    let base_buffer = base_buffer.clone();
                    let buffer = buffer.clone();
                    async move { BufferDiff::build(Some(&base_buffer.text()), &buffer) }
                })
                .await;
            let Some(this) = this.upgrade() else {
                // The change set was dropped; still resolve the receiver.
                tx.send(()).ok();
                return;
            };
            this.update(&mut cx, |this, cx| {
                this.set_state(Some(base_buffer), diff, &buffer, cx);
            })
            .log_err();
            tx.send(()).ok();
        })
        .detach();
        rx
    }

    /// Returns the current base text as a `String`, for test assertions.
    #[cfg(any(test, feature = "test-support"))]
    pub fn base_text_string(&self) -> Option<String> {
        self.base_text.as_ref().map(|buffer| buffer.text())
    }

    /// Creates a change set for `buffer` with no base text.
    pub fn new(buffer: &Entity<Buffer>, cx: &mut App) -> Self {
        BufferChangeSet {
            buffer_id: buffer.read(cx).remote_id(),
            base_text: None,
            diff_to_buffer: BufferDiff::new(&buffer.read(cx).text_snapshot()),
            unstaged_change_set: None,
        }
    }

    /// Creates a change set for `buffer` diffed synchronously against
    /// `base_text`, for tests.
    #[cfg(any(test, feature = "test-support"))]
    pub fn new_with_base_text(base_text: &str, buffer: &Entity<Buffer>, cx: &mut App) -> Self {
        let mut base_text = base_text.to_owned();
        text::LineEnding::normalize(&mut base_text);
        let diff_to_buffer = BufferDiff::build(Some(&base_text), &buffer.read(cx).text_snapshot());
        let base_text = language::Buffer::build_snapshot_sync(base_text.into(), None, None, cx);
        BufferChangeSet {
            buffer_id: buffer.read(cx).remote_id(),
            base_text: Some(base_text),
            diff_to_buffer,
            unstaged_change_set: None,
        }
    }

    /// Recomputes the diff against the current base text synchronously, for
    /// tests.
    #[cfg(any(test, feature = "test-support"))]
    pub fn recalculate_diff_sync(
        &mut self,
        snapshot: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) {
        let mut base_text = self.base_text.as_ref().map(|buffer| buffer.text());
        if let Some(base_text) = base_text.as_mut() {
            text::LineEnding::normalize(base_text);
        }
        let diff_to_buffer = BufferDiff::build(base_text.as_deref(), &snapshot);
        self.set_state(self.base_text.clone(), diff_to_buffer, &snapshot, cx);
    }
}
2896
2897impl OpenBuffer {
2898 fn upgrade(&self) -> Option<Entity<Buffer>> {
2899 match self {
2900 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2901 OpenBuffer::Operations(_) => None,
2902 }
2903 }
2904}
2905
2906fn is_not_found_error(error: &anyhow::Error) -> bool {
2907 error
2908 .root_cause()
2909 .downcast_ref::<io::Error>()
2910 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2911}
2912
2913fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2914 let Some(blame) = blame else {
2915 return proto::BlameBufferResponse {
2916 blame_response: None,
2917 };
2918 };
2919
2920 let entries = blame
2921 .entries
2922 .into_iter()
2923 .map(|entry| proto::BlameEntry {
2924 sha: entry.sha.as_bytes().into(),
2925 start_line: entry.range.start,
2926 end_line: entry.range.end,
2927 original_line_number: entry.original_line_number,
2928 author: entry.author.clone(),
2929 author_mail: entry.author_mail.clone(),
2930 author_time: entry.author_time,
2931 author_tz: entry.author_tz.clone(),
2932 committer: entry.committer.clone(),
2933 committer_mail: entry.committer_mail.clone(),
2934 committer_time: entry.committer_time,
2935 committer_tz: entry.committer_tz.clone(),
2936 summary: entry.summary.clone(),
2937 previous: entry.previous.clone(),
2938 filename: entry.filename.clone(),
2939 })
2940 .collect::<Vec<_>>();
2941
2942 let messages = blame
2943 .messages
2944 .into_iter()
2945 .map(|(oid, message)| proto::CommitMessage {
2946 oid: oid.as_bytes().into(),
2947 message,
2948 })
2949 .collect::<Vec<_>>();
2950
2951 let permalinks = blame
2952 .permalinks
2953 .into_iter()
2954 .map(|(oid, url)| proto::CommitPermalink {
2955 oid: oid.as_bytes().into(),
2956 permalink: url.to_string(),
2957 })
2958 .collect::<Vec<_>>();
2959
2960 proto::BlameBufferResponse {
2961 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2962 entries,
2963 messages,
2964 permalinks,
2965 remote_url: blame.remote_url,
2966 }),
2967 }
2968}
2969
2970fn deserialize_blame_buffer_response(
2971 response: proto::BlameBufferResponse,
2972) -> Option<git::blame::Blame> {
2973 let response = response.blame_response?;
2974 let entries = response
2975 .entries
2976 .into_iter()
2977 .filter_map(|entry| {
2978 Some(git::blame::BlameEntry {
2979 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2980 range: entry.start_line..entry.end_line,
2981 original_line_number: entry.original_line_number,
2982 committer: entry.committer,
2983 committer_time: entry.committer_time,
2984 committer_tz: entry.committer_tz,
2985 committer_mail: entry.committer_mail,
2986 author: entry.author,
2987 author_mail: entry.author_mail,
2988 author_time: entry.author_time,
2989 author_tz: entry.author_tz,
2990 summary: entry.summary,
2991 previous: entry.previous,
2992 filename: entry.filename,
2993 })
2994 })
2995 .collect::<Vec<_>>();
2996
2997 let messages = response
2998 .messages
2999 .into_iter()
3000 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
3001 .collect::<HashMap<_, _>>();
3002
3003 let permalinks = response
3004 .permalinks
3005 .into_iter()
3006 .filter_map(|permalink| {
3007 Some((
3008 git::Oid::from_bytes(&permalink.oid).ok()?,
3009 Url::from_str(&permalink.permalink).ok()?,
3010 ))
3011 })
3012 .collect::<HashMap<_, _>>();
3013
3014 Some(Blame {
3015 entries,
3016 permalinks,
3017 messages,
3018 remote_url: response.remote_url,
3019 })
3020}
3021
3022fn get_permalink_in_rust_registry_src(
3023 provider_registry: Arc<GitHostingProviderRegistry>,
3024 path: PathBuf,
3025 selection: Range<u32>,
3026) -> Result<url::Url> {
3027 #[derive(Deserialize)]
3028 struct CargoVcsGit {
3029 sha1: String,
3030 }
3031
3032 #[derive(Deserialize)]
3033 struct CargoVcsInfo {
3034 git: CargoVcsGit,
3035 path_in_vcs: String,
3036 }
3037
3038 #[derive(Deserialize)]
3039 struct CargoPackage {
3040 repository: String,
3041 }
3042
3043 #[derive(Deserialize)]
3044 struct CargoToml {
3045 package: CargoPackage,
3046 }
3047
3048 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
3049 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
3050 Some((dir, json))
3051 }) else {
3052 bail!("No .cargo_vcs_info.json found in parent directories")
3053 };
3054 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
3055 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
3056 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
3057 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
3058 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
3059 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
3060 let permalink = provider.build_permalink(
3061 remote,
3062 BuildPermalinkParams {
3063 sha: &cargo_vcs_info.git.sha1,
3064 path: &path.to_string_lossy(),
3065 selection: Some(selection),
3066 },
3067 );
3068 Ok(permalink)
3069}