1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use client::Client;
10use collections::{hash_map, HashMap, HashSet};
11use fs::Fs;
12use futures::{
13 channel::oneshot,
14 future::{OptionFuture, Shared},
15 Future, FutureExt as _, StreamExt,
16};
17use git::{blame::Blame, diff::BufferDiff, repository::RepoPath};
18use gpui::{
19 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
20};
21use http_client::Url;
22use language::{
23 proto::{
24 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
25 split_operations,
26 },
27 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
28};
29use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
30use serde::Deserialize;
31use smol::channel::Receiver;
32use std::{
33 io,
34 ops::Range,
35 path::{Path, PathBuf},
36 pin::pin,
37 str::FromStr as _,
38 sync::Arc,
39 time::Instant,
40};
41use text::{BufferId, Rope};
42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
44
/// Which git base a change set is diffed against: the index (staged text) or
/// HEAD (committed text). Used as part of the key for in-flight change-set
/// loads in `BufferStore::loading_change_sets`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum ChangeSetKind {
    /// Diff of the buffer against the git index (staged text).
    Unstaged,
    /// Diff of the buffer against HEAD (committed text).
    Uncommitted,
}
50
/// A set of open buffers.
pub struct BufferStore {
    /// Whether this store is backed by the local filesystem or an upstream peer.
    state: BufferStoreState,
    /// In-flight buffer loads keyed by path. `Shared` so that concurrent opens
    /// of the same path await a single load task.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    /// In-flight change-set loads, keyed by buffer id and diff kind.
    #[allow(clippy::type_complexity)]
    loading_change_sets: HashMap<
        (BufferId, ChangeSetKind),
        Shared<Task<Result<Entity<BufferChangeSet>, Arc<anyhow::Error>>>>,
    >,
    /// The project's worktrees, used to resolve paths to files.
    worktree_store: Entity<WorktreeStore>,
    /// All buffers that have been opened, keyed by their remote id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Client (and project id) to forward buffer updates to when this project
    /// is being shared downstream.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// Buffers that have been shared with each remote peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
66
/// A buffer shared with a remote peer, along with the resources that must stay
/// alive while the peer holds it open.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    /// Change set kept alive for the peer, if one was opened.
    change_set: Option<Entity<BufferChangeSet>>,
    /// Handle keeping the buffer registered with language servers, if any.
    lsp_handle: Option<OpenLspBufferHandle>,
}
73
/// Per-buffer git diff state shared by that buffer's unstaged and uncommitted
/// change sets.
#[derive(Default)]
struct BufferChangeSetState {
    /// Change set diffing the buffer against the git index, if one was opened.
    unstaged_changes: Option<WeakEntity<BufferChangeSet>>,
    /// Change set diffing the buffer against HEAD, if one was opened.
    uncommitted_changes: Option<WeakEntity<BufferChangeSet>>,
    /// The in-flight diff recalculation; replaced (and thereby cancelled) when
    /// a newer recalculation starts.
    recalculate_diff_task: Option<Task<Result<()>>>,
    /// Language used when building base-text snapshots.
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    /// Senders resolved when the next diff recalculation completes.
    diff_updated_futures: Vec<oneshot::Sender<()>>,
    buffer_subscription: Option<Subscription>,

