1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use client::Client;
10use collections::{hash_map, HashMap, HashSet};
11use fs::Fs;
12use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
13use git::{blame::Blame, diff::BufferDiff, repository::RepoPath};
14use gpui::{
15 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
16};
17use http_client::Url;
18use language::{
19 proto::{
20 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
21 split_operations,
22 },
23 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
24};
25use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
26use serde::Deserialize;
27use smol::channel::Receiver;
28use std::{
29 io,
30 ops::Range,
31 path::{Path, PathBuf},
32 pin::pin,
33 str::FromStr as _,
34 sync::Arc,
35 time::Instant,
36};
37use text::{BufferId, LineEnding, Rope};
38use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
39use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
40
/// A set of open buffers.
pub struct BufferStore {
    // Local- or remote-specific behavior for this store.
    state: BufferStoreState,
    // In-flight buffer loads keyed by path; tasks are `Shared` so concurrent
    // opens of the same path join a single load.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    // In-flight loads of unstaged-change sets, keyed by buffer id; shared for the
    // same dedup reason as `loading_buffers`.
    #[allow(clippy::type_complexity)]
    loading_change_sets:
        HashMap<BufferId, Shared<Task<Result<Entity<BufferChangeSet>, Arc<anyhow::Error>>>>>,
    worktree_store: Entity<WorktreeStore>,
    // All buffers this store knows about, held weakly (see `OpenBuffer`).
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    // When this project is shared, the client and project id to forward
    // buffer updates to.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // Buffers that have been sent to each remote peer, retained strongly so
    // they stay alive while shared.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
54
/// A buffer (plus associated handles) that has been shared with a remote peer.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    unstaged_changes: Option<Entity<BufferChangeSet>>,
    lsp_handle: Option<OpenLspBufferHandle>,
}
61
/// Tracks the diff between a buffer's current contents and a base text
/// (e.g. the staged copy of the file in the git index).
pub struct BufferChangeSet {
    pub buffer_id: BufferId,
    /// Snapshot of the base text the diff is computed against; `None` when unset.
    pub base_text: Option<language::BufferSnapshot>,
    pub language: Option<Arc<Language>>,
    /// The current diff between `base_text` and the buffer.
    pub diff_to_buffer: git::diff::BufferDiff,
    /// In-flight diff recalculation, if any.
    pub recalculate_diff_task: Option<Task<Result<()>>>,
    /// Senders notified when the next diff recalculation completes.
    pub diff_updated_futures: Vec<oneshot::Sender<()>>,
    pub language_registry: Option<Arc<LanguageRegistry>>,
}
71
/// Events emitted by a [`BufferChangeSet`].
pub enum BufferChangeSetEvent {
    /// The diff was recalculated; `changed_range` covers the affected buffer region.
    DiffChanged { changed_range: Range<text::Anchor> },
}
75
/// Whether this store owns its buffers locally or replicates them from an
/// upstream host.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
80
/// State for a buffer store that replicates buffers from an upstream host
/// over RPC.
struct RemoteBufferStore {
    // Buffers sent to us by peers, retained to avoid races (see
    // `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    // Buffers whose initial state has arrived but whose operation chunks are
    // still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    // Channels to resolve once the corresponding buffer finishes loading.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
90
/// State for a buffer store whose buffers are backed by local worktree files.
struct LocalBufferStore {
    // Reverse indices from file location to buffer id, kept in sync as files
    // move, appear, and disappear on disk.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    // Keeps the worktree-store subscription alive for this store's lifetime.
    _subscription: Subscription,
}
97
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully loaded buffer, held weakly so the store doesn't keep it alive.
    Complete {
        buffer: WeakEntity<Buffer>,
        unstaged_changes: Option<WeakEntity<BufferChangeSet>>,
    },
    /// Operations received for a buffer before its state arrived; buffered
    /// until the buffer itself is created.
    Operations(Vec<Operation>),
}
105
/// Events emitted by a [`BufferStore`].
pub enum BufferStoreEvent {
    BufferAdded(Entity<Buffer>),
    BufferDropped(BufferId),
    /// A buffer's backing file path changed (rename, save-as, etc.).
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
114
/// The set of edits produced by a project-wide operation, keyed by the buffer
/// each transaction applies to.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}
119
120impl RemoteBufferStore {
121 fn load_staged_text(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
122 let project_id = self.project_id;
123 let client = self.upstream_client.clone();
124 cx.background_executor().spawn(async move {
125 Ok(client
126 .request(proto::GetStagedText {
127 project_id,
128 buffer_id: buffer_id.to_proto(),
129 })
130 .await?
131 .staged_text)
132 })
133 }
    /// Returns a task that resolves once the buffer with the given id has been
    /// fully received from the upstream peer.
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        // Register a listener before checking, so a buffer arriving in between
        // can't be missed.
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            // The buffer may already be present; return it without waiting.
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }
156
    /// Asks the upstream host to save the buffer (optionally at a new path),
    /// then marks the local replica as saved with the host's resulting
    /// version and mtime.
    fn save_remote_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            // The host reports which version it actually saved.
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }
187
    /// Handles a `CreateBufferForPeer` message from the host. A buffer is
    /// streamed as an initial `State` variant followed by one or more `Chunk`s
    /// of operations; the buffer entity is returned once the final chunk has
    /// been applied. Errors at any stage are forwarded to all listeners
    /// registered via `wait_for_remote_buffer`.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut Context<BufferStore>,
    ) -> Result<Option<Entity<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    // Resolve the buffer's file against a local worktree, if it has one.
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        // Park the buffer until its operation chunks finish arriving.
                        let buffer = cx.new(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Deserialization failed; fail everyone waiting on this buffer.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // Abandon the partially loaded buffer and fail its listeners.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    // Loading is complete; resolve all waiting listeners.
                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }
282
283 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
284 self.loading_remote_buffers_by_id
285 .keys()
286 .copied()
287 .collect::<Vec<_>>()
288 }
289
    /// Reconstructs a [`ProjectTransaction`] from its wire representation,
    /// waiting for each referenced buffer (and the edits each transaction
    /// mentions) to arrive before resolving. Optionally pushes each
    /// transaction onto its buffer's undo history.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            // `buffer_ids` and `transactions` are parallel arrays on the wire.
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                // Ensure every edit the transaction refers to has replicated here.
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }
327
328 fn open_buffer(
329 &self,
330 path: Arc<Path>,
331 worktree: Entity<Worktree>,
332 skip_file_contents: bool,
333 cx: &mut Context<BufferStore>,
334 ) -> Task<Result<Entity<Buffer>>> {
335 let worktree_id = worktree.read(cx).id().to_proto();
336 let project_id = self.project_id;
337 let client = self.upstream_client.clone();
338 let path_string = path.clone().to_string_lossy().to_string();
339 cx.spawn(move |this, mut cx| async move {
340 let response = client
341 .request(proto::OpenBufferByPath {
342 project_id,
343 worktree_id,
344 path: path_string,
345 skip_file_contents,
346 })
347 .await?;
348 let buffer_id = BufferId::new(response.buffer_id)?;
349
350 let buffer = this
351 .update(&mut cx, {
352 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
353 })?
354 .await?;
355
356 Ok(buffer)
357 })
358 }
359
    /// Asks the upstream host to create a new untitled buffer, then waits for
    /// its state to replicate to this client.
    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
        // Issue the request synchronously so it's in flight before we await.
        let create = self.upstream_client.request(proto::OpenNewBuffer {
            project_id: self.project_id,
        });
        cx.spawn(|this, mut cx| async move {
            let response = create.await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }
374
    /// Asks the upstream host to reload the given buffers from disk and
    /// returns the resulting project transaction, deserialized locally.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        let request = self.upstream_client.request(proto::ReloadBuffers {
            project_id: self.project_id,
            buffer_ids: buffers
                .iter()
                .map(|buffer| buffer.read(cx).remote_id().to_proto())
                .collect(),
        });

        cx.spawn(|this, mut cx| async move {
            let response = request
                .await?
                .transaction
                .ok_or_else(|| anyhow!("missing transaction"))?;
            this.update(&mut cx, |this, cx| {
                this.deserialize_project_transaction(response, push_to_history, cx)
            })?
            .await
        })
    }