    /// Text of the file at HEAD, if loaded. `SetBoth` stores the same `Arc` in
    /// both this and `index_text`, enabling a pointer-equality shortcut.
    head_text: Option<Arc<String>>,
    /// Text of the file in the git index, if loaded.
    index_text: Option<Arc<String>>,
    /// Whether `head_text` changed since the last completed recalculation.
    head_changed: bool,
    /// Whether `index_text` changed since the last completed recalculation.
    index_changed: bool,
}
89
/// Describes which git base texts changed, and their new contents.
/// `None` contents indicate the file is absent from that base.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Only the index (staged) text changed.
    SetIndex(Option<String>),
    /// Only the HEAD (committed) text changed.
    SetHead(Option<String>),
    /// Both changed, to different contents.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Both changed, and the index matches HEAD exactly.
    SetBoth(Option<String>),
}
100
impl BufferChangeSetState {
    /// Updates the language used for base-text snapshots when the buffer's
    /// language changes, and starts a diff recalculation so any loaded base
    /// texts are re-parsed with the new language.
    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
        self.language = buffer.read(cx).language().cloned();
        // Mark whichever base texts are loaded as changed so their snapshots
        // get rebuilt with the new language.
        self.index_changed = self.index_text.is_some();
        self.head_changed = self.head_text.is_some();
        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
    }

    /// The unstaged change set, if it is still alive.
    fn unstaged_changes(&self) -> Option<Entity<BufferChangeSet>> {
        self.unstaged_changes.as_ref().and_then(|set| set.upgrade())
    }

    /// The uncommitted change set, if it is still alive.
    fn uncommitted_changes(&self) -> Option<Entity<BufferChangeSet>> {
        self.uncommitted_changes
            .as_ref()
            .and_then(|set| set.upgrade())
    }

    /// Applies an `UpdateDiffBases` message received from upstream, translating
    /// its mode into a [`DiffBasesChange`] and recalculating diffs.
    fn handle_base_texts_updated(
        &mut self,
        buffer: text::BufferSnapshot,
        message: proto::UpdateDiffBases,
        cx: &mut Context<Self>,
    ) {
        use proto::update_diff_bases::Mode;

        // Silently ignore messages with an unrecognized mode.
        let Some(mode) = Mode::from_i32(message.mode) else {
            return;
        };

        let diff_bases_change = match mode {
            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
            Mode::IndexAndHead => DiffBasesChange::SetEach {
                index: message.staged_text,
                head: message.committed_text,
            },
        };

        let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
    }

    /// Stores the new base texts (normalizing line endings), marks which ones
    /// changed, and starts recalculating diffs. The returned receiver resolves
    /// once the recalculation completes.
    fn diff_bases_changed(
        &mut self,
        buffer: text::BufferSnapshot,
        diff_bases_change: DiffBasesChange,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        match diff_bases_change {
            DiffBasesChange::SetIndex(index) => {
                let mut index = index.unwrap_or_default();
                text::LineEnding::normalize(&mut index);
                self.index_text = Some(Arc::new(index));
                self.index_changed = true;
            }
            DiffBasesChange::SetHead(head) => {
                let mut head = head.unwrap_or_default();
                text::LineEnding::normalize(&mut head);
                self.head_text = Some(Arc::new(head));
                self.head_changed = true;
            }
            DiffBasesChange::SetBoth(text) => {
                let mut text = text.unwrap_or_default();
                text::LineEnding::normalize(&mut text);
                self.head_text = Some(Arc::new(text));
                // Share one Arc between both fields so `recalculate_diffs` can
                // detect via pointer equality that the index matches HEAD.
                self.index_text = self.head_text.clone();
                self.head_changed = true;
                self.index_changed = true;
            }
            DiffBasesChange::SetEach { index, head } => {
                let mut index = index.unwrap_or_default();
                text::LineEnding::normalize(&mut index);
                let mut head = head.unwrap_or_default();
                text::LineEnding::normalize(&mut head);
                self.index_text = Some(Arc::new(index));
                self.head_text = Some(Arc::new(head));
                self.head_changed = true;
                self.index_changed = true;
            }
        }

        self.recalculate_diffs(buffer, cx)
    }

    /// Recomputes the unstaged and/or uncommitted diffs against `buffer` on a
    /// background task, replacing (and thereby cancelling) any recalculation
    /// already in flight. The returned receiver resolves once the new diff
    /// state has been applied to the change sets.
    fn recalculate_diffs(
        &mut self,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);

        // Capture everything the task needs up front so `self` isn't borrowed
        // across the await points.
        let language = self.language.clone();
        let language_registry = self.language_registry.clone();
        let unstaged_changes = self.unstaged_changes();
        let uncommitted_changes = self.uncommitted_changes();
        let head = self.head_text.clone();
        let index = self.index_text.clone();
        let index_changed = self.index_changed;
        let head_changed = self.head_changed;
        // Pointer equality suffices: `SetBoth` stores one Arc in both fields.
        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
            (None, None) => true,
            _ => false,
        };
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            if let Some(unstaged_changes) = &unstaged_changes {
                // Rebuild the staged base-text snapshot only when the index
                // text changed; otherwise reuse the change set's existing one.
                let staged_snapshot = if index_changed {
                    let staged_snapshot = cx.update(|cx| {
                        index.as_ref().map(|head| {
                            language::Buffer::build_snapshot(
                                Rope::from(head.as_str()),
                                language.clone(),
                                language_registry.clone(),
                                cx,
                            )
                        })
                    })?;
                    cx.background_executor()
                        .spawn(OptionFuture::from(staged_snapshot))
                } else {
                    Task::ready(
                        unstaged_changes
                            .read_with(&cx, |change_set, _| change_set.base_text.clone())?,
                    )
                };

                let diff =
                    cx.background_executor().spawn({
                        let buffer = buffer.clone();
                        async move {
                            BufferDiff::build(index.as_ref().map(|index| index.as_str()), &buffer)
                        }
                    });

                // Build the snapshot and the diff concurrently.
                let (staged_snapshot, diff) = futures::join!(staged_snapshot, diff);

                unstaged_changes.update(&mut cx, |unstaged_changes, cx| {
                    unstaged_changes.set_state(staged_snapshot.clone(), diff, &buffer, cx);
                })?;
            }

            if let Some(uncommitted_changes) = &uncommitted_changes {
                // When the index matches HEAD, the uncommitted diff is
                // identical to the unstaged one, so reuse it instead of
                // recomputing.
                let (snapshot, diff) = if let (Some(unstaged_changes), true) =
                    (&unstaged_changes, index_matches_head)
                {
                    unstaged_changes.read_with(&cx, |change_set, _| {
                        (
                            change_set.base_text.clone(),
                            change_set.diff_to_buffer.clone(),
                        )
                    })?
                } else {
                    // Same rebuild-or-reuse strategy as above, but for HEAD.
                    let committed_snapshot = if head_changed {
                        let committed_snapshot = cx.update(|cx| {
                            head.as_ref().map(|head| {
                                language::Buffer::build_snapshot(
                                    Rope::from(head.as_str()),
                                    language.clone(),
                                    language_registry.clone(),
                                    cx,
                                )
                            })
                        })?;
                        cx.background_executor()
                            .spawn(OptionFuture::from(committed_snapshot))
                    } else {
                        Task::ready(
                            uncommitted_changes
                                .read_with(&cx, |change_set, _| change_set.base_text.clone())?,
                        )
                    };

                    let diff = cx.background_executor().spawn({
                        let buffer = buffer.clone();
                        let head = head.clone();
                        async move {
                            BufferDiff::build(head.as_ref().map(|head| head.as_str()), &buffer)
                        }
                    });
                    futures::join!(committed_snapshot, diff)
                };

                uncommitted_changes.update(&mut cx, |change_set, cx| {
                    change_set.set_state(snapshot, diff, &buffer, cx);
                })?;
            }

            // Clear the dirty flags and notify everyone awaiting this
            // recalculation.
            if let Some(this) = this.upgrade() {
                this.update(&mut cx, |this, _| {
                    this.index_changed = false;
                    this.head_changed = false;
                    for tx in this.diff_updated_futures.drain(..) {
                        tx.send(()).ok();
                    }
                })?;
            }

            Ok(())
        }));

        rx
    }
}
306
/// A diff between a buffer's current contents and one of its git base texts
/// (the index or HEAD).
pub struct BufferChangeSet {
    pub buffer_id: BufferId,
    /// Snapshot of the base text being diffed against; `None` when the file
    /// does not exist in that base.
    pub base_text: Option<language::BufferSnapshot>,
    /// The diff from `base_text` to the buffer's current contents.
    pub diff_to_buffer: BufferDiff,
    /// NOTE(review): presumably only populated on uncommitted change sets to
    /// link back to the corresponding unstaged set — not assigned in this
    /// chunk; confirm at call sites.
    pub unstaged_change_set: Option<Entity<BufferChangeSet>>,
}
313
314impl std::fmt::Debug for BufferChangeSet {
315 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
316 f.debug_struct("BufferChangeSet")
317 .field("buffer_id", &self.buffer_id)
318 .field("base_text", &self.base_text.as_ref().map(|s| s.text()))
319 .field("diff_to_buffer", &self.diff_to_buffer)
320 .finish()
321 }
322}
323
/// Events emitted by a [`BufferChangeSet`].
pub enum BufferChangeSetEvent {
    /// The diff changed within the given range of the buffer.
    DiffChanged { changed_range: Range<text::Anchor> },
}
327
/// Backing implementation of a [`BufferStore`]: either the local filesystem or
/// a remote host reached over RPC.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
332
/// Buffer store backed by an upstream host over RPC.
struct RemoteBufferStore {
    /// Buffers sent by peers over collab, retained to avoid races.
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    /// Buffers whose initial state has arrived but whose operation chunks have
    /// not all been received yet.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    /// Channels waiting for a given buffer to finish replicating locally.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
342
/// Buffer store backed by the local filesystem.
struct LocalBufferStore {
    /// Index of open buffers by project path, for path-based lookups.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    /// Index of open buffers by worktree entry id, for entry-based lookups.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    _subscription: Subscription,
}
349
/// State of an entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully-loaded buffer (held weakly) along with its git diff state.
    Complete {
        buffer: WeakEntity<Buffer>,
        change_set_state: Entity<BufferChangeSetState>,
    },
    /// Operations received for a buffer that isn't open yet — presumably
    /// buffered to be applied once it loads; confirm where this variant is
    /// consumed (not visible in this chunk).
    Operations(Vec<Operation>),
}
357
/// Events emitted by a [`BufferStore`].
pub enum BufferStoreEvent {
    /// A buffer was added to the store.
    BufferAdded(Entity<Buffer>),
    /// A buffer was dropped and removed from the store.
    BufferDropped(BufferId),
    /// A buffer's underlying file moved to a different path.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        /// The file the buffer pointed at before the change.
        old_file: Option<Arc<dyn language::File>>,
    },
}
366
/// A set of buffer transactions grouped by the buffer each applies to,
/// representing one logical project-wide edit.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
369
370impl EventEmitter<BufferStoreEvent> for BufferStore {}
371
372impl RemoteBufferStore {
373 fn open_unstaged_changes(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
374 let project_id = self.project_id;
375 let client = self.upstream_client.clone();
376 cx.background_executor().spawn(async move {
377 let response = client
378 .request(proto::OpenUnstagedChanges {
379 project_id,
380 buffer_id: buffer_id.to_proto(),
381 })
382 .await?;
383 Ok(response.staged_text)
384 })
385 }
386
387 fn open_uncommitted_changes(
388 &self,
389 buffer_id: BufferId,
390 cx: &App,
391 ) -> Task<Result<DiffBasesChange>> {
392 use proto::open_uncommitted_changes_response::Mode;
393
394 let project_id = self.project_id;
395 let client = self.upstream_client.clone();
396 cx.background_executor().spawn(async move {
397 let response = client
398 .request(proto::OpenUncommittedChanges {
399 project_id,
400 buffer_id: buffer_id.to_proto(),
401 })
402 .await?;
403 let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
404 let bases = match mode {
405 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
406 Mode::IndexAndHead => DiffBasesChange::SetEach {
407 head: response.committed_text,
408 index: response.staged_text,
409 },
410 };
411 Ok(bases)
412 })
413 }
414
415 pub fn wait_for_remote_buffer(
416 &mut self,
417 id: BufferId,
418 cx: &mut Context<BufferStore>,
419 ) -> Task<Result<Entity<Buffer>>> {
420 let (tx, rx) = oneshot::channel();
421 self.remote_buffer_listeners.entry(id).or_default().push(tx);
422
423 cx.spawn(|this, cx| async move {
424 if let Some(buffer) = this
425 .read_with(&cx, |buffer_store, _| buffer_store.get(id))
426 .ok()
427 .flatten()
428 {
429 return Ok(buffer);
430 }
431
432 cx.background_executor()
433 .spawn(async move { rx.await? })
434 .await
435 })
436 }
437
438 fn save_remote_buffer(
439 &self,
440 buffer_handle: Entity<Buffer>,
441 new_path: Option<proto::ProjectPath>,
442 cx: &Context<BufferStore>,
443 ) -> Task<Result<()>> {
444 let buffer = buffer_handle.read(cx);
445 let buffer_id = buffer.remote_id().into();
446 let version = buffer.version();
447 let rpc = self.upstream_client.clone();
448 let project_id = self.project_id;
449 cx.spawn(move |_, mut cx| async move {
450 let response = rpc
451 .request(proto::SaveBuffer {
452 project_id,
453 buffer_id,
454 new_path,
455 version: serialize_version(&version),
456 })
457 .await?;
458 let version = deserialize_version(&response.version);
459 let mtime = response.mtime.map(|mtime| mtime.into());
460
461 buffer_handle.update(&mut cx, |buffer, cx| {
462 buffer.did_save(version.clone(), mtime, cx);
463 })?;
464
465 Ok(())
466 })
467 }
468
469 pub fn handle_create_buffer_for_peer(
470 &mut self,
471 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
472 replica_id: u16,
473 capability: Capability,
474 cx: &mut Context<BufferStore>,
475 ) -> Result<Option<Entity<Buffer>>> {
476 match envelope
477 .payload
478 .variant
479 .ok_or_else(|| anyhow!("missing variant"))?
480 {
481 proto::create_buffer_for_peer::Variant::State(mut state) => {
482 let buffer_id = BufferId::new(state.id)?;
483
484 let buffer_result = maybe!({
485 let mut buffer_file = None;
486 if let Some(file) = state.file.take() {
487 let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
488 let worktree = self
489 .worktree_store
490 .read(cx)
491 .worktree_for_id(worktree_id, cx)
492 .ok_or_else(|| {
493 anyhow!("no worktree found for id {}", file.worktree_id)
494 })?;
495 buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
496 as Arc<dyn language::File>);
497 }
498 Buffer::from_proto(replica_id, capability, state, buffer_file)
499 });
500
501 match buffer_result {
502 Ok(buffer) => {
503 let buffer = cx.new(|_| buffer);
504 self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
505 }
506 Err(error) => {
507 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
508 for listener in listeners {
509 listener.send(Err(anyhow!(error.cloned()))).ok();
510 }
511 }
512 }
513 }
514 }
515 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
516 let buffer_id = BufferId::new(chunk.buffer_id)?;
517 let buffer = self
518 .loading_remote_buffers_by_id
519 .get(&buffer_id)
520 .cloned()
521 .ok_or_else(|| {
522 anyhow!(
523 "received chunk for buffer {} without initial state",
524 chunk.buffer_id
525 )
526 })?;
527
528 let result = maybe!({
529 let operations = chunk
530 .operations
531 .into_iter()
532 .map(language::proto::deserialize_operation)
533 .collect::<Result<Vec<_>>>()?;
534 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
535 anyhow::Ok(())
536 });
537
538 if let Err(error) = result {
539 self.loading_remote_buffers_by_id.remove(&buffer_id);
540 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
541 for listener in listeners {
542 listener.send(Err(error.cloned())).ok();
543 }
544 }
545 } else if chunk.is_last {
546 self.loading_remote_buffers_by_id.remove(&buffer_id);
547 if self.upstream_client.is_via_collab() {
548 // retain buffers sent by peers to avoid races.
549 self.shared_with_me.insert(buffer.clone());
550 }
551
552 if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
553 for sender in senders {
554 sender.send(Ok(buffer.clone())).ok();
555 }
556 }
557 return Ok(Some(buffer));
558 }
559 }
560 }
561 return Ok(None);
562 }
563
564 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
565 self.loading_remote_buffers_by_id
566 .keys()
567 .copied()
568 .collect::<Vec<_>>()
569 }
570
571 pub fn deserialize_project_transaction(
572 &self,
573 message: proto::ProjectTransaction,
574 push_to_history: bool,
575 cx: &mut Context<BufferStore>,
576 ) -> Task<Result<ProjectTransaction>> {
577 cx.spawn(|this, mut cx| async move {
578 let mut project_transaction = ProjectTransaction::default();
579 for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
580 {
581 let buffer_id = BufferId::new(buffer_id)?;
582 let buffer = this
583 .update(&mut cx, |this, cx| {
584 this.wait_for_remote_buffer(buffer_id, cx)
585 })?
586 .await?;
587 let transaction = language::proto::deserialize_transaction(transaction)?;
588 project_transaction.0.insert(buffer, transaction);
589 }
590
591 for (buffer, transaction) in &project_transaction.0 {
592 buffer
593 .update(&mut cx, |buffer, _| {
594 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
595 })?
596 .await?;
597
598 if push_to_history {
599 buffer.update(&mut cx, |buffer, _| {
600 buffer.push_transaction(transaction.clone(), Instant::now());
601 })?;
602 }
603 }
604
605 Ok(project_transaction)
606 })
607 }
608
609 fn open_buffer(
610 &self,
611 path: Arc<Path>,
612 worktree: Entity<Worktree>,
613 cx: &mut Context<BufferStore>,
614 ) -> Task<Result<Entity<Buffer>>> {
615 let worktree_id = worktree.read(cx).id().to_proto();
616 let project_id = self.project_id;
617 let client = self.upstream_client.clone();
618 let path_string = path.clone().to_string_lossy().to_string();
619 cx.spawn(move |this, mut cx| async move {
620 let response = client
621 .request(proto::OpenBufferByPath {
622 project_id,
623 worktree_id,
624 path: path_string,
625 })
626 .await?;
627 let buffer_id = BufferId::new(response.buffer_id)?;
628
629 let buffer = this
630 .update(&mut cx, {
631 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
632 })?
633 .await?;
634
635 Ok(buffer)
636 })
637 }
638
639 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
640 let create = self.upstream_client.request(proto::OpenNewBuffer {
641 project_id: self.project_id,
642 });
643 cx.spawn(|this, mut cx| async move {
644 let response = create.await?;
645 let buffer_id = BufferId::new(response.buffer_id)?;
646
647 this.update(&mut cx, |this, cx| {
648 this.wait_for_remote_buffer(buffer_id, cx)
649 })?
650 .await
651 })
652 }
653
654 fn reload_buffers(
655 &self,
656 buffers: HashSet<Entity<Buffer>>,
657 push_to_history: bool,
658 cx: &mut Context<BufferStore>,
659 ) -> Task<Result<ProjectTransaction>> {
660 let request = self.upstream_client.request(proto::ReloadBuffers {
661 project_id: self.project_id,
662 buffer_ids: buffers
663 .iter()
664 .map(|buffer| buffer.read(cx).remote_id().to_proto())
665 .collect(),
666 });
667
668 cx.spawn(|this, mut cx| async move {
669 let response = request
670 .await?
671 .transaction
672 .ok_or_else(|| anyhow!("missing transaction"))?;
673 this.update(&mut cx, |this, cx| {
674 this.deserialize_project_transaction(response, push_to_history, cx)
675 })?
676 .await
677 })
678 }
679}
680
681impl LocalBufferStore {
682 fn worktree_for_buffer(
683 &self,
684 buffer: &Entity<Buffer>,
685 cx: &App,
686 ) -> Option<(Entity<Worktree>, Arc<Path>)> {
687 let file = buffer.read(cx).file()?;
688 let worktree_id = file.worktree_id(cx);
689 let path = file.path().clone();
690 let worktree = self
691 .worktree_store
692 .read(cx)
693 .worktree_for_id(worktree_id, cx)?;
694 Some((worktree, path))
695 }
696
697 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
698 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
699 worktree.read(cx).load_staged_file(path.as_ref(), cx)
700 } else {
701 return Task::ready(Err(anyhow!("no such worktree")));
702 }
703 }
704
705 fn load_committed_text(
706 &self,
707 buffer: &Entity<Buffer>,
708 cx: &App,
709 ) -> Task<Result<Option<String>>> {
710 if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
711 worktree.read(cx).load_committed_file(path.as_ref(), cx)
712 } else {
713 Task::ready(Err(anyhow!("no such worktree")))
714 }
715 }
716
    /// Writes the buffer's contents to `path` within `worktree`, then notifies
    /// the downstream client (if the project is shared) and the buffer itself
    /// that the save occurred.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Capture everything needed for the save before spawning.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        // A buffer that never existed on disk gains a real file on its first
        // save, so its file metadata must be broadcast.
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Tell the downstream client about the save (and the new file,
            // if it changed) before updating the buffer itself.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
774
    /// Watches a worktree for entry and git-repository changes so that open
    /// buffers' file metadata and diff bases stay up to date. Events from
    /// non-local worktrees are ignored.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        Self::local_worktree_git_repos_changed(
                            this,
                            worktree.clone(),
                            updated_repos,
                            cx,
                        )
                    }
                    _ => {}
                }
            }
        })
        // The subscription lives as long as both entities do.
        .detach();
    }
800
801 fn local_worktree_entries_changed(
802 this: &mut BufferStore,
803 worktree_handle: &Entity<Worktree>,
804 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
805 cx: &mut Context<BufferStore>,
806 ) {
807 let snapshot = worktree_handle.read(cx).snapshot();
808 for (path, entry_id, _) in changes {
809 Self::local_worktree_entry_changed(
810 this,
811 *entry_id,
812 path,
813 worktree_handle,
814 &snapshot,
815 cx,
816 );
817 }
818 }
819
    /// Reacts to git repository changes in a local worktree: reloads staged
    /// and/or committed base texts for every open buffer inside a changed
    /// repository that has a live change set, forwards the new bases
    /// downstream, and recalculates diffs.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Phase 1 (sync): collect, for each affected open buffer, its snapshot,
        // path, and which change sets (unstaged / uncommitted) are still alive.
        let mut change_set_state_updates = Vec::new();
        for buffer in this.opened_buffers.values() {
            let OpenBuffer::Complete {
                buffer,
                change_set_state,
            } = buffer
            else {
                continue;
            };
            let Some(buffer) = buffer.upgrade() else {
                continue;
            };
            let buffer = buffer.read(cx);
            let Some(file) = File::from_dyn(buffer.file()) else {
                continue;
            };
            if file.worktree != worktree_handle {
                continue;
            }
            let change_set_state = change_set_state.read(cx);
            // Only buffers under one of the changed repositories' work
            // directories are affected.
            if changed_repos
                .iter()
                .any(|(work_dir, _)| file.path.starts_with(work_dir))
            {
                let snapshot = buffer.text_snapshot();
                change_set_state_updates.push((
                    snapshot.clone(),
                    file.path.clone(),
                    change_set_state
                        .unstaged_changes
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                    change_set_state
                        .uncommitted_changes
                        .as_ref()
                        .and_then(|set| set.upgrade())
                        .is_some(),
                ))
            }
        }

        if change_set_state_updates.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Phase 2 (background): read the new index/HEAD texts from git,
            // loading only the bases that a live change set actually needs.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_changes_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    change_set_state_updates
                        .into_iter()
                        .filter_map(
                            |(buffer_snapshot, path, needs_staged_text, needs_committed_text)| {
                                let local_repo = snapshot.local_repo_for_path(&path)?;
                                let relative_path = local_repo.relativize(&path).ok()?;
                                let staged_text = if needs_staged_text {
                                    local_repo.repo().load_index_text(&relative_path)
                                } else {
                                    None
                                };
                                let committed_text = if needs_committed_text {
                                    local_repo.repo().load_committed_text(&relative_path)
                                } else {
                                    None
                                };
                                // Collapse to the most precise DiffBasesChange
                                // variant; equal texts become `SetBoth`.
                                let diff_bases_change =
                                    match (needs_staged_text, needs_committed_text) {
                                        (true, true) => Some(if staged_text == committed_text {
                                            DiffBasesChange::SetBoth(committed_text)
                                        } else {
                                            DiffBasesChange::SetEach {
                                                index: staged_text,
                                                head: committed_text,
                                            }
                                        }),
                                        (true, false) => {
                                            Some(DiffBasesChange::SetIndex(staged_text))
                                        }
                                        (false, true) => {
                                            Some(DiffBasesChange::SetHead(committed_text))
                                        }
                                        (false, false) => None,
                                    };
                                Some((buffer_snapshot, diff_bases_change))
                            },
                        )
                        .collect::<Vec<_>>()
                })
                .await;

            // Phase 3 (main thread): forward the new bases downstream and
            // apply them, triggering diff recalculation.
            this.update(&mut cx, |this, cx| {
                for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
                    let Some(OpenBuffer::Complete {
                        change_set_state, ..
                    }) = this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
                    else {
                        continue;
                    };
                    let Some(diff_bases_change) = diff_bases_change else {
                        continue;
                    };

                    change_set_state.update(cx, |change_set_state, cx| {
                        use proto::update_diff_bases::Mode;

                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
                            let buffer_id = buffer_snapshot.remote_id().to_proto();
                            let (staged_text, committed_text, mode) = match diff_bases_change
                                .clone()
                            {
                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                DiffBasesChange::SetEach { index, head } => {
                                    (index, head, Mode::IndexAndHead)
                                }
                                DiffBasesChange::SetBoth(text) => {
                                    (None, text, Mode::IndexMatchesHead)
                                }
                            };
                            let message = proto::UpdateDiffBases {
                                project_id: *project_id,
                                buffer_id,
                                staged_text,
                                committed_text,
                                mode: mode as i32,
                            };

                            client.send(message).log_err();
                        }

                        let _ = change_set_state.diff_bases_changed(
                            buffer_snapshot,
                            diff_bases_change,
                            cx,
                        );
                    });
                }
            })
        })
        .detach_and_log_err(cx);
    }