400}
401
402impl LocalBufferStore {
403 fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
404 let Some(file) = buffer.read(cx).file() else {
405 return Task::ready(Ok(None));
406 };
407 let worktree_id = file.worktree_id(cx);
408 let path = file.path().clone();
409 let Some(worktree) = self
410 .worktree_store
411 .read(cx)
412 .worktree_for_id(worktree_id, cx)
413 else {
414 return Task::ready(Err(anyhow!("no such worktree")));
415 };
416
417 worktree.read(cx).load_staged_file(path.as_ref(), cx)
418 }
419
    /// Writes the buffer's contents to `path` in `worktree`, then notifies the
    /// downstream client (if any) and marks the buffer as saved.
    /// `has_changed_file` indicates the buffer's file metadata changed (e.g.
    /// save-as); it's forced on for buffers that have never been on disk.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Capture everything we need before going async.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            // First-time save: the file record changes regardless of the caller's flag.
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    // Send the file update before the save notification so the
                    // remote applies them in the same order we did.
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
477
    /// Subscribes to a worktree's events, routing entry and git-repository
    /// changes to the corresponding handlers. Only local worktrees are
    /// handled; the subscription is detached and lives until the worktree drops.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        Self::local_worktree_git_repos_changed(
                            this,
                            worktree.clone(),
                            updated_repos,
                            cx,
                        )
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }
503
504 fn local_worktree_entries_changed(
505 this: &mut BufferStore,
506 worktree_handle: &Entity<Worktree>,
507 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
508 cx: &mut Context<BufferStore>,
509 ) {
510 let snapshot = worktree_handle.read(cx).snapshot();
511 for (path, entry_id, _) in changes {
512 Self::local_worktree_entry_changed(
513 this,
514 *entry_id,
515 path,
516 worktree_handle,
517 &snapshot,
518 cx,
519 );
520 }
521 }
522
    /// Responds to git repository changes in a local worktree by reloading the
    /// staged text for every open buffer under a changed repository, updating
    /// each buffer's change set, and forwarding the new diff bases downstream.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Collect (change set, snapshot, path) for every open buffer in this
        // worktree that lives under one of the changed repositories.
        let buffer_change_sets = this
            .opened_buffers
            .values()
            .filter_map(|buffer| {
                if let OpenBuffer::Complete {
                    buffer,
                    unstaged_changes,
                } = buffer
                {
                    let buffer = buffer.upgrade()?.read(cx);
                    let file = File::from_dyn(buffer.file())?;
                    if file.worktree != worktree_handle {
                        return None;
                    }
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                    let unstaged_changes = unstaged_changes.as_ref()?.upgrade()?;
                    let snapshot = buffer.text_snapshot();
                    Some((unstaged_changes, snapshot, file.path.clone()))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        if buffer_change_sets.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Read the git index off the main thread.
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    buffer_change_sets
                        .into_iter()
                        .filter_map(|(change_set, buffer_snapshot, path)| {
                            let local_repo = snapshot.local_repo_for_path(&path)?;
                            let relative_path = local_repo.relativize(&path).ok()?;
                            let base_text = local_repo.repo().load_index_text(&relative_path);
                            Some((change_set, buffer_snapshot, base_text))
                        })
                        .collect::<Vec<_>>()
                })
                .await;

            this.update(&mut cx, |this, cx| {
                for (change_set, buffer_snapshot, staged_text) in diff_bases_by_buffer {
                    change_set.update(cx, |change_set, cx| {
                        if let Some(staged_text) = staged_text.clone() {
                            let _ =
                                change_set.set_base_text(staged_text, buffer_snapshot.clone(), cx);
                        } else {
                            // File no longer staged: clear the base text.
                            change_set.unset_base_text(buffer_snapshot.clone(), cx);
                        }
                    });

                    if let Some((client, project_id)) = &this.downstream_client.clone() {
                        client
                            .send(proto::UpdateDiffBase {
                                project_id: *project_id,
                                buffer_id: buffer_snapshot.remote_id().to_proto(),
                                staged_text,
                            })
                            .log_err();
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }
604
    /// Reconciles a single changed worktree entry against any open buffer
    /// backed by it: updates the buffer's `File` (path, entry id, disk state),
    /// maintains the path/entry-id indices, notifies the downstream client,
    /// and emits `BufferChangedFilePath` on renames. Returns `None` when there
    /// is nothing to do (the `Option` is used for early exit only).
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Find the affected buffer, preferring the entry-id index over the path index.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer was dropped; clean up its stale entry.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer: remove it from both indices and stop.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Locate the entry in the new snapshot, by id first, then by path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // The entry is gone from the snapshot: the file was deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index for the rename.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        // Emit events only after the buffer update above has completed.
        for event in events {
            cx.emit(event);
        }

        None
    }
732
733 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
734 let file = File::from_dyn(buffer.read(cx).file())?;
735
736 let remote_id = buffer.read(cx).remote_id();
737 if let Some(entry_id) = file.entry_id {
738 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
739 Some(_) => {
740 return None;
741 }
742 None => {
743 self.local_buffer_ids_by_entry_id
744 .insert(entry_id, remote_id);
745 }
746 }
747 };
748 self.local_buffer_ids_by_path.insert(
749 ProjectPath {
750 worktree_id: file.worktree_id(cx),
751 path: file.path.clone(),
752 },
753 remote_id,
754 );
755
756 Some(())
757 }
758
759 fn save_buffer(
760 &self,
761 buffer: Entity<Buffer>,
762 cx: &mut Context<BufferStore>,
763 ) -> Task<Result<()>> {
764 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
765 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
766 };
767 let worktree = file.worktree.clone();
768 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
769 }
770
771 fn save_buffer_as(
772 &self,
773 buffer: Entity<Buffer>,
774 path: ProjectPath,
775 cx: &mut Context<BufferStore>,
776 ) -> Task<Result<()>> {
777 let Some(worktree) = self
778 .worktree_store
779 .read(cx)
780 .worktree_for_id(path.worktree_id, cx)
781 else {
782 return Task::ready(Err(anyhow!("no such worktree")));
783 };
784 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
785 }
786
    /// Loads `path` from `worktree` into a new buffer. If the file doesn't
    /// exist, creates an empty buffer whose file is marked `DiskState::New`
    /// (it will be created on first save). The buffer's entity id doubles as
    /// its `BufferId`, which is why an entity slot is reserved up front.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        skip_file_contents: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), skip_file_contents, cx);
            // Reserve the entity slot now so its id can name the buffer.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Build the text buffer off the main thread.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // Not-found is recoverable: open an empty buffer for a new file.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                // Register the new buffer in both lookup indices.
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
856
857 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
858 cx.spawn(|buffer_store, mut cx| async move {
859 let buffer =
860 cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
861 buffer_store.update(&mut cx, |buffer_store, cx| {
862 buffer_store.add_buffer(buffer.clone(), cx).log_err();
863 })?;
864 Ok(buffer)
865 })
866 }
867
    /// Reloads each buffer from disk, collecting the resulting edit
    /// transactions into a [`ProjectTransaction`]. When `push_to_history` is
    /// false, each transaction is forgotten from the buffer's undo history.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    // `reload` yields a transaction only if the buffer actually changed.
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.entity(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
893}
894
895impl BufferStore {
    /// Registers this store's RPC message and request handlers on the client.
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
        client.add_model_request_handler(Self::handle_get_staged_text);
        client.add_model_message_handler(Self::handle_update_diff_base);
    }
907
    /// Creates a buffer store backed by local worktree files, subscribing to
    /// the worktree store so that newly added worktrees are tracked.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            worktree_store,
        }
    }
930
    /// Creates a buffer store that replicates buffers from an upstream host
    /// identified by `remote_id` over `upstream_client`.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