972
    /// Reconciles a single worktree entry change with the buffer (if any) that
    /// is open for it: updates the buffer's `File` (path, entry id, mtime,
    /// deleted state), keeps the store's path/entry indices in sync, and
    /// notifies the downstream client. Always returns `None`; the `Option` is
    /// only used for `?`-style early exits.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Look up the affected buffer by entry id first, falling back to path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer was dropped; clean up its stale entry.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer: remove it from both indices and stop.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Find the entry's current state, by id or (if renamed) by path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            // Build the buffer's new `File`: present (possibly moved) if the
            // entry still exists, otherwise marked deleted.
            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            // Path moved: re-key the path index and record an event.
            if new_file.path != old_file.path {
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }

            // Entry id changed: re-key the entry index.
            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        // Emit events after the buffer update borrow has ended.
        for event in events {
            cx.emit(event);
        }

        None
    }
1100
1101 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1102 let file = File::from_dyn(buffer.read(cx).file())?;
1103
1104 let remote_id = buffer.read(cx).remote_id();
1105 if let Some(entry_id) = file.entry_id {
1106 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1107 Some(_) => {
1108 return None;
1109 }
1110 None => {
1111 self.local_buffer_ids_by_entry_id
1112 .insert(entry_id, remote_id);
1113 }
1114 }
1115 };
1116 self.local_buffer_ids_by_path.insert(
1117 ProjectPath {
1118 worktree_id: file.worktree_id(cx),
1119 path: file.path.clone(),
1120 },
1121 remote_id,
1122 );
1123
1124 Some(())
1125 }
1126
1127 fn save_buffer(
1128 &self,
1129 buffer: Entity<Buffer>,
1130 cx: &mut Context<BufferStore>,
1131 ) -> Task<Result<()>> {
1132 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1133 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1134 };
1135 let worktree = file.worktree.clone();
1136 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1137 }
1138
1139 fn save_buffer_as(
1140 &self,
1141 buffer: Entity<Buffer>,
1142 path: ProjectPath,
1143 cx: &mut Context<BufferStore>,
1144 ) -> Task<Result<()>> {
1145 let Some(worktree) = self
1146 .worktree_store
1147 .read(cx)
1148 .worktree_for_id(path.worktree_id, cx)
1149 else {
1150 return Task::ready(Err(anyhow!("no such worktree")));
1151 };
1152 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1153 }
1154
    /// Loads the file at `path` within `worktree` into a new buffer.
    ///
    /// The buffer's entity id is reserved up front so the `BufferId` can be
    /// derived from it before the file contents arrive. If the file does not
    /// exist on disk, an empty buffer with `DiskState::New` is created instead
    /// of failing, so "open a nonexistent path" behaves like creating a file.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity now so the buffer id is stable before the
            // asynchronous load completes.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Construct the text buffer off the main thread; it can be
                // expensive for large files.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // Missing file: create an empty buffer marked as new on disk.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    // Register the buffer in the local lookup tables so later
                    // opens of the same path reuse it.
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
1223
1224 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1225 cx.spawn(|buffer_store, mut cx| async move {
1226 let buffer =
1227 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1228 buffer_store.update(&mut cx, |buffer_store, cx| {
1229 buffer_store.add_buffer(buffer.clone(), cx).log_err();
1230 })?;
1231 Ok(buffer)
1232 })
1233 }
1234
    /// Reloads each of `buffers` from disk, gathering the resulting undo
    /// transactions into a single `ProjectTransaction`.
    ///
    /// When `push_to_history` is false, each reload transaction is forgotten
    /// so it can't be undone from the buffer's own history; it is still
    /// recorded in the returned project transaction.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            // Reload buffers one at a time; the first failure aborts the
            // whole operation.
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.entity(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
1260}
1261
1262impl BufferStore {
    /// Registers the RPC message and request handlers that route incoming
    /// protocol traffic to this store's entity.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_open_unstaged_changes);
        client.add_entity_request_handler(Self::handle_open_uncommitted_changes);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
    }
1275
    /// Creates a buffer store backed by the local filesystem.
    ///
    /// Subscribes to the worktree store so that buffers in newly-added
    /// worktrees are tracked as well.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            worktree_store,
        }
    }
1298
    /// Creates a buffer store that opens and saves buffers by issuing
    /// requests to a remote host over `upstream_client`, identified by
    /// `remote_id`.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
1322
1323 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1324 match &mut self.state {
1325 BufferStoreState::Local(state) => Some(state),
1326 _ => None,
1327 }
1328 }
1329
1330 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1331 match &mut self.state {
1332 BufferStoreState::Remote(state) => Some(state),
1333 _ => None,
1334 }
1335 }
1336
1337 fn as_remote(&self) -> Option<&RemoteBufferStore> {
1338 match &self.state {
1339 BufferStoreState::Remote(state) => Some(state),
1340 _ => None,
1341 }
1342 }
1343
    /// Opens the buffer for `project_path`, returning the existing entity if
    /// the path is already open.
    ///
    /// Concurrent calls for the same path share one load: the in-flight task
    /// is stored in `loading_buffers` and cloned for every later caller, and
    /// removed again once the load finishes (successfully or not).
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // Another caller is already loading this path; await the same task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            // Errors are wrapped in an Arc so the shared task
                            // can hand a clone to every waiter.
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1389
    /// Opens (or returns the already-open) change set describing the diff
    /// between `buffer` and its staged (index) text.
    ///
    /// Concurrent calls for the same buffer share one in-flight load via
    /// `loading_change_sets`.
    pub fn open_unstaged_changes(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(change_set) = self.get_unstaged_changes(buffer_id, cx) {
            return Task::ready(Ok(change_set));
        }

        let task = match self
            .loading_change_sets
            .entry((buffer_id, ChangeSetKind::Unstaged))
        {
            // Someone else is already loading this change set; share the task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                // Locally we read the staged text from the git index; remotely
                // we request it from the host.
                let staged_text = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.open_unstaged_changes(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_change_set_internal(
                                this,
                                ChangeSetKind::Unstaged,
                                staged_text.await.map(DiffBasesChange::SetIndex),
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1433
    /// Opens (or returns the already-open) change set describing the diff
    /// between `buffer` and its committed (HEAD) text.
    ///
    /// For local stores this also loads the staged text so both diff bases can
    /// be set together; when the committed and staged texts are identical a
    /// single shared base is used. Concurrent calls share one in-flight load.
    pub fn open_uncommitted_changes(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(change_set) = self.get_uncommitted_changes(buffer_id, cx) {
            return Task::ready(Ok(change_set));
        }

        let task = match self
            .loading_change_sets
            .entry((buffer_id, ChangeSetKind::Uncommitted))
        {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let changes = match &self.state {
                    BufferStoreState::Local(this) => {
                        let committed_text = this.load_committed_text(&buffer, cx);
                        let staged_text = this.load_staged_text(&buffer, cx);
                        cx.background_executor().spawn(async move {
                            let committed_text = committed_text.await?;
                            let staged_text = staged_text.await?;
                            // When index and HEAD agree, share one base text
                            // rather than storing it twice.
                            let diff_bases_change = if committed_text == staged_text {
                                DiffBasesChange::SetBoth(committed_text)
                            } else {
                                DiffBasesChange::SetEach {
                                    index: staged_text,
                                    head: committed_text,
                                }
                            };
                            Ok(diff_bases_change)
                        })
                    }
                    BufferStoreState::Remote(this) => this.open_uncommitted_changes(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_change_set_internal(
                                this,
                                ChangeSetKind::Uncommitted,
                                changes.await,
                                buffer,
                                cx,
                            )
                            .await
                            .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1493
    /// Test-only helper: installs a pre-built change set as the unstaged
    /// changes for `buffer_id`, bypassing the normal load path by inserting an
    /// already-resolved task into `loading_change_sets`.
    #[cfg(any(test, feature = "test-support"))]
    pub fn set_unstaged_change_set(
        &mut self,
        buffer_id: BufferId,
        change_set: Entity<BufferChangeSet>,
    ) {
        self.loading_change_sets.insert(
            (buffer_id, ChangeSetKind::Unstaged),
            Task::ready(Ok(change_set)).shared(),
        );
    }
1505
    /// Shared tail of `open_unstaged_changes` / `open_uncommitted_changes`:
    /// given the loaded diff base text(s), creates the change-set entity,
    /// wires it into the buffer's `change_set_state`, and waits for the
    /// initial diff to be computed.
    ///
    /// In either outcome the `(buffer_id, kind)` entry is removed from
    /// `loading_change_sets`, so a later call can retry after a failure.
    async fn open_change_set_internal(
        this: WeakEntity<Self>,
        kind: ChangeSetKind,
        texts: Result<DiffBasesChange>,
        buffer: Entity<Buffer>,
        mut cx: AsyncApp,
    ) -> Result<Entity<BufferChangeSet>> {
        let diff_bases_change = match texts {
            Err(e) => {
                // Clear the loading entry so the failure isn't cached forever.
                this.update(&mut cx, |this, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    this.loading_change_sets.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(&mut cx, |this, cx| {
            let buffer_id = buffer.read(cx).remote_id();
            this.loading_change_sets.remove(&(buffer_id, kind));

            if let Some(OpenBuffer::Complete {
                change_set_state, ..
            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                change_set_state.update(cx, |change_set_state, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    // Lazily subscribe once so diffs are refreshed when the
                    // buffer's language changes.
                    change_set_state.buffer_subscription.get_or_insert_with(|| {
                        cx.subscribe(&buffer, |this, buffer, event, cx| match event {
                            BufferEvent::LanguageChanged => {
                                this.buffer_language_changed(buffer, cx)
                            }
                            _ => {}
                        })
                    });

                    let change_set = cx.new(|cx| BufferChangeSet {
                        buffer_id,
                        base_text: None,
                        diff_to_buffer: BufferDiff::new(&buffer.read(cx).text_snapshot()),
                        unstaged_change_set: None,
                    });
                    match kind {
                        ChangeSetKind::Unstaged => {
                            change_set_state.unstaged_changes = Some(change_set.downgrade())
                        }
                        ChangeSetKind::Uncommitted => {
                            // An uncommitted change set always references an
                            // unstaged one; create an empty placeholder if the
                            // unstaged set hasn't been opened yet.
                            let unstaged_change_set =
                                if let Some(change_set) = change_set_state.unstaged_changes() {
                                    change_set
                                } else {
                                    let unstaged_change_set = cx.new(|cx| BufferChangeSet {
                                        buffer_id,
                                        base_text: None,
                                        diff_to_buffer: BufferDiff::new(
                                            &buffer.read(cx).text_snapshot(),
                                        ),
                                        unstaged_change_set: None,
                                    });
                                    change_set_state.unstaged_changes =
                                        Some(unstaged_change_set.downgrade());
                                    unstaged_change_set
                                };

                            change_set.update(cx, |change_set, _| {
                                change_set.unstaged_change_set = Some(unstaged_change_set);
                            });
                            change_set_state.uncommitted_changes = Some(change_set.downgrade())
                        }
                    };

                    let buffer = buffer.read(cx).text_snapshot();
                    // Kick off the initial diff; `rx` resolves once it's done.
                    let rx = change_set_state.diff_bases_changed(buffer, diff_bases_change, cx);

                    Ok(async move {
                        rx.await.ok();
                        Ok(change_set)
                    })
                })
            } else {
                Err(anyhow!("buffer was closed"))
            }
        })??
        .await
    }
1592
1593 pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
1594 match &self.state {
1595 BufferStoreState::Local(this) => this.create_buffer(cx),
1596 BufferStoreState::Remote(this) => this.create_buffer(cx),
1597 }
1598 }
1599
1600 pub fn save_buffer(
1601 &mut self,
1602 buffer: Entity<Buffer>,
1603 cx: &mut Context<Self>,
1604 ) -> Task<Result<()>> {
1605 match &mut self.state {
1606 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1607 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1608 }
1609 }
1610
1611 pub fn save_buffer_as(
1612 &mut self,
1613 buffer: Entity<Buffer>,
1614 path: ProjectPath,
1615 cx: &mut Context<Self>,
1616 ) -> Task<Result<()>> {
1617 let old_file = buffer.read(cx).file().cloned();
1618 let task = match &self.state {
1619 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1620 BufferStoreState::Remote(this) => {
1621 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1622 }
1623 };
1624 cx.spawn(|this, mut cx| async move {
1625 task.await?;
1626 this.update(&mut cx, |_, cx| {
1627 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1628 })
1629 })
1630 }
1631
    /// Computes git blame for `buffer`, optionally at a specific buffer
    /// `version` (defaults to the current text).
    ///
    /// Locally this runs blame against the containing repository on a
    /// background thread; remotely it issues a `BlameBuffer` request. Returns
    /// `Ok(None)` when the file isn't inside a git repository.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything the background task needs while we still
                // have access to the app state.
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        // Not inside a repository: blame is simply unavailable.
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1693
    /// Builds a permalink URL to the given line `selection` of `buffer` on
    /// its git hosting provider.
    ///
    /// Locally this resolves the repository's `origin` remote and HEAD SHA;
    /// if the file isn't in a repository but looks like a Rust file from the
    /// Cargo registry, a permalink is derived from crate metadata instead.
    /// Remotely the request is forwarded to the host.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // Path must be expressed relative to the repository root.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1790
    /// Registers `buffer` in `opened_buffers`, wiring up drop notification
    /// and event forwarding, and emits `BufferAdded`.
    ///
    /// If operations arrived for this buffer id before the buffer itself
    /// (remote replication), they are applied now. Re-registering a live
    /// buffer is a no-op for remote replicas but an error (and debug panic)
    /// for the local replica.
    fn add_buffer(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        // Replica 0 is the authoritative local copy.
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer.downgrade(),
            change_set_state: cx.new(|_| BufferChangeSetState::default()),
        };

        // Emit BufferDropped when the buffer entity is released so the store
        // can clean up.
        let handle = cx.entity().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    // Operations arrived before the buffer: apply the backlog.
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }
1834
1835 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1836 self.opened_buffers
1837 .values()
1838 .filter_map(|buffer| buffer.upgrade())
1839 }
1840
1841 pub fn loading_buffers(
1842 &self,
1843 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1844 self.loading_buffers.iter().map(|(path, task)| {
1845 let task = task.clone();
1846 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1847 })
1848 }
1849
1850 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1851 self.buffers().find_map(|buffer| {
1852 let file = File::from_dyn(buffer.read(cx).file())?;
1853 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1854 Some(buffer)
1855 } else {
1856 None
1857 }
1858 })
1859 }
1860
1861 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1862 self.opened_buffers.get(&buffer_id)?.upgrade()
1863 }
1864
1865 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1866 self.get(buffer_id)
1867 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1868 }
1869
1870 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1871 self.get(buffer_id).or_else(|| {
1872 self.as_remote()
1873 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1874 })
1875 }
1876
1877 pub fn get_unstaged_changes(
1878 &self,
1879 buffer_id: BufferId,
1880 cx: &App,
1881 ) -> Option<Entity<BufferChangeSet>> {
1882 if let OpenBuffer::Complete {
1883 change_set_state, ..
1884 } = self.opened_buffers.get(&buffer_id)?
1885 {
1886 change_set_state
1887 .read(cx)
1888 .unstaged_changes
1889 .as_ref()?
1890 .upgrade()
1891 } else {
1892 None
1893 }
1894 }
1895
1896 pub fn get_uncommitted_changes(
1897 &self,
1898 buffer_id: BufferId,
1899 cx: &App,
1900 ) -> Option<Entity<BufferChangeSet>> {
1901 if let OpenBuffer::Complete {
1902 change_set_state, ..
1903 } = self.opened_buffers.get(&buffer_id)?
1904 {
1905 change_set_state
1906 .read(cx)
1907 .uncommitted_changes
1908 .as_ref()?
1909 .upgrade()
1910 } else {
1911 None
1912 }
1913 }
1914
1915 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1916 let buffers = self
1917 .buffers()
1918 .map(|buffer| {
1919 let buffer = buffer.read(cx);
1920 proto::BufferVersion {
1921 id: buffer.remote_id().into(),
1922 version: language::proto::serialize_version(&buffer.version),
1923 }
1924 })
1925 .collect();
1926 let incomplete_buffer_ids = self
1927 .as_remote()
1928 .map(|remote| remote.incomplete_buffer_ids())
1929 .unwrap_or_default();
1930 (buffers, incomplete_buffer_ids)
1931 }
1932
    /// Transitions every buffer into a disconnected state after losing the
    /// connection to the host: pending waits are abandoned, all buffers
    /// become read-only, and remote load listeners are dropped so waiting
    /// futures can fail promptly.
    pub fn disconnected_from_host(&mut self, cx: &mut App) {
        // First stop any in-progress waits on buffer state.
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }

        // Then mark everything read-only, since edits can no longer be synced.
        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        if let Some(remote) = self.as_remote_mut() {
            // Wake up all futures currently waiting on a buffer to get opened,
            // to give them a chance to fail now that we've disconnected.
            remote.remote_buffer_listeners.clear()
        }
    }
1952
    /// Marks this store as shared, recording the downstream client (and its
    /// project id) to which buffer updates should be forwarded.
    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1956
1957 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1958 self.downstream_client.take();
1959 self.forget_shared_buffers();
1960 }
1961
1962 pub fn discard_incomplete(&mut self) {
1963 self.opened_buffers
1964 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1965 }
1966
    /// Streams buffers that may match `query`, up to `limit` candidates.
    ///
    /// Buffers without a file entry are yielded immediately (and counted
    /// against the limit); the worktree store supplies candidate paths, which
    /// are opened in chunks to bound the number of concurrent buffer opens.
    /// The receiver is closed when the search completes or the caller drops it.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                // Already-open buffers are searched in place, not re-opened.
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                // Open the whole chunk concurrently, then forward results.
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A closed receiver means the consumer lost interest.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
2021
    /// Kicks off diff recalculation for each of `buffers` that is fully open,
    /// returning a future that resolves once all recalculations complete.
    pub fn recalculate_buffer_diffs(
        &mut self,
        buffers: Vec<Entity<Buffer>>,
        cx: &mut Context<Self>,
    ) -> impl Future<Output = ()> {
        let mut futures = Vec::new();
        for buffer in buffers {
            if let Some(OpenBuffer::Complete {
                change_set_state, ..
            }) = self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                let buffer = buffer.read(cx).text_snapshot();
                futures.push(change_set_state.update(cx, |change_set_state, cx| {
                    change_set_state.recalculate_diffs(buffer, cx)
                }));
            }
        }
        async move {
            futures::future::join_all(futures).await;
        }
    }