954
955 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
956 match &mut self.state {
957 BufferStoreState::Local(state) => Some(state),
958 _ => None,
959 }
960 }
961
962 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
963 match &mut self.state {
964 BufferStoreState::Remote(state) => Some(state),
965 _ => None,
966 }
967 }
968
969 fn as_remote(&self) -> Option<&RemoteBufferStore> {
970 match &self.state {
971 BufferStoreState::Remote(state) => Some(state),
972 _ => None,
973 }
974 }
975
    /// Opens the buffer at `project_path`, returning an existing open buffer
    /// if there is one, joining an in-flight load if one is already underway,
    /// or otherwise starting a new (local or remote) load.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        skip_file_contents: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // Another caller is already loading this path; share its task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => {
                        this.open_buffer(path, worktree, skip_file_contents, cx)
                    }
                    BufferStoreState::Remote(this) => {
                        this.open_buffer(path, worktree, skip_file_contents, cx)
                    }
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            // Shared tasks need a clonable error type.
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1026
    /// Opens (or joins an in-flight load of) the unstaged-change set for a
    /// buffer, loading the staged text locally or from the upstream host.
    pub fn open_unstaged_changes(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(change_set) = self.get_unstaged_changes(buffer_id) {
            return Task::ready(Ok(change_set));
        }

        let task = match self.loading_change_sets.entry(buffer_id) {
            // A load for this buffer is already in flight; share it.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let load = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.load_staged_text(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_unstaged_changes_internal(this, load.await, buffer, cx)
                                .await
                                .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1061
    /// Test helper: installs a pre-built change set for `buffer_id` as if it
    /// had been loaded.
    #[cfg(any(test, feature = "test-support"))]
    pub fn set_change_set(&mut self, buffer_id: BufferId, change_set: Entity<BufferChangeSet>) {
        self.loading_change_sets
            .insert(buffer_id, Task::ready(Ok(change_set)).shared());
    }
1067
    /// Second half of `open_unstaged_changes`: builds a change set from the
    /// loaded staged text (if any), clears the loading entry, and links the
    /// change set to its buffer's `OpenBuffer` record.
    pub async fn open_unstaged_changes_internal(
        this: WeakEntity<Self>,
        text: Result<Option<String>>,
        buffer: Entity<Buffer>,
        mut cx: AsyncApp,
    ) -> Result<Entity<BufferChangeSet>> {
        let text = match text {
            Err(e) => {
                // Loading failed; clear the in-flight entry so a retry can start fresh.
                this.update(&mut cx, |this, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    this.loading_change_sets.remove(&buffer_id);
                })?;
                return Err(e);
            }
            Ok(text) => text,
        };

        let change_set = cx.new(|cx| BufferChangeSet::new(&buffer, cx)).unwrap();

        if let Some(text) = text {
            // Seed the change set with the staged text; a failed diff is non-fatal.
            change_set
                .update(&mut cx, |change_set, cx| {
                    let snapshot = buffer.read(cx).text_snapshot();
                    change_set.set_base_text(text, snapshot, cx)
                })?
                .await
                .ok();
        }

        this.update(&mut cx, |this, cx| {
            let buffer_id = buffer.read(cx).remote_id();
            this.loading_change_sets.remove(&buffer_id);
            // Hold the change set weakly alongside its buffer.
            if let Some(OpenBuffer::Complete {
                unstaged_changes, ..
            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                *unstaged_changes = Some(change_set.downgrade());
            }
        })?;

        Ok(change_set)
    }
1110
    /// Creates a new untitled buffer, locally or via the upstream host.
    pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
        match &self.state {
            BufferStoreState::Local(this) => this.create_buffer(cx),
            BufferStoreState::Remote(this) => this.create_buffer(cx),
        }
    }
1117
    /// Saves a buffer to its current file, locally or via the upstream host.
    pub fn save_buffer(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        match &mut self.state {
            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
        }
    }
1128
    /// Saves `buffer` under a new project `path`, then emits
    /// `BufferChangedFilePath` carrying the file the buffer had before saving.
    pub fn save_buffer_as(
        &mut self,
        buffer: Entity<Buffer>,
        path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        // Capture the old file up front; the save below replaces it.
        let old_file = buffer.read(cx).file().cloned();
        let task = match &self.state {
            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
            BufferStoreState::Remote(this) => {
                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            // Only emit the event once the save has actually succeeded.
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }
1149
    /// Computes a git blame for `buffer`, optionally at a specific `version`.
    ///
    /// Returns `Ok(None)` when the file is not inside a git repository; fails
    /// if the buffer has no file at all. Remote worktrees delegate to the
    /// host via RPC.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything the background task needs while we still
                // have access to app state.
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    // Blame either the requested historical version or the
                    // buffer's current contents.
                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                // Ask the host, which has access to the repository.
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1211
    /// Builds a permalink URL for `selection` in `buffer`, based on the
    /// file's git remote (falling back to crate metadata for Rust files in
    /// the Cargo registry). Remote worktrees delegate to the host.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // File path relative to the repository root.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                // Guests ask the host, which has repository access.
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1308
    /// Registers `buffer` in the store: wires up drop/event notifications and
    /// merges any operations that arrived before the buffer itself.
    fn add_buffer(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        // Replica id 0 is the authoritative copy; anything else came from a
        // remote peer.
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer.downgrade(),
            unstaged_changes: None,
        };

        // Emit `BufferDropped` when the buffer entity is released.
        let handle = cx.entity().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    // Operations arrived before the buffer: apply them now.
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        // Re-registering a live remote buffer is benign.
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }
1352
1353 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1354 self.opened_buffers
1355 .values()
1356 .filter_map(|buffer| buffer.upgrade())
1357 }
1358
1359 pub fn loading_buffers(
1360 &self,
1361 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1362 self.loading_buffers.iter().map(|(path, task)| {
1363 let task = task.clone();
1364 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1365 })
1366 }
1367
1368 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1369 self.buffers().find_map(|buffer| {
1370 let file = File::from_dyn(buffer.read(cx).file())?;
1371 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1372 Some(buffer)
1373 } else {
1374 None
1375 }
1376 })
1377 }
1378
1379 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1380 self.opened_buffers.get(&buffer_id)?.upgrade()
1381 }
1382
1383 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1384 self.get(buffer_id)
1385 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1386 }
1387
1388 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1389 self.get(buffer_id).or_else(|| {
1390 self.as_remote()
1391 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1392 })
1393 }
1394
1395 pub fn get_unstaged_changes(&self, buffer_id: BufferId) -> Option<Entity<BufferChangeSet>> {
1396 if let OpenBuffer::Complete {
1397 unstaged_changes, ..
1398 } = self.opened_buffers.get(&buffer_id)?
1399 {
1400 unstaged_changes.as_ref()?.upgrade()
1401 } else {
1402 None
1403 }
1404 }
1405
1406 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1407 let buffers = self
1408 .buffers()
1409 .map(|buffer| {
1410 let buffer = buffer.read(cx);
1411 proto::BufferVersion {
1412 id: buffer.remote_id().into(),
1413 version: language::proto::serialize_version(&buffer.version),
1414 }
1415 })
1416 .collect();
1417 let incomplete_buffer_ids = self
1418 .as_remote()
1419 .map(|remote| remote.incomplete_buffer_ids())
1420 .unwrap_or_default();
1421 (buffers, incomplete_buffer_ids)
1422 }
1423
1424 pub fn disconnected_from_host(&mut self, cx: &mut App) {
1425 for open_buffer in self.opened_buffers.values_mut() {
1426 if let Some(buffer) = open_buffer.upgrade() {
1427 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1428 }
1429 }
1430
1431 for buffer in self.buffers() {
1432 buffer.update(cx, |buffer, cx| {
1433 buffer.set_capability(Capability::ReadOnly, cx)
1434 });
1435 }
1436
1437 if let Some(remote) = self.as_remote_mut() {
1438 // Wake up all futures currently waiting on a buffer to get opened,
1439 // to give them a chance to fail now that we've disconnected.
1440 remote.remote_buffer_listeners.clear()
1441 }
1442 }
1443
    /// Marks this store as shared downstream, recording the client and remote
    /// project id used for subsequent proto messages.
    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1447
    /// Stops sharing: drops the downstream client and forgets which buffers
    /// were shared with peers.
    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }
1452
1453 pub fn discard_incomplete(&mut self) {
1454 self.opened_buffers
1455 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1456 }
1457
    /// Streams candidate buffers for `query`: open buffers without a worktree
    /// entry are sent first, then buffers opened from the worktree store's
    /// candidate paths. `limit` caps the total candidate count, and unnamed
    /// buffers are charged against it.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // Buffers with no worktree entry can't be found by the path
                // scan, so send them directly and charge them to the limit.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Bounds how many buffer opens are awaited per batch below.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, false, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // Send failing means the receiver was dropped — the
                        // caller stopped listening, so stop early.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1512
    /// Kicks off diff recalculation for each buffer's unstaged-changes set,
    /// returning a future that resolves once all recalculations finish.
    pub fn recalculate_buffer_diffs(
        &mut self,
        buffers: Vec<Entity<Buffer>>,
        cx: &mut Context<Self>,
    ) -> impl Future<Output = ()> {
        let mut futures = Vec::new();
        for buffer in buffers {
            let buffer = buffer.read(cx).text_snapshot();
            if let Some(OpenBuffer::Complete {
                unstaged_changes, ..
            }) = self.opened_buffers.get_mut(&buffer.remote_id())
            {
                if let Some(unstaged_changes) = unstaged_changes
                    .as_ref()
                    .and_then(|changes| changes.upgrade())
                {
                    unstaged_changes.update(cx, |unstaged_changes, cx| {
                        futures.push(unstaged_changes.recalculate_diff(buffer.clone(), cx));
                    });
                } else {
                    // The change set was dropped; clear the stale weak handle.
                    unstaged_changes.take();
                }
            }
        }
        async move {
            futures::future::join_all(futures).await;
        }
    }
1541
    /// Reacts to events from an open buffer: file-handle changes are handled
    /// by the local backend, and reloads are relayed downstream when shared.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Only relevant when this project is shared downstream.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }
1572
    /// Handles an `UpdateBuffer` message: applies the contained operations to
    /// the buffer, or queues them if the buffer hasn't been created yet.
    pub async fn handle_update_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    // Buffer state hasn't arrived yet: keep queueing ops.
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Complete { buffer, .. } => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                        }
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    // First mention of this buffer: stash the ops until its
                    // state shows up (see `add_buffer`).
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }
1602
1603 pub fn register_shared_lsp_handle(
1604 &mut self,
1605 peer_id: proto::PeerId,
1606 buffer_id: BufferId,
1607 handle: OpenLspBufferHandle,
1608 ) {
1609 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
1610 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
1611 buffer.lsp_handle = Some(handle);
1612 return;
1613 }
1614 }
1615 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
1616 }
1617
    /// Handles a guest's `SynchronizeBuffers` request (e.g. after a
    /// reconnect): re-shares each buffer the guest reports as open, replies
    /// with the host's current version of each, and streams any operations
    /// the guest is missing.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate for this guest's shared-buffer set.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        unstaged_changes: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest hasn't seen.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations to the guest in chunks, off
                // the main thread.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
1707
1708 pub fn handle_create_buffer_for_peer(
1709 &mut self,
1710 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1711 replica_id: u16,
1712 capability: Capability,
1713 cx: &mut Context<Self>,
1714 ) -> Result<()> {
1715 let Some(remote) = self.as_remote_mut() else {
1716 return Err(anyhow!("buffer store is not a remote"));
1717 };
1718
1719 if let Some(buffer) =
1720 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1721 {
1722 self.add_buffer(buffer, cx)?;
1723 }
1724
1725 Ok(())
1726 }
1727
    /// Handles an `UpdateBufferFile` message: swaps in the new file metadata
    /// for the buffer and relays the update downstream when shared.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // Yields Some(old file) only when the path actually changed.
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay to our own downstream client, if any.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
1775
    /// Handles a guest's `SaveBuffer` request: waits for the requested buffer
    /// version, saves (optionally to a new path), and reports the saved
    /// version and mtime back.
    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until we've caught up to the version the guest sent.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
1816
    /// Handles a peer closing a buffer: removes it from that peer's shared
    /// set, dropping the peer's entry entirely once it has no buffers left.
    pub async fn handle_close_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let peer_id = envelope.sender_id;
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        this.update(&mut cx, |this, _| {
            if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
                if shared.remove(&buffer_id).is_some() {
                    if shared.is_empty() {
                        this.shared_buffers.remove(&peer_id);
                    }
                    return;
                }
            }
            // Debug-only panic: a double close indicates a protocol bug.
            debug_panic!(
                "peer_id {} closed buffer_id {} which was either not open or already closed",
                peer_id,
                buffer_id
            )
        })
    }
1840
    /// Handles a `BufferSaved` notification: records the saved version and
    /// mtime on the buffer and relays the message downstream when shared.
    pub async fn handle_buffer_saved(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        this.update(&mut cx, move |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferSaved {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                    })
                    .log_err();
            }
        })
    }
1868
    /// Handles a `BufferReloaded` notification: records the reloaded version,
    /// line ending, and mtime, and relays the message downstream when shared.
    pub async fn handle_buffer_reloaded(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }
1901
    /// Handles a guest's `BlameBuffer` request: waits for the requested
    /// version, runs the blame locally, and serializes the result.
    pub async fn handle_blame_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        // Blaming an older state than the guest sees would be misleading.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }
1922
1923 pub async fn handle_get_permalink_to_line(
1924 this: Entity<Self>,
1925 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
1926 mut cx: AsyncApp,
1927 ) -> Result<proto::GetPermalinkToLineResponse> {
1928 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1929 // let version = deserialize_version(&envelope.payload.version);
1930 let selection = {
1931 let proto_selection = envelope
1932 .payload
1933 .selection
1934 .context("no selection to get permalink for defined")?;
1935 proto_selection.start as u32..proto_selection.end as u32
1936 };
1937 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1938 let permalink = this
1939 .update(&mut cx, |this, cx| {
1940 this.get_permalink_to_line(&buffer, selection, cx)
1941 })?
1942 .await?;
1943 Ok(proto::GetPermalinkToLineResponse {
1944 permalink: permalink.to_string(),
1945 })
1946 }
1947
    /// Handles a guest's `GetStagedText` request: opens the buffer's unstaged
    /// change set, retains it on the guest's shared-buffer entry, and returns
    /// the staged base text (if any).
    pub async fn handle_get_staged_text(
        this: Entity<Self>,
        request: TypedEnvelope<proto::GetStagedText>,
        mut cx: AsyncApp,
    ) -> Result<proto::GetStagedTextResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let change_set = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_unstaged_changes(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            // The buffer should already be shared with this peer by the time
            // it asks for staged text.
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                // Hold a strong handle so the change set stays alive for the
                // peer's session.
                shared.unstaged_changes = Some(change_set.clone());
            }
        })?;
        let staged_text = change_set.read_with(&cx, |change_set, _| {
            change_set.base_text.as_ref().map(|buffer| buffer.text())
        })?;
        Ok(proto::GetStagedTextResponse { staged_text })
    }
1976
    /// Handles an `UpdateDiffBase` message: updates or clears the staged base
    /// text of the buffer's change set, if both are still alive.
    pub async fn handle_update_diff_base(
        this: Entity<Self>,
        request: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let Some((buffer, change_set)) = this.update(&mut cx, |this, _| {
            if let OpenBuffer::Complete {
                unstaged_changes,
                buffer,
            } = this.opened_buffers.get(&buffer_id)?
            {
                Some((buffer.upgrade()?, unstaged_changes.as_ref()?.upgrade()?))
            } else {
                None
            }
        })?
        else {
            // Buffer or change set has been dropped; nothing to update.
            return Ok(());
        };
        change_set.update(&mut cx, |change_set, cx| {
            if let Some(staged_text) = request.payload.staged_text {
                let _ = change_set.set_base_text(staged_text, buffer.read(cx).text_snapshot(), cx);
            } else {
                change_set.unset_base_text(buffer.read(cx).text_snapshot(), cx)
            }
        })?;
        Ok(())
    }
2006
2007 pub fn reload_buffers(
2008 &self,
2009 buffers: HashSet<Entity<Buffer>>,
2010 push_to_history: bool,
2011 cx: &mut Context<Self>,
2012 ) -> Task<Result<ProjectTransaction>> {
2013 if buffers.is_empty() {
2014 return Task::ready(Ok(ProjectTransaction::default()));
2015 }
2016 match &self.state {
2017 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2018 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2019 }
2020 }
2021
    /// Handles a guest's `ReloadBuffers` request: reloads the listed buffers
    /// and returns the resulting transaction, serialized for that peer.
    async fn handle_reload_buffers(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncApp,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }
2045
    /// Shares `buffer` with `peer_id`: sends the buffer's full state followed
    /// by its operation history in chunks. No-op when already shared with
    /// that peer or when there is no downstream client.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            return Task::ready(Ok(()));
        }
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                unstaged_changes: None,
                lsp_handle: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before this task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream operation chunks if the initial state was sent.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        // Peek ahead so the final chunk can be flagged.
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2112
    /// Clears the record of which buffers have been shared with which peers.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2116
    /// Forgets all buffers shared with a single peer.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2120
    /// Transfers a peer's shared-buffer record to a new peer id (e.g. after
    /// the peer reconnects with a different id).
    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }
2126
    /// Whether any buffers are currently shared with any peer.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2130
    /// Creates a buffer from `text` with the given (or plain-text) language
    /// and registers it in the store's path/entry indices.
    ///
    /// Panics if called on a non-local buffer store.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut Context<Self>,
    ) -> Entity<Buffer> {
        let buffer = cx.new(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            // Index by path and entry id so later lookups reuse this buffer.
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }
2164
2165 pub fn deserialize_project_transaction(
2166 &mut self,
2167 message: proto::ProjectTransaction,
2168 push_to_history: bool,
2169 cx: &mut Context<Self>,
2170 ) -> Task<Result<ProjectTransaction>> {
2171 if let Some(this) = self.as_remote_mut() {
2172 this.deserialize_project_transaction(message, push_to_history, cx)
2173 } else {
2174 debug_panic!("not a remote buffer store");
2175 Task::ready(Err(anyhow!("not a remote buffer store")))
2176 }
2177 }
2178
2179 pub fn wait_for_remote_buffer(
2180 &mut self,
2181 id: BufferId,
2182 cx: &mut Context<BufferStore>,
2183 ) -> Task<Result<Entity<Buffer>>> {
2184 if let Some(this) = self.as_remote_mut() {
2185 this.wait_for_remote_buffer(id, cx)
2186 } else {
2187 debug_panic!("not a remote buffer store");
2188 Task::ready(Err(anyhow!("not a remote buffer store")))
2189 }
2190 }
2191
2192 pub fn serialize_project_transaction_for_peer(
2193 &mut self,
2194 project_transaction: ProjectTransaction,
2195 peer_id: proto::PeerId,
2196 cx: &mut Context<Self>,
2197 ) -> proto::ProjectTransaction {
2198 let mut serialized_transaction = proto::ProjectTransaction {
2199 buffer_ids: Default::default(),
2200 transactions: Default::default(),
2201 };
2202 for (buffer, transaction) in project_transaction.0 {
2203 self.create_buffer_for_peer(&buffer, peer_id, cx)
2204 .detach_and_log_err(cx);
2205 serialized_transaction
2206 .buffer_ids
2207 .push(buffer.read(cx).remote_id().into());
2208 serialized_transaction
2209 .transactions
2210 .push(language::proto::serialize_transaction(&transaction));
2211 }
2212 serialized_transaction
2213 }
2214}
2215
// Lets `BufferChangeSet` emit `BufferChangeSetEvent`s through gpui's event system.
impl EventEmitter<BufferChangeSetEvent> for BufferChangeSet {}
2217
impl BufferChangeSet {
    /// Creates an empty change set for `buffer`: no base text, and therefore
    /// no diff hunks yet.
    ///
    /// Subscribes to the buffer so that when its language changes, the cached
    /// base-text snapshot is rebuilt under the new language.
    pub fn new(buffer: &Entity<Buffer>, cx: &mut Context<Self>) -> Self {
        cx.subscribe(buffer, |this, buffer, event, cx| match event {
            BufferEvent::LanguageChanged => {
                this.language = buffer.read(cx).language().cloned();
                if let Some(base_text) = &this.base_text {
                    // Re-snapshot the existing base text with the new language.
                    let snapshot = language::Buffer::build_snapshot(
                        base_text.as_rope().clone(),
                        this.language.clone(),
                        this.language_registry.clone(),
                        cx,
                    );
                    // NOTE(review): assigning here drops (and thereby cancels)
                    // any in-flight recalculation task — presumably intended,
                    // since the language change invalidates it.
                    this.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
                        let base_text = cx.background_executor().spawn(snapshot).await;
                        this.update(&mut cx, |this, cx| {
                            this.base_text = Some(base_text);
                            // Report the entire buffer as changed: the base
                            // text snapshot was rebuilt wholesale.
                            cx.emit(BufferChangeSetEvent::DiffChanged {
                                changed_range: text::Anchor::MIN..text::Anchor::MAX,
                            });
                        })
                    }));
                }
            }
            _ => {}
        })
        .detach();

        let buffer = buffer.read(cx);

        Self {
            buffer_id: buffer.remote_id(),
            base_text: None,
            diff_to_buffer: git::diff::BufferDiff::new(buffer),
            recalculate_diff_task: None,
            diff_updated_futures: Vec::new(),
            language: buffer.language().cloned(),
            language_registry: buffer.language_registry(),
        }
    }

    /// Test-only constructor that immediately starts diffing against `base_text`.
    /// The completion receiver is dropped; callers that need to wait should use
    /// `set_base_text` directly.
    #[cfg(any(test, feature = "test-support"))]
    pub fn new_with_base_text(
        base_text: String,
        buffer: &Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Self {
        let mut this = Self::new(&buffer, cx);
        let _ = this.set_base_text(base_text, buffer.read(cx).text_snapshot(), cx);
        this
    }

    /// Returns the diff hunks overlapping `range`, in buffer order.
    pub fn diff_hunks_intersecting_range<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range(range, buffer_snapshot)
    }

    /// Like [`Self::diff_hunks_intersecting_range`], but yields hunks in
    /// reverse order.
    pub fn diff_hunks_intersecting_range_rev<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range_rev(range, buffer_snapshot)
    }

    /// Test-only accessor for the current base text as a `String`.
    #[cfg(any(test, feature = "test-support"))]
    pub fn base_text_string(&self) -> Option<String> {
        self.base_text.as_ref().map(|buffer| buffer.text())
    }

    /// Replaces the base text (after normalizing its line endings) and starts
    /// recomputing the diff. The returned receiver fires once the
    /// recalculation completes.
    pub fn set_base_text(
        &mut self,
        mut base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        LineEnding::normalize(&mut base_text);
        self.recalculate_diff_internal(base_text, buffer_snapshot, true, cx)
    }

    /// Clears the base text and the computed diff, cancelling any pending
    /// recalculation (dropping the task cancels it).
    pub fn unset_base_text(
        &mut self,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) {
        if self.base_text.is_some() {
            self.base_text = None;
            self.diff_to_buffer = BufferDiff::new(&buffer_snapshot);
            self.recalculate_diff_task.take();
            cx.notify();
        }
    }

    /// Recomputes the diff against the current base text.
    ///
    /// With no base text set, returns a receiver whose sender has already been
    /// dropped, so waiters are released immediately.
    pub fn recalculate_diff(
        &mut self,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        if let Some(base_text) = self.base_text.clone() {
            self.recalculate_diff_internal(base_text.text(), buffer_snapshot, false, cx)
        } else {
            oneshot::channel().1
        }
    }

    /// Spawns the actual diff recomputation.
    ///
    /// When `base_text_changed` is true, a fresh base-text snapshot is built
    /// and the whole buffer is reported as changed; otherwise the new diff is
    /// compared with the old one to find the changed range. All waiters queued
    /// in `diff_updated_futures` are notified when the task finishes.
    fn recalculate_diff_internal(
        &mut self,
        base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        base_text_changed: bool,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);
        // Replacing the task drops (cancels) any previously running recalculation.
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            let (old_diff, new_base_text) = this.update(&mut cx, |this, cx| {
                let new_base_text = if base_text_changed {
                    let base_text_rope: Rope = base_text.as_str().into();
                    let snapshot = language::Buffer::build_snapshot(
                        base_text_rope,
                        this.language.clone(),
                        this.language_registry.clone(),
                        cx,
                    );
                    cx.background_executor()
                        .spawn(async move { Some(snapshot.await) })
                } else {
                    Task::ready(None)
                };
                (this.diff_to_buffer.clone(), new_base_text)
            })?;

            // The diff itself is computed on the background executor.
            let diff = cx.background_executor().spawn(async move {
                let new_diff = BufferDiff::build(&base_text, &buffer_snapshot);
                let changed_range = if base_text_changed {
                    Some(text::Anchor::MIN..text::Anchor::MAX)
                } else {
                    new_diff.compare(&old_diff, &buffer_snapshot)
                };
                (new_diff, changed_range)
            });

            let (new_base_text, (diff, changed_range)) = futures::join!(new_base_text, diff);

            this.update(&mut cx, |this, cx| {
                if let Some(new_base_text) = new_base_text {
                    this.base_text = Some(new_base_text)
                }
                this.diff_to_buffer = diff;

                this.recalculate_diff_task.take();
                for tx in this.diff_updated_futures.drain(..) {
                    tx.send(()).ok();
                }
                // `None` means the diff did not change; no event in that case.
                if let Some(changed_range) = changed_range {
                    cx.emit(BufferChangeSetEvent::DiffChanged { changed_range });
                }
            })?;
            Ok(())
        }));
        rx
    }
}
2385
2386impl OpenBuffer {
2387 fn upgrade(&self) -> Option<Entity<Buffer>> {
2388 match self {
2389 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2390 OpenBuffer::Operations(_) => None,
2391 }
2392 }
2393}
2394
2395fn is_not_found_error(error: &anyhow::Error) -> bool {
2396 error
2397 .root_cause()
2398 .downcast_ref::<io::Error>()
2399 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2400}
2401
2402fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2403 let Some(blame) = blame else {
2404 return proto::BlameBufferResponse {
2405 blame_response: None,
2406 };
2407 };
2408
2409 let entries = blame
2410 .entries
2411 .into_iter()
2412 .map(|entry| proto::BlameEntry {
2413 sha: entry.sha.as_bytes().into(),
2414 start_line: entry.range.start,
2415 end_line: entry.range.end,
2416 original_line_number: entry.original_line_number,
2417 author: entry.author.clone(),
2418 author_mail: entry.author_mail.clone(),
2419 author_time: entry.author_time,
2420 author_tz: entry.author_tz.clone(),
2421 committer: entry.committer.clone(),
2422 committer_mail: entry.committer_mail.clone(),
2423 committer_time: entry.committer_time,
2424 committer_tz: entry.committer_tz.clone(),
2425 summary: entry.summary.clone(),
2426 previous: entry.previous.clone(),
2427 filename: entry.filename.clone(),
2428 })
2429 .collect::<Vec<_>>();
2430
2431 let messages = blame
2432 .messages
2433 .into_iter()
2434 .map(|(oid, message)| proto::CommitMessage {
2435 oid: oid.as_bytes().into(),
2436 message,
2437 })
2438 .collect::<Vec<_>>();
2439
2440 let permalinks = blame
2441 .permalinks
2442 .into_iter()
2443 .map(|(oid, url)| proto::CommitPermalink {
2444 oid: oid.as_bytes().into(),
2445 permalink: url.to_string(),
2446 })
2447 .collect::<Vec<_>>();
2448
2449 proto::BlameBufferResponse {
2450 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2451 entries,
2452 messages,
2453 permalinks,
2454 remote_url: blame.remote_url,
2455 }),
2456 }
2457}
2458
2459fn deserialize_blame_buffer_response(
2460 response: proto::BlameBufferResponse,
2461) -> Option<git::blame::Blame> {
2462 let response = response.blame_response?;
2463 let entries = response
2464 .entries
2465 .into_iter()
2466 .filter_map(|entry| {
2467 Some(git::blame::BlameEntry {
2468 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2469 range: entry.start_line..entry.end_line,
2470 original_line_number: entry.original_line_number,
2471 committer: entry.committer,
2472 committer_time: entry.committer_time,
2473 committer_tz: entry.committer_tz,
2474 committer_mail: entry.committer_mail,
2475 author: entry.author,
2476 author_mail: entry.author_mail,
2477 author_time: entry.author_time,
2478 author_tz: entry.author_tz,
2479 summary: entry.summary,
2480 previous: entry.previous,
2481 filename: entry.filename,
2482 })
2483 })
2484 .collect::<Vec<_>>();
2485
2486 let messages = response
2487 .messages
2488 .into_iter()
2489 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2490 .collect::<HashMap<_, _>>();
2491
2492 let permalinks = response
2493 .permalinks
2494 .into_iter()
2495 .filter_map(|permalink| {
2496 Some((
2497 git::Oid::from_bytes(&permalink.oid).ok()?,
2498 Url::from_str(&permalink.permalink).ok()?,
2499 ))
2500 })
2501 .collect::<HashMap<_, _>>();
2502
2503 Some(Blame {
2504 entries,
2505 permalinks,
2506 messages,
2507 remote_url: response.remote_url,
2508 })
2509}
2510
2511fn get_permalink_in_rust_registry_src(
2512 provider_registry: Arc<GitHostingProviderRegistry>,
2513 path: PathBuf,
2514 selection: Range<u32>,
2515) -> Result<url::Url> {
2516 #[derive(Deserialize)]
2517 struct CargoVcsGit {
2518 sha1: String,
2519 }
2520
2521 #[derive(Deserialize)]
2522 struct CargoVcsInfo {
2523 git: CargoVcsGit,
2524 path_in_vcs: String,
2525 }
2526
2527 #[derive(Deserialize)]
2528 struct CargoPackage {
2529 repository: String,
2530 }
2531
2532 #[derive(Deserialize)]
2533 struct CargoToml {
2534 package: CargoPackage,
2535 }
2536
2537 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
2538 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
2539 Some((dir, json))
2540 }) else {
2541 bail!("No .cargo_vcs_info.json found in parent directories")
2542 };
2543 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
2544 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
2545 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
2546 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
2547 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
2548 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
2549 let permalink = provider.build_permalink(
2550 remote,
2551 BuildPermalinkParams {
2552 sha: &cargo_vcs_info.git.sha1,
2553 path: &path.to_string_lossy(),
2554 selection: Some(selection),
2555 },
2556 );
2557 Ok(permalink)
2558}