2043
2044 fn on_buffer_event(
2045 &mut self,
2046 buffer: Entity<Buffer>,
2047 event: &BufferEvent,
2048 cx: &mut Context<Self>,
2049 ) {
2050 match event {
2051 BufferEvent::FileHandleChanged => {
2052 if let Some(local) = self.as_local_mut() {
2053 local.buffer_changed_file(buffer, cx);
2054 }
2055 }
2056 BufferEvent::Reloaded => {
2057 let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
2058 return;
2059 };
2060 let buffer = buffer.read(cx);
2061 downstream_client
2062 .send(proto::BufferReloaded {
2063 project_id: *project_id,
2064 buffer_id: buffer.remote_id().to_proto(),
2065 version: serialize_version(&buffer.version()),
2066 mtime: buffer.saved_mtime().map(|t| t.into()),
2067 line_ending: serialize_line_ending(buffer.line_ending()) as i32,
2068 })
2069 .log_err();
2070 }
2071 _ => {}
2072 }
2073 }
2074
2075 pub async fn handle_update_buffer(
2076 this: Entity<Self>,
2077 envelope: TypedEnvelope<proto::UpdateBuffer>,
2078 mut cx: AsyncApp,
2079 ) -> Result<proto::Ack> {
2080 let payload = envelope.payload.clone();
2081 let buffer_id = BufferId::new(payload.buffer_id)?;
2082 let ops = payload
2083 .operations
2084 .into_iter()
2085 .map(language::proto::deserialize_operation)
2086 .collect::<Result<Vec<_>, _>>()?;
2087 this.update(&mut cx, |this, cx| {
2088 match this.opened_buffers.entry(buffer_id) {
2089 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
2090 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
2091 OpenBuffer::Complete { buffer, .. } => {
2092 if let Some(buffer) = buffer.upgrade() {
2093 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
2094 }
2095 }
2096 },
2097 hash_map::Entry::Vacant(e) => {
2098 e.insert(OpenBuffer::Operations(ops));
2099 }
2100 }
2101 Ok(proto::Ack {})
2102 })?
2103 }
2104
2105 pub fn register_shared_lsp_handle(
2106 &mut self,
2107 peer_id: proto::PeerId,
2108 buffer_id: BufferId,
2109 handle: OpenLspBufferHandle,
2110 ) {
2111 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2112 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2113 buffer.lsp_handle = Some(handle);
2114 return;
2115 }
2116 }
2117 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2118 }
2119
    /// Re-synchronizes a rejoining guest's buffers with the host's state.
    ///
    /// For every buffer the guest reports, the host re-registers it as shared,
    /// replies with the host-side version, and pushes the current file,
    /// reload state, and any operations the guest is missing (computed from
    /// the guest's reported version and sent in chunks in the background).
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate for this guest; only the buffers it
        // reports below are re-shared.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        change_set: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Operations the guest is missing relative to its version.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Ship the missing operations in chunks off the main thread.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
2209
2210 pub fn handle_create_buffer_for_peer(
2211 &mut self,
2212 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2213 replica_id: u16,
2214 capability: Capability,
2215 cx: &mut Context<Self>,
2216 ) -> Result<()> {
2217 let Some(remote) = self.as_remote_mut() else {
2218 return Err(anyhow!("buffer store is not a remote"));
2219 };
2220
2221 if let Some(buffer) =
2222 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2223 {
2224 self.add_buffer(buffer, cx)?;
2225 }
2226
2227 Ok(())
2228 }
2229
2230 pub async fn handle_update_buffer_file(
2231 this: Entity<Self>,
2232 envelope: TypedEnvelope<proto::UpdateBufferFile>,
2233 mut cx: AsyncApp,
2234 ) -> Result<()> {
2235 let buffer_id = envelope.payload.buffer_id;
2236 let buffer_id = BufferId::new(buffer_id)?;
2237
2238 this.update(&mut cx, |this, cx| {
2239 let payload = envelope.payload.clone();
2240 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2241 let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
2242 let worktree = this
2243 .worktree_store
2244 .read(cx)
2245 .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
2246 .ok_or_else(|| anyhow!("no such worktree"))?;
2247 let file = File::from_proto(file, worktree, cx)?;
2248 let old_file = buffer.update(cx, |buffer, cx| {
2249 let old_file = buffer.file().cloned();
2250 let new_path = file.path.clone();
2251 buffer.file_updated(Arc::new(file), cx);
2252 if old_file
2253 .as_ref()
2254 .map_or(true, |old| *old.path() != new_path)
2255 {
2256 Some(old_file)
2257 } else {
2258 None
2259 }
2260 });
2261 if let Some(old_file) = old_file {
2262 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
2263 }
2264 }
2265 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2266 downstream_client
2267 .send(proto::UpdateBufferFile {
2268 project_id: *project_id,
2269 buffer_id: buffer_id.into(),
2270 file: envelope.payload.file,
2271 })
2272 .log_err();
2273 }
2274 Ok(())
2275 })?
2276 }
2277
    /// RPC handler for `proto::SaveBuffer`: saves a buffer on behalf of a
    /// remote collaborator, optionally under a new path ("save as").
    ///
    /// Fails if the buffer is unknown or the project is not shared. Replies
    /// with the version and mtime that were actually persisted.
    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until this replica has seen every operation contained in
        // the requester's version vector, so their edits are included.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            // "Save as": re-anchor the buffer to the new project path.
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        // Report what was actually written to disk.
        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
2318
2319 pub async fn handle_close_buffer(
2320 this: Entity<Self>,
2321 envelope: TypedEnvelope<proto::CloseBuffer>,
2322 mut cx: AsyncApp,
2323 ) -> Result<()> {
2324 let peer_id = envelope.sender_id;
2325 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2326 this.update(&mut cx, |this, _| {
2327 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2328 if shared.remove(&buffer_id).is_some() {
2329 if shared.is_empty() {
2330 this.shared_buffers.remove(&peer_id);
2331 }
2332 return;
2333 }
2334 }
2335 debug_panic!(
2336 "peer_id {} closed buffer_id {} which was either not open or already closed",
2337 peer_id,
2338 buffer_id
2339 )
2340 })
2341 }
2342
2343 pub async fn handle_buffer_saved(
2344 this: Entity<Self>,
2345 envelope: TypedEnvelope<proto::BufferSaved>,
2346 mut cx: AsyncApp,
2347 ) -> Result<()> {
2348 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2349 let version = deserialize_version(&envelope.payload.version);
2350 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2351 this.update(&mut cx, move |this, cx| {
2352 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2353 buffer.update(cx, |buffer, cx| {
2354 buffer.did_save(version, mtime, cx);
2355 });
2356 }
2357
2358 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2359 downstream_client
2360 .send(proto::BufferSaved {
2361 project_id: *project_id,
2362 buffer_id: buffer_id.into(),
2363 mtime: envelope.payload.mtime,
2364 version: envelope.payload.version,
2365 })
2366 .log_err();
2367 }
2368 })
2369 }
2370
    /// RPC handler for `proto::BufferReloaded`: applies an upstream reload
    /// notification (new version, mtime, and line ending) to the local copy
    /// of the buffer, then re-forwards the message downstream, if any.
    pub async fn handle_buffer_reloaded(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            // The buffer may still be loading; apply the reload to whatever
            // handle we have, and skip silently when we have none.
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            // Relay the unmodified message to downstream collaborators.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }
2403
2404 pub async fn handle_blame_buffer(
2405 this: Entity<Self>,
2406 envelope: TypedEnvelope<proto::BlameBuffer>,
2407 mut cx: AsyncApp,
2408 ) -> Result<proto::BlameBufferResponse> {
2409 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2410 let version = deserialize_version(&envelope.payload.version);
2411 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2412 buffer
2413 .update(&mut cx, |buffer, _| {
2414 buffer.wait_for_version(version.clone())
2415 })?
2416 .await?;
2417 let blame = this
2418 .update(&mut cx, |this, cx| {
2419 this.blame_buffer(&buffer, Some(version), cx)
2420 })?
2421 .await?;
2422 Ok(serialize_blame_buffer_response(blame))
2423 }
2424
    /// RPC handler for `proto::GetPermalinkToLine`: resolves a git-hosting
    /// permalink URL for the requested selection range in the given buffer.
    pub async fn handle_get_permalink_to_line(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
        mut cx: AsyncApp,
    ) -> Result<proto::GetPermalinkToLineResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        // let version = deserialize_version(&envelope.payload.version);
        let selection = {
            let proto_selection = envelope
                .payload
                .selection
                .context("no selection to get permalink for defined")?;
            // NOTE(review): truncating `as u32` casts — assumes the proto
            // selection bounds always fit in u32; confirm against the schema.
            proto_selection.start as u32..proto_selection.end as u32
        };
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        let permalink = this
            .update(&mut cx, |this, cx| {
                this.get_permalink_to_line(&buffer, selection, cx)
            })?
            .await?;
        Ok(proto::GetPermalinkToLineResponse {
            permalink: permalink.to_string(),
        })
    }
2449
    /// RPC handler for `proto::OpenUnstagedChanges`: opens (or reuses) the
    /// unstaged change set for a buffer and replies with its base text.
    ///
    /// The change set is also recorded on the requesting peer's
    /// `SharedBuffer` entry so the entity stays alive while shared.
    pub async fn handle_open_unstaged_changes(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUnstagedChanges>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUnstagedChangesResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let change_set = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_unstaged_changes(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        this.update(&mut cx, |this, _| {
            // The peer is expected to already have the buffer open (asserted
            // in debug builds); attach the change set to its entry.
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.change_set = Some(change_set.clone());
            }
        })?;
        // `None` when the change set has no base text — presumably the file
        // has no index entry; confirm with `open_unstaged_changes`.
        let staged_text = change_set.read_with(&cx, |change_set, _| {
            change_set.base_text.as_ref().map(|buffer| buffer.text())
        })?;
        Ok(proto::OpenUnstagedChangesResponse { staged_text })
    }
2478
    /// RPC handler for `proto::OpenUncommittedChanges`: opens (or reuses) the
    /// uncommitted change set for a buffer and replies with the committed and
    /// staged base texts.
    ///
    /// The response `mode` tells the requester how to combine the two texts;
    /// `IndexMatchesHead` lets us avoid sending a duplicate staged text.
    pub async fn handle_open_uncommitted_changes(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUncommittedChanges>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUncommittedChangesResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let change_set = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_uncommitted_changes(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        this.update(&mut cx, |this, _| {
            // Attach the change set to the requesting peer's shared-buffer
            // entry so it stays alive while the buffer is shared.
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.change_set = Some(change_set.clone());
            }
        })?;
        change_set.read_with(&cx, |change_set, cx| {
            use proto::open_uncommitted_changes_response::Mode;

            // Base text of the nested unstaged change set, i.e. the staged
            // (index) snapshot, when one exists.
            let staged_buffer = change_set
                .unstaged_change_set
                .as_ref()
                .and_then(|change_set| change_set.read(cx).base_text.as_ref());

            // Decide which texts to ship:
            // - both snapshots present and identical (same snapshot entity):
            //   `IndexMatchesHead`, staged text omitted;
            // - otherwise `IndexAndHead`, each text sent only if available.
            let mode;
            let staged_text;
            let committed_text;
            if let Some(committed_buffer) = &change_set.base_text {
                committed_text = Some(committed_buffer.text());
                if let Some(staged_buffer) = staged_buffer {
                    if staged_buffer.remote_id() == committed_buffer.remote_id() {
                        mode = Mode::IndexMatchesHead;
                        staged_text = None;
                    } else {
                        mode = Mode::IndexAndHead;
                        staged_text = Some(staged_buffer.text());
                    }
                } else {
                    mode = Mode::IndexAndHead;
                    staged_text = None;
                }
            } else {
                mode = Mode::IndexAndHead;
                committed_text = None;
                staged_text = staged_buffer.as_ref().map(|buffer| buffer.text());
            }

            proto::OpenUncommittedChangesResponse {
                committed_text,
                staged_text,
                mode: mode.into(),
            }
        })
    }
2540
    /// RPC handler for `proto::UpdateDiffBases`: applies new diff base texts
    /// sent by upstream to the buffer's change-set state.
    pub async fn handle_update_diff_bases(
        this: Entity<Self>,
        request: TypedEnvelope<proto::UpdateDiffBases>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        this.update(&mut cx, |this, cx| {
            // Only `OpenBuffer::Complete` entries carry change-set state;
            // other entries (and dropped buffers) are silently skipped.
            if let Some(OpenBuffer::Complete {
                change_set_state,
                buffer,
            }) = this.opened_buffers.get_mut(&buffer_id)
            {
                if let Some(buffer) = buffer.upgrade() {
                    let buffer = buffer.read(cx).text_snapshot();
                    change_set_state.update(cx, |change_set_state, cx| {
                        change_set_state.handle_base_texts_updated(buffer, request.payload, cx);
                    })
                }
            }
        })
    }
2562
2563 pub fn reload_buffers(
2564 &self,
2565 buffers: HashSet<Entity<Buffer>>,
2566 push_to_history: bool,
2567 cx: &mut Context<Self>,
2568 ) -> Task<Result<ProjectTransaction>> {
2569 if buffers.is_empty() {
2570 return Task::ready(Ok(ProjectTransaction::default()));
2571 }
2572 match &self.state {
2573 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2574 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2575 }
2576 }
2577
    /// RPC handler for `proto::ReloadBuffers`: reloads the requested buffers
    /// from disk and replies with the resulting transaction, serialized for
    /// the requesting peer.
    async fn handle_reload_buffers(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncApp,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            // `push_to_history: false` — NOTE(review): presumably the
            // requester records the returned transaction in its own undo
            // history; confirm with the remote-side caller.
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }
2601
    /// Shares `buffer` with `peer_id`, if it isn't already shared with them.
    ///
    /// Registers the buffer in `shared_buffers` synchronously (making repeat
    /// calls no-ops), then asynchronously streams the buffer to the peer: one
    /// `CreateBufferForPeer::State` message followed by the serialized
    /// operation history in chunks.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            // Already shared with this peer; nothing to send.
            return Task::ready(Ok(()));
        }
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                change_set: None,
                lsp_handle: None,
            },
        );

        // Without a downstream client there is no one to stream the state to.
        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before this task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            // Serialize the operation history from the beginning (`None`).
            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream operation chunks if the initial state message was
            // sent successfully.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        // Mark the final chunk so the receiver knows when the
                        // buffer is complete.
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2668
    /// Drops all shared-buffer bookkeeping for every peer.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2672
    /// Drops shared-buffer bookkeeping for a single peer.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2676
    /// Moves the shared-buffer state recorded under `old_peer_id` so that it
    /// is keyed by `new_peer_id`. No-op if the old id has no entry.
    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }
2682
    /// Whether any buffers are currently shared with any peer.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2686
    /// Creates a new local buffer with the given text, defaulting to plain
    /// text when `language` is `None`.
    ///
    /// Panics (via `expect`) if called on a non-local buffer store — this is
    /// a host-only operation.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut Context<Self>,
    ) -> Entity<Buffer> {
        let buffer = cx.new(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        // If the buffer is backed by a worktree file, index it by project
        // path and (when present) by worktree entry id for later lookups.
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }
2720
2721 pub fn deserialize_project_transaction(
2722 &mut self,
2723 message: proto::ProjectTransaction,
2724 push_to_history: bool,
2725 cx: &mut Context<Self>,
2726 ) -> Task<Result<ProjectTransaction>> {
2727 if let Some(this) = self.as_remote_mut() {
2728 this.deserialize_project_transaction(message, push_to_history, cx)
2729 } else {
2730 debug_panic!("not a remote buffer store");
2731 Task::ready(Err(anyhow!("not a remote buffer store")))
2732 }
2733 }
2734
2735 pub fn wait_for_remote_buffer(
2736 &mut self,
2737 id: BufferId,
2738 cx: &mut Context<BufferStore>,
2739 ) -> Task<Result<Entity<Buffer>>> {
2740 if let Some(this) = self.as_remote_mut() {
2741 this.wait_for_remote_buffer(id, cx)
2742 } else {
2743 debug_panic!("not a remote buffer store");
2744 Task::ready(Err(anyhow!("not a remote buffer store")))
2745 }
2746 }
2747
2748 pub fn serialize_project_transaction_for_peer(
2749 &mut self,
2750 project_transaction: ProjectTransaction,
2751 peer_id: proto::PeerId,
2752 cx: &mut Context<Self>,
2753 ) -> proto::ProjectTransaction {
2754 let mut serialized_transaction = proto::ProjectTransaction {
2755 buffer_ids: Default::default(),
2756 transactions: Default::default(),
2757 };
2758 for (buffer, transaction) in project_transaction.0 {
2759 self.create_buffer_for_peer(&buffer, peer_id, cx)
2760 .detach_and_log_err(cx);
2761 serialized_transaction
2762 .buffer_ids
2763 .push(buffer.read(cx).remote_id().into());
2764 serialized_transaction
2765 .transactions
2766 .push(language::proto::serialize_transaction(&transaction));
2767 }
2768 serialized_transaction
2769 }
2770}
2771
// Allows `BufferChangeSet` entities to emit `BufferChangeSetEvent`s via `cx.emit`.
impl EventEmitter<BufferChangeSetEvent> for BufferChangeSet {}
2773
impl BufferChangeSet {
    /// Replaces the base text and diff, emitting
    /// [`BufferChangeSetEvent::DiffChanged`] with the affected range when the
    /// visible diff actually changed.
    ///
    /// NOTE(review): when `base_text` is `None` the state is still replaced
    /// but no event is emitted — confirm callers rely on this.
    fn set_state(
        &mut self,
        base_text: Option<language::BufferSnapshot>,
        diff: BufferDiff,
        buffer: &text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) {
        if let Some(base_text) = base_text.as_ref() {
            // A different base-text snapshot invalidates the entire buffer;
            // otherwise compare old and new diffs to find the changed
            // subrange (`None` means nothing visible changed).
            let changed_range = if Some(base_text.remote_id())
                != self.base_text.as_ref().map(|buffer| buffer.remote_id())
            {
                Some(text::Anchor::MIN..text::Anchor::MAX)
            } else {
                diff.compare(&self.diff_to_buffer, buffer)
            };
            if let Some(changed_range) = changed_range {
                cx.emit(BufferChangeSetEvent::DiffChanged { changed_range });
            }
        }
        self.base_text = base_text;
        self.diff_to_buffer = diff;
    }

    /// Returns the diff hunks overlapping `range`, in buffer order.
    pub fn diff_hunks_intersecting_range<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range(range, buffer_snapshot)
    }

    /// Returns the diff hunks overlapping `range`, in reverse buffer order.
    pub fn diff_hunks_intersecting_range_rev<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range_rev(range, buffer_snapshot)
    }

    /// Used in cases where the change set isn't derived from git.
    ///
    /// Recomputes the diff against `base_buffer` on the background executor;
    /// the returned receiver fires once the new state has been applied (or
    /// once it's clear the change set was dropped).
    pub fn set_base_text(
        &mut self,
        base_buffer: Entity<language::Buffer>,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        let this = cx.weak_entity();
        let base_buffer = base_buffer.read(cx).snapshot();
        cx.spawn(|_, mut cx| async move {
            // Diffing can be expensive; run it off the main thread.
            let diff = cx
                .background_executor()
                .spawn({
                    let base_buffer = base_buffer.clone();
                    let buffer = buffer.clone();
                    async move { BufferDiff::build(Some(&base_buffer.text()), &buffer) }
                })
                .await;
            let Some(this) = this.upgrade() else {
                // Change set dropped; still signal completion.
                tx.send(()).ok();
                return;
            };
            this.update(&mut cx, |this, cx| {
                this.set_state(Some(base_buffer), diff, &buffer, cx);
            })
            .log_err();
            tx.send(()).ok();
        })
        .detach();
        rx
    }

    /// Test helper: the base text as a `String`, if any.
    #[cfg(any(test, feature = "test-support"))]
    pub fn base_text_string(&self) -> Option<String> {
        self.base_text.as_ref().map(|buffer| buffer.text())
    }

    /// Creates an empty change set (no base text, empty diff) for `buffer`.
    pub fn new(buffer: &Entity<Buffer>, cx: &mut App) -> Self {
        BufferChangeSet {
            buffer_id: buffer.read(cx).remote_id(),
            base_text: None,
            diff_to_buffer: BufferDiff::new(&buffer.read(cx).text_snapshot()),
            unstaged_change_set: None,
        }
    }

    /// Test helper: creates a change set whose base is `base_text` (with
    /// line endings normalized), diffed synchronously against `buffer`.
    #[cfg(any(test, feature = "test-support"))]
    pub fn new_with_base_text(base_text: &str, buffer: &Entity<Buffer>, cx: &mut App) -> Self {
        let mut base_text = base_text.to_owned();
        text::LineEnding::normalize(&mut base_text);
        let diff_to_buffer = BufferDiff::build(Some(&base_text), &buffer.read(cx).text_snapshot());
        let base_text = language::Buffer::build_snapshot_sync(base_text.into(), None, None, cx);
        BufferChangeSet {
            buffer_id: buffer.read(cx).remote_id(),
            base_text: Some(base_text),
            diff_to_buffer,
            unstaged_change_set: None,
        }
    }

    /// Test helper: recomputes the diff against the current base text
    /// synchronously on the calling thread.
    #[cfg(any(test, feature = "test-support"))]
    pub fn recalculate_diff_sync(
        &mut self,
        snapshot: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) {
        let mut base_text = self.base_text.as_ref().map(|buffer| buffer.text());
        if let Some(base_text) = base_text.as_mut() {
            text::LineEnding::normalize(base_text);
        }
        let diff_to_buffer = BufferDiff::build(base_text.as_deref(), &snapshot);
        self.set_state(self.base_text.clone(), diff_to_buffer, &snapshot, cx);
    }
}
2891
2892impl OpenBuffer {
2893 fn upgrade(&self) -> Option<Entity<Buffer>> {
2894 match self {
2895 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2896 OpenBuffer::Operations(_) => None,
2897 }
2898 }
2899}
2900
2901fn is_not_found_error(error: &anyhow::Error) -> bool {
2902 error
2903 .root_cause()
2904 .downcast_ref::<io::Error>()
2905 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2906}
2907
2908fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2909 let Some(blame) = blame else {
2910 return proto::BlameBufferResponse {
2911 blame_response: None,
2912 };
2913 };
2914
2915 let entries = blame
2916 .entries
2917 .into_iter()
2918 .map(|entry| proto::BlameEntry {
2919 sha: entry.sha.as_bytes().into(),
2920 start_line: entry.range.start,
2921 end_line: entry.range.end,
2922 original_line_number: entry.original_line_number,
2923 author: entry.author.clone(),
2924 author_mail: entry.author_mail.clone(),
2925 author_time: entry.author_time,
2926 author_tz: entry.author_tz.clone(),
2927 committer: entry.committer.clone(),
2928 committer_mail: entry.committer_mail.clone(),
2929 committer_time: entry.committer_time,
2930 committer_tz: entry.committer_tz.clone(),
2931 summary: entry.summary.clone(),
2932 previous: entry.previous.clone(),
2933 filename: entry.filename.clone(),
2934 })
2935 .collect::<Vec<_>>();
2936
2937 let messages = blame
2938 .messages
2939 .into_iter()
2940 .map(|(oid, message)| proto::CommitMessage {
2941 oid: oid.as_bytes().into(),
2942 message,
2943 })
2944 .collect::<Vec<_>>();
2945
2946 let permalinks = blame
2947 .permalinks
2948 .into_iter()
2949 .map(|(oid, url)| proto::CommitPermalink {
2950 oid: oid.as_bytes().into(),
2951 permalink: url.to_string(),
2952 })
2953 .collect::<Vec<_>>();
2954
2955 proto::BlameBufferResponse {
2956 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2957 entries,
2958 messages,
2959 permalinks,
2960 remote_url: blame.remote_url,
2961 }),
2962 }
2963}
2964
2965fn deserialize_blame_buffer_response(
2966 response: proto::BlameBufferResponse,
2967) -> Option<git::blame::Blame> {
2968 let response = response.blame_response?;
2969 let entries = response
2970 .entries
2971 .into_iter()
2972 .filter_map(|entry| {
2973 Some(git::blame::BlameEntry {
2974 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2975 range: entry.start_line..entry.end_line,
2976 original_line_number: entry.original_line_number,
2977 committer: entry.committer,
2978 committer_time: entry.committer_time,
2979 committer_tz: entry.committer_tz,
2980 committer_mail: entry.committer_mail,
2981 author: entry.author,
2982 author_mail: entry.author_mail,
2983 author_time: entry.author_time,
2984 author_tz: entry.author_tz,
2985 summary: entry.summary,
2986 previous: entry.previous,
2987 filename: entry.filename,
2988 })
2989 })
2990 .collect::<Vec<_>>();
2991
2992 let messages = response
2993 .messages
2994 .into_iter()
2995 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2996 .collect::<HashMap<_, _>>();
2997
2998 let permalinks = response
2999 .permalinks
3000 .into_iter()
3001 .filter_map(|permalink| {
3002 Some((
3003 git::Oid::from_bytes(&permalink.oid).ok()?,
3004 Url::from_str(&permalink.permalink).ok()?,
3005 ))
3006 })
3007 .collect::<HashMap<_, _>>();
3008
3009 Some(Blame {
3010 entries,
3011 permalinks,
3012 messages,
3013 remote_url: response.remote_url,
3014 })
3015}
3016
3017fn get_permalink_in_rust_registry_src(
3018 provider_registry: Arc<GitHostingProviderRegistry>,
3019 path: PathBuf,
3020 selection: Range<u32>,
3021) -> Result<url::Url> {
3022 #[derive(Deserialize)]
3023 struct CargoVcsGit {
3024 sha1: String,
3025 }
3026
3027 #[derive(Deserialize)]
3028 struct CargoVcsInfo {
3029 git: CargoVcsGit,
3030 path_in_vcs: String,
3031 }
3032
3033 #[derive(Deserialize)]
3034 struct CargoPackage {
3035 repository: String,
3036 }
3037
3038 #[derive(Deserialize)]
3039 struct CargoToml {
3040 package: CargoPackage,
3041 }
3042
3043 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
3044 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
3045 Some((dir, json))
3046 }) else {
3047 bail!("No .cargo_vcs_info.json found in parent directories")
3048 };
3049 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
3050 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
3051 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
3052 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
3053 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
3054 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
3055 let permalink = provider.build_permalink(
3056 remote,
3057 BuildPermalinkParams {
3058 sha: &cargo_vcs_info.git.sha1,
3059 path: &path.to_string_lossy(),
3060 selection: Some(selection),
3061 },
3062 );
3063 Ok(permalink)
3064}