1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use client::Client;
10use collections::{hash_map, HashMap, HashSet};
11use fs::Fs;
12use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
13use git::{blame::Blame, diff::BufferDiff, repository::RepoPath};
14use gpui::{
15 App, AppContext as _, AsyncAppContext, Context, Entity, EventEmitter, Subscription, Task,
16 WeakEntity,
17};
18use http_client::Url;
19use language::{
20 proto::{
21 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
22 split_operations,
23 },
24 Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
25};
26use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
27use serde::Deserialize;
28use smol::channel::Receiver;
29use std::{
30 io,
31 ops::Range,
32 path::{Path, PathBuf},
33 pin::pin,
34 str::FromStr as _,
35 sync::Arc,
36 time::Instant,
37};
38use text::{BufferId, LineEnding, Rope};
39use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
40use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
41
/// A set of open buffers.
pub struct BufferStore {
    // Whether this store is backed by local files or by a remote peer.
    state: BufferStoreState,
    #[allow(clippy::type_complexity)]
    // In-flight buffer-open tasks keyed by path; `Shared` so concurrent opens
    // of the same path all await a single load.
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    #[allow(clippy::type_complexity)]
    // In-flight unstaged-change-set loads, keyed by buffer id (same
    // deduplication scheme as `loading_buffers`).
    loading_change_sets:
        HashMap<BufferId, Shared<Task<Result<Entity<BufferChangeSet>, Arc<anyhow::Error>>>>>,
    // Worktrees whose files back these buffers.
    worktree_store: Entity<WorktreeStore>,
    // Every buffer that has been opened, keyed by its remote id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    // Client (and project id) to forward buffer updates to when this project
    // is shared downstream.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // Per-peer record of buffers shared with that peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
55
// A buffer shared with a peer, bundled with the auxiliary entities whose
// strong handles keep them alive for as long as the share lasts.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    unstaged_changes: Option<Entity<BufferChangeSet>>,
    lsp_handle: Option<OpenLspBufferHandle>,
}
62
/// The diff between a buffer's current contents and some base text — e.g.
/// the staged copy of the file in the git index.
pub struct BufferChangeSet {
    pub buffer_id: BufferId,
    /// Snapshot of the base text being diffed against; `None` when unset.
    pub base_text: Option<language::BufferSnapshot>,
    pub language: Option<Arc<Language>>,
    /// The computed diff from `base_text` to the buffer.
    pub diff_to_buffer: git::diff::BufferDiff,
    /// The in-flight diff recomputation, if one is running.
    pub recalculate_diff_task: Option<Task<Result<()>>>,
    /// Senders to notify when the current diff update finishes.
    pub diff_updated_futures: Vec<oneshot::Sender<()>>,
    pub language_registry: Option<Arc<LanguageRegistry>>,
}
72
// Backing implementation: a store either owns local, disk-backed buffers or
// replicates buffers from an upstream peer over RPC.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
77
// State for a buffer store whose project lives on a remote peer.
struct RemoteBufferStore {
    // Strong handles to buffers sent by collab peers, retained to avoid races
    // (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    // Buffers whose initial `State` payload arrived but whose operation
    // chunks are still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    // Waiters (per buffer id) to resolve once a buffer finishes replicating.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}
87
// State for a buffer store backed by the local filesystem.
struct LocalBufferStore {
    // Reverse indexes from a buffer's project path / worktree entry id to its
    // buffer id, kept in sync as files move or are deleted.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    // Subscription to `WorktreeStore` events; dropped with this store.
    _subscription: Subscription,
}
94
// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    // A fully-loaded buffer, held weakly so the store doesn't keep it alive.
    Complete {
        buffer: WeakEntity<Buffer>,
        unstaged_changes: Option<WeakEntity<BufferChangeSet>>,
    },
    // Pending operations for a buffer that isn't fully open yet.
    Operations(Vec<Operation>),
}
102
/// Events emitted by a [`BufferStore`].
pub enum BufferStoreEvent {
    /// A buffer was added to the store.
    BufferAdded(Entity<Buffer>),
    /// A buffer was released (all strong handles dropped).
    BufferDropped(BufferId),
    /// A buffer's file changed — e.g. after a save-as or an on-disk rename.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        /// The file the buffer had before the change, if any.
        old_file: Option<Arc<dyn language::File>>,
    },
}
111
/// A set of buffer edit transactions produced by one project-wide operation,
/// keyed by the buffer each transaction applies to.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}
116
117impl RemoteBufferStore {
118 fn load_staged_text(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
119 let project_id = self.project_id;
120 let client = self.upstream_client.clone();
121 cx.background_executor().spawn(async move {
122 Ok(client
123 .request(proto::GetStagedText {
124 project_id,
125 buffer_id: buffer_id.to_proto(),
126 })
127 .await?
128 .staged_text)
129 })
130 }
131 pub fn wait_for_remote_buffer(
132 &mut self,
133 id: BufferId,
134 cx: &mut Context<BufferStore>,
135 ) -> Task<Result<Entity<Buffer>>> {
136 let (tx, rx) = oneshot::channel();
137 self.remote_buffer_listeners.entry(id).or_default().push(tx);
138
139 cx.spawn(|this, cx| async move {
140 if let Some(buffer) = this
141 .read_with(&cx, |buffer_store, _| buffer_store.get(id))
142 .ok()
143 .flatten()
144 {
145 return Ok(buffer);
146 }
147
148 cx.background_executor()
149 .spawn(async move { rx.await? })
150 .await
151 })
152 }
153
154 fn save_remote_buffer(
155 &self,
156 buffer_handle: Entity<Buffer>,
157 new_path: Option<proto::ProjectPath>,
158 cx: &Context<BufferStore>,
159 ) -> Task<Result<()>> {
160 let buffer = buffer_handle.read(cx);
161 let buffer_id = buffer.remote_id().into();
162 let version = buffer.version();
163 let rpc = self.upstream_client.clone();
164 let project_id = self.project_id;
165 cx.spawn(move |_, mut cx| async move {
166 let response = rpc
167 .request(proto::SaveBuffer {
168 project_id,
169 buffer_id,
170 new_path,
171 version: serialize_version(&version),
172 })
173 .await?;
174 let version = deserialize_version(&response.version);
175 let mtime = response.mtime.map(|mtime| mtime.into());
176
177 buffer_handle.update(&mut cx, |buffer, cx| {
178 buffer.did_save(version.clone(), mtime, cx);
179 })?;
180
181 Ok(())
182 })
183 }
184
185 pub fn handle_create_buffer_for_peer(
186 &mut self,
187 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
188 replica_id: u16,
189 capability: Capability,
190 cx: &mut Context<BufferStore>,
191 ) -> Result<Option<Entity<Buffer>>> {
192 match envelope
193 .payload
194 .variant
195 .ok_or_else(|| anyhow!("missing variant"))?
196 {
197 proto::create_buffer_for_peer::Variant::State(mut state) => {
198 let buffer_id = BufferId::new(state.id)?;
199
200 let buffer_result = maybe!({
201 let mut buffer_file = None;
202 if let Some(file) = state.file.take() {
203 let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
204 let worktree = self
205 .worktree_store
206 .read(cx)
207 .worktree_for_id(worktree_id, cx)
208 .ok_or_else(|| {
209 anyhow!("no worktree found for id {}", file.worktree_id)
210 })?;
211 buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
212 as Arc<dyn language::File>);
213 }
214 Buffer::from_proto(replica_id, capability, state, buffer_file)
215 });
216
217 match buffer_result {
218 Ok(buffer) => {
219 let buffer = cx.new(|_| buffer);
220 self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
221 }
222 Err(error) => {
223 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
224 for listener in listeners {
225 listener.send(Err(anyhow!(error.cloned()))).ok();
226 }
227 }
228 }
229 }
230 }
231 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
232 let buffer_id = BufferId::new(chunk.buffer_id)?;
233 let buffer = self
234 .loading_remote_buffers_by_id
235 .get(&buffer_id)
236 .cloned()
237 .ok_or_else(|| {
238 anyhow!(
239 "received chunk for buffer {} without initial state",
240 chunk.buffer_id
241 )
242 })?;
243
244 let result = maybe!({
245 let operations = chunk
246 .operations
247 .into_iter()
248 .map(language::proto::deserialize_operation)
249 .collect::<Result<Vec<_>>>()?;
250 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
251 anyhow::Ok(())
252 });
253
254 if let Err(error) = result {
255 self.loading_remote_buffers_by_id.remove(&buffer_id);
256 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
257 for listener in listeners {
258 listener.send(Err(error.cloned())).ok();
259 }
260 }
261 } else if chunk.is_last {
262 self.loading_remote_buffers_by_id.remove(&buffer_id);
263 if self.upstream_client.is_via_collab() {
264 // retain buffers sent by peers to avoid races.
265 self.shared_with_me.insert(buffer.clone());
266 }
267
268 if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
269 for sender in senders {
270 sender.send(Ok(buffer.clone())).ok();
271 }
272 }
273 return Ok(Some(buffer));
274 }
275 }
276 }
277 return Ok(None);
278 }
279
280 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
281 self.loading_remote_buffers_by_id
282 .keys()
283 .copied()
284 .collect::<Vec<_>>()
285 }
286
287 pub fn deserialize_project_transaction(
288 &self,
289 message: proto::ProjectTransaction,
290 push_to_history: bool,
291 cx: &mut Context<BufferStore>,
292 ) -> Task<Result<ProjectTransaction>> {
293 cx.spawn(|this, mut cx| async move {
294 let mut project_transaction = ProjectTransaction::default();
295 for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
296 {
297 let buffer_id = BufferId::new(buffer_id)?;
298 let buffer = this
299 .update(&mut cx, |this, cx| {
300 this.wait_for_remote_buffer(buffer_id, cx)
301 })?
302 .await?;
303 let transaction = language::proto::deserialize_transaction(transaction)?;
304 project_transaction.0.insert(buffer, transaction);
305 }
306
307 for (buffer, transaction) in &project_transaction.0 {
308 buffer
309 .update(&mut cx, |buffer, _| {
310 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
311 })?
312 .await?;
313
314 if push_to_history {
315 buffer.update(&mut cx, |buffer, _| {
316 buffer.push_transaction(transaction.clone(), Instant::now());
317 })?;
318 }
319 }
320
321 Ok(project_transaction)
322 })
323 }
324
325 fn open_buffer(
326 &self,
327 path: Arc<Path>,
328 worktree: Entity<Worktree>,
329 cx: &mut Context<BufferStore>,
330 ) -> Task<Result<Entity<Buffer>>> {
331 let worktree_id = worktree.read(cx).id().to_proto();
332 let project_id = self.project_id;
333 let client = self.upstream_client.clone();
334 let path_string = path.clone().to_string_lossy().to_string();
335 cx.spawn(move |this, mut cx| async move {
336 let response = client
337 .request(proto::OpenBufferByPath {
338 project_id,
339 worktree_id,
340 path: path_string,
341 })
342 .await?;
343 let buffer_id = BufferId::new(response.buffer_id)?;
344
345 let buffer = this
346 .update(&mut cx, {
347 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
348 })?
349 .await?;
350
351 Ok(buffer)
352 })
353 }
354
355 fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
356 let create = self.upstream_client.request(proto::OpenNewBuffer {
357 project_id: self.project_id,
358 });
359 cx.spawn(|this, mut cx| async move {
360 let response = create.await?;
361 let buffer_id = BufferId::new(response.buffer_id)?;
362
363 this.update(&mut cx, |this, cx| {
364 this.wait_for_remote_buffer(buffer_id, cx)
365 })?
366 .await
367 })
368 }
369
370 fn reload_buffers(
371 &self,
372 buffers: HashSet<Entity<Buffer>>,
373 push_to_history: bool,
374 cx: &mut Context<BufferStore>,
375 ) -> Task<Result<ProjectTransaction>> {
376 let request = self.upstream_client.request(proto::ReloadBuffers {
377 project_id: self.project_id,
378 buffer_ids: buffers
379 .iter()
380 .map(|buffer| buffer.read(cx).remote_id().to_proto())
381 .collect(),
382 });
383
384 cx.spawn(|this, mut cx| async move {
385 let response = request
386 .await?
387 .transaction
388 .ok_or_else(|| anyhow!("missing transaction"))?;
389 this.update(&mut cx, |this, cx| {
390 this.deserialize_project_transaction(response, push_to_history, cx)
391 })?
392 .await
393 })
394 }
395}
396
// Disk-backed implementation, used when the project is local.
impl LocalBufferStore {
    /// Loads the staged (git index) text for `buffer`'s file from its
    /// worktree. Resolves to `Ok(None)` when the buffer has no file.
    fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
        let Some(file) = buffer.read(cx).file() else {
            return Task::ready(Ok(None));
        };
        let worktree_id = file.worktree_id(cx);
        let path = file.path().clone();
        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        worktree.read(cx).load_staged_file(path.as_ref(), cx)
    }

    /// Writes `buffer_handle`'s contents to `path` inside `worktree`, then
    /// notifies the buffer — and the downstream client, if the project is
    /// shared — that the save completed. `has_changed_file` marks that the
    /// buffer's file metadata changed (e.g. save-as); it is forced on for
    /// files that have never existed on disk.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Capture everything needed by the async portion up front.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            // First save of a brand-new file always updates file metadata.
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            this.update(&mut cx, |this, cx| {
                // Forward the save (and any file-metadata change) downstream.
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    /// Watches `worktree` for entry and git-repository changes so open
    /// buffers' file metadata and unstaged diffs stay up to date.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        Self::local_worktree_git_repos_changed(
                            this,
                            worktree.clone(),
                            updated_repos,
                            cx,
                        )
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    /// Applies a batch of worktree entry changes to any affected open buffers.
    fn local_worktree_entries_changed(
        this: &mut BufferStore,
        worktree_handle: &Entity<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut Context<BufferStore>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            Self::local_worktree_entry_changed(
                this,
                *entry_id,
                path,
                worktree_handle,
                &snapshot,
                cx,
            );
        }
    }

    /// Recomputes diff bases for open buffers whose files live under one of
    /// the changed git repositories, and forwards the new staged text to the
    /// downstream client when the project is shared.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Collect the change sets of open buffers in this worktree whose path
        // lies under one of the changed repositories' working directories.
        let buffer_change_sets = this
            .opened_buffers
            .values()
            .filter_map(|buffer| {
                if let OpenBuffer::Complete {
                    buffer,
                    unstaged_changes,
                } = buffer
                {
                    let buffer = buffer.upgrade()?.read(cx);
                    let file = File::from_dyn(buffer.file())?;
                    if file.worktree != worktree_handle {
                        return None;
                    }
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                    let unstaged_changes = unstaged_changes.as_ref()?.upgrade()?;
                    let snapshot = buffer.text_snapshot();
                    Some((unstaged_changes, snapshot, file.path.clone()))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        if buffer_change_sets.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Read the new index text for each buffer off the main thread.
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    buffer_change_sets
                        .into_iter()
                        .filter_map(|(change_set, buffer_snapshot, path)| {
                            let local_repo = snapshot.local_repo_for_path(&path)?;
                            let relative_path = local_repo.relativize(&path).ok()?;
                            let base_text = local_repo.repo().load_index_text(&relative_path);
                            Some((change_set, buffer_snapshot, base_text))
                        })
                        .collect::<Vec<_>>()
                })
                .await;

            this.update(&mut cx, |this, cx| {
                for (change_set, buffer_snapshot, staged_text) in diff_bases_by_buffer {
                    change_set.update(cx, |change_set, cx| {
                        if let Some(staged_text) = staged_text.clone() {
                            let _ =
                                change_set.set_base_text(staged_text, buffer_snapshot.clone(), cx);
                        } else {
                            // No staged copy anymore — clear the base text.
                            change_set.unset_base_text(buffer_snapshot.clone(), cx);
                        }
                    });

                    if let Some((client, project_id)) = &this.downstream_client.clone() {
                        client
                            .send(proto::UpdateDiffBase {
                                project_id: *project_id,
                                buffer_id: buffer_snapshot.remote_id().to_proto(),
                                staged_text,
                            })
                            .log_err();
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }

    /// Updates an open buffer's file metadata after a change to its worktree
    /// entry (rename, deletion, re-creation), emitting
    /// `BufferChangedFilePath` when the path changed. Always returns `None`;
    /// the `Option` return exists only for `?`-style early exit.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Resolve the changed entry to an open buffer, by entry id first and
        // then by path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer has been dropped; discard its stale entry.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // Clean up the reverse indexes for the dropped buffer.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Locate the entry in the new snapshot, preferring the entry id
            // and falling back to the path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // The entry is gone from the snapshot: the file was deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index under the file's new location.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.model(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index as well.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Forward the file change downstream when the project is shared.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }

    /// Records the path and entry-id indexes for a buffer whose file was just
    /// assigned or changed. Returns `None` when the buffer has no local file
    /// or the entry id is already indexed.
    fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }

    /// Saves `buffer` back to its current file.
    fn save_buffer(
        &self,
        buffer: Entity<Buffer>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
        };
        let worktree = file.worktree.clone();
        self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
    }

    /// Saves `buffer` to a new project `path` (save-as).
    fn save_buffer_as(
        &self,
        buffer: Entity<Buffer>,
        path: ProjectPath,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };
        self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
    }

    /// Opens the file at `path` in `worktree` as a buffer, registering it in
    /// the store's indexes. A missing file opens as an empty in-memory buffer
    /// with `DiskState::New`.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity slot up front so its id can seed the
            // buffer id before the file finishes loading.
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // A not-found error becomes a new, empty buffer at that path.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    // Index the new buffer by path and by worktree entry id.
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }

    /// Creates a new, empty plain-text buffer and adds it to the store.
    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
        cx.spawn(|buffer_store, mut cx| async move {
            let buffer =
                cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
            buffer_store.update(&mut cx, |buffer_store, cx| {
                buffer_store.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }

    /// Reloads each buffer from disk, collecting the resulting transactions.
    /// When `push_to_history` is false, reload transactions are forgotten so
    /// they don't pollute the undo history.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.model(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
}
888
889impl BufferStore {
    /// Registers this store's RPC message and request handlers on `client`.
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
        client.add_model_request_handler(Self::handle_get_staged_text);
        client.add_model_message_handler(Self::handle_update_diff_base);
    }
901
    /// Creates a buffer store for a local project, subscribing to
    /// `worktree_store` so that newly-added worktrees are watched for file
    /// and git-repository changes.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            worktree_store,
        }
    }
924
    /// Creates a buffer store for a remote project replicated from
    /// `upstream_client`, identified by `remote_id`.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
948
949 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
950 match &mut self.state {
951 BufferStoreState::Local(state) => Some(state),
952 _ => None,
953 }
954 }
955
956 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
957 match &mut self.state {
958 BufferStoreState::Remote(state) => Some(state),
959 _ => None,
960 }
961 }
962
963 fn as_remote(&self) -> Option<&RemoteBufferStore> {
964 match &self.state {
965 BufferStoreState::Remote(state) => Some(state),
966 _ => None,
967 }
968 }
969
    /// Opens the buffer at `project_path`, reusing an already-open buffer or
    /// an in-flight load for the same path when one exists.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        // Fast path: the buffer is already open.
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // A load for this path is already in flight; share its task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                // Dispatch to the local or remote implementation.
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1015
    /// Opens (or reuses) the unstaged change set for `buffer` — the diff
    /// between the buffer and its staged copy in the git index. Concurrent
    /// calls for the same buffer share a single load task.
    pub fn open_unstaged_changes(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(change_set) = self.get_unstaged_changes(buffer_id) {
            return Task::ready(Ok(change_set));
        }

        let task = match self.loading_change_sets.entry(buffer_id) {
            // A load is already in flight for this buffer; share it.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                // Fetch the staged text from disk or the upstream peer.
                let load = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.load_staged_text(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_unstaged_changes_internal(this, load.await, buffer, cx)
                                .await
                                .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1050
    /// Test helper: installs a pre-built change set for `buffer_id`, so tests
    /// can bypass the real staged-text load.
    #[cfg(any(test, feature = "test-support"))]
    pub fn set_change_set(&mut self, buffer_id: BufferId, change_set: Entity<BufferChangeSet>) {
        self.loading_change_sets
            .insert(buffer_id, Task::ready(Ok(change_set)).shared());
    }
1056
    /// Builds a [`BufferChangeSet`] for `buffer` from the loaded staged
    /// `text`, registering it on the buffer's `opened_buffers` entry and
    /// clearing the corresponding `loading_change_sets` slot.
    pub async fn open_unstaged_changes_internal(
        this: WeakEntity<Self>,
        text: Result<Option<String>>,
        buffer: Entity<Buffer>,
        mut cx: AsyncAppContext,
    ) -> Result<Entity<BufferChangeSet>> {
        let text = match text {
            Err(e) => {
                // Clear the loading slot so a later call can retry the load.
                this.update(&mut cx, |this, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    this.loading_change_sets.remove(&buffer_id);
                })?;
                return Err(e);
            }
            Ok(text) => text,
        };

        let change_set = cx.new(|cx| BufferChangeSet::new(&buffer, cx)).unwrap();

        if let Some(text) = text {
            // Seed the change set's base text and wait for the initial diff.
            change_set
                .update(&mut cx, |change_set, cx| {
                    let snapshot = buffer.read(cx).text_snapshot();
                    change_set.set_base_text(text, snapshot, cx)
                })?
                .await
                .ok();
        }

        this.update(&mut cx, |this, cx| {
            let buffer_id = buffer.read(cx).remote_id();
            this.loading_change_sets.remove(&buffer_id);
            // Attach the change set (weakly) to the open-buffer entry.
            if let Some(OpenBuffer::Complete {
                unstaged_changes, ..
            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                *unstaged_changes = Some(change_set.downgrade());
            }
        })?;

        Ok(change_set)
    }
1099
    /// Creates a new, empty buffer — locally, or on the upstream peer when
    /// this store is remote.
    pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
        match &self.state {
            BufferStoreState::Local(this) => this.create_buffer(cx),
            BufferStoreState::Remote(this) => this.create_buffer(cx),
        }
    }
1106
    /// Saves `buffer` back to its current file, dispatching to the local or
    /// remote implementation.
    pub fn save_buffer(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        match &mut self.state {
            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
        }
    }
1117
    /// Saves `buffer` to a new project `path` (save-as), then emits
    /// [`BufferStoreEvent::BufferChangedFilePath`] with the file the buffer
    /// had before the save.
    pub fn save_buffer_as(
        &mut self,
        buffer: Entity<Buffer>,
        path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        // Capture the old file before the save replaces it.
        let old_file = buffer.read(cx).file().cloned();
        let task = match &self.state {
            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
            BufferStoreState::Remote(this) => {
                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }
1138
    /// Computes a git blame for `buffer`, optionally against a specific
    /// `version` of its contents.
    ///
    /// For a local worktree the blame runs on the background executor against
    /// the file's repository; `Ok(None)` is returned when the file is not in a
    /// git repository. For a remote worktree the request is forwarded to the
    /// host over RPC. Errors immediately if the buffer has no backing file.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything the background task needs up front, while
                // we still have access to app state.
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        // Not inside a git repository: blame is unavailable,
                        // but that's not an error.
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    // Blame the requested historical version when given,
                    // otherwise the buffer's current contents.
                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                // The host owns the repository; forward the request over RPC.
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1200
    /// Builds a permalink URL to `selection` (a line range) in `buffer`'s
    /// file on its git hosting provider.
    ///
    /// Local worktrees resolve the repository, "origin" remote, and HEAD SHA
    /// directly. Files outside any repository are special-cased for Rust
    /// sources in the Cargo registry. Remote worktrees forward the request to
    /// the host over RPC. Errors if the buffer has no backing file.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // The permalink path must be relative to the repository root,
                // not the worktree root.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    // Permalinks pin the current HEAD commit so the link stays
                    // stable as the branch moves.
                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                // The host has repository access; forward the request.
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1297
1298 fn add_buffer(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
1299 let remote_id = buffer.read(cx).remote_id();
1300 let is_remote = buffer.read(cx).replica_id() != 0;
1301 let open_buffer = OpenBuffer::Complete {
1302 buffer: buffer.downgrade(),
1303 unstaged_changes: None,
1304 };
1305
1306 let handle = cx.model().downgrade();
1307 buffer.update(cx, move |_, cx| {
1308 cx.on_release(move |buffer, cx| {
1309 handle
1310 .update(cx, |_, cx| {
1311 cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1312 })
1313 .ok();
1314 })
1315 .detach()
1316 });
1317
1318 match self.opened_buffers.entry(remote_id) {
1319 hash_map::Entry::Vacant(entry) => {
1320 entry.insert(open_buffer);
1321 }
1322 hash_map::Entry::Occupied(mut entry) => {
1323 if let OpenBuffer::Operations(operations) = entry.get_mut() {
1324 buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1325 } else if entry.get().upgrade().is_some() {
1326 if is_remote {
1327 return Ok(());
1328 } else {
1329 debug_panic!("buffer {} was already registered", remote_id);
1330 Err(anyhow!("buffer {} was already registered", remote_id))?;
1331 }
1332 }
1333 entry.insert(open_buffer);
1334 }
1335 }
1336
1337 cx.subscribe(&buffer, Self::on_buffer_event).detach();
1338 cx.emit(BufferStoreEvent::BufferAdded(buffer));
1339 Ok(())
1340 }
1341
1342 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1343 self.opened_buffers
1344 .values()
1345 .filter_map(|buffer| buffer.upgrade())
1346 }
1347
1348 pub fn loading_buffers(
1349 &self,
1350 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1351 self.loading_buffers.iter().map(|(path, task)| {
1352 let task = task.clone();
1353 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1354 })
1355 }
1356
1357 pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1358 self.buffers().find_map(|buffer| {
1359 let file = File::from_dyn(buffer.read(cx).file())?;
1360 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1361 Some(buffer)
1362 } else {
1363 None
1364 }
1365 })
1366 }
1367
1368 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1369 self.opened_buffers.get(&buffer_id)?.upgrade()
1370 }
1371
1372 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1373 self.get(buffer_id)
1374 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1375 }
1376
1377 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1378 self.get(buffer_id).or_else(|| {
1379 self.as_remote()
1380 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1381 })
1382 }
1383
1384 pub fn get_unstaged_changes(&self, buffer_id: BufferId) -> Option<Entity<BufferChangeSet>> {
1385 if let OpenBuffer::Complete {
1386 unstaged_changes, ..
1387 } = self.opened_buffers.get(&buffer_id)?
1388 {
1389 unstaged_changes.as_ref()?.upgrade()
1390 } else {
1391 None
1392 }
1393 }
1394
1395 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1396 let buffers = self
1397 .buffers()
1398 .map(|buffer| {
1399 let buffer = buffer.read(cx);
1400 proto::BufferVersion {
1401 id: buffer.remote_id().into(),
1402 version: language::proto::serialize_version(&buffer.version),
1403 }
1404 })
1405 .collect();
1406 let incomplete_buffer_ids = self
1407 .as_remote()
1408 .map(|remote| remote.incomplete_buffer_ids())
1409 .unwrap_or_default();
1410 (buffers, incomplete_buffer_ids)
1411 }
1412
1413 pub fn disconnected_from_host(&mut self, cx: &mut App) {
1414 for open_buffer in self.opened_buffers.values_mut() {
1415 if let Some(buffer) = open_buffer.upgrade() {
1416 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1417 }
1418 }
1419
1420 for buffer in self.buffers() {
1421 buffer.update(cx, |buffer, cx| {
1422 buffer.set_capability(Capability::ReadOnly, cx)
1423 });
1424 }
1425
1426 if let Some(remote) = self.as_remote_mut() {
1427 // Wake up all futures currently waiting on a buffer to get opened,
1428 // to give them a chance to fail now that we've disconnected.
1429 remote.remote_buffer_listeners.clear()
1430 }
1431 }
1432
1433 pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
1434 self.downstream_client = Some((downstream_client, remote_id));
1435 }
1436
1437 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1438 self.downstream_client.take();
1439 self.forget_shared_buffers();
1440 }
1441
1442 pub fn discard_incomplete(&mut self) {
1443 self.opened_buffers
1444 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1445 }
1446
    /// Streams buffers that may contain matches for `query`.
    ///
    /// Already-open buffers without a file entry are yielded first (and
    /// counted against `limit`); candidate paths reported by the worktrees
    /// are then opened in batches and sent on the returned channel. Dropping
    /// the receiver cancels the search.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // Entry-less buffers can't be found by the worktree scan, so
                // send them directly and shrink the path budget accordingly.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Bounds how many buffers are opened concurrently per batch.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A closed receiver means the caller stopped searching.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1501
    /// Starts diff recalculation on each buffer's unstaged change set and
    /// returns a future resolving once all recalculations have finished.
    ///
    /// Change sets whose entity has been dropped are pruned from the
    /// corresponding `OpenBuffer` entries instead of being recalculated.
    pub fn recalculate_buffer_diffs(
        &mut self,
        buffers: Vec<Entity<Buffer>>,
        cx: &mut Context<Self>,
    ) -> impl Future<Output = ()> {
        let mut futures = Vec::new();
        for buffer in buffers {
            let buffer = buffer.read(cx).text_snapshot();
            if let Some(OpenBuffer::Complete {
                unstaged_changes, ..
            }) = self.opened_buffers.get_mut(&buffer.remote_id())
            {
                if let Some(unstaged_changes) = unstaged_changes
                    .as_ref()
                    .and_then(|changes| changes.upgrade())
                {
                    unstaged_changes.update(cx, |unstaged_changes, cx| {
                        futures.push(unstaged_changes.recalculate_diff(buffer.clone(), cx));
                    });
                } else {
                    // The change set entity is gone; drop the stale weak ref.
                    unstaged_changes.take();
                }
            }
        }
        async move {
            futures::future::join_all(futures).await;
        }
    }
1530
    /// Responds to events emitted by tracked buffers.
    ///
    /// `FileHandleChanged` is forwarded to the local state's bookkeeping, and
    /// `Reloaded` is relayed to the downstream client when this store is
    /// shared; all other events are ignored here.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Only relevant while this project is shared downstream.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }
1561
1562 pub async fn handle_update_buffer(
1563 this: Entity<Self>,
1564 envelope: TypedEnvelope<proto::UpdateBuffer>,
1565 mut cx: AsyncAppContext,
1566 ) -> Result<proto::Ack> {
1567 let payload = envelope.payload.clone();
1568 let buffer_id = BufferId::new(payload.buffer_id)?;
1569 let ops = payload
1570 .operations
1571 .into_iter()
1572 .map(language::proto::deserialize_operation)
1573 .collect::<Result<Vec<_>, _>>()?;
1574 this.update(&mut cx, |this, cx| {
1575 match this.opened_buffers.entry(buffer_id) {
1576 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1577 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1578 OpenBuffer::Complete { buffer, .. } => {
1579 if let Some(buffer) = buffer.upgrade() {
1580 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1581 }
1582 }
1583 },
1584 hash_map::Entry::Vacant(e) => {
1585 e.insert(OpenBuffer::Operations(ops));
1586 }
1587 }
1588 Ok(proto::Ack {})
1589 })?
1590 }
1591
1592 pub fn register_shared_lsp_handle(
1593 &mut self,
1594 peer_id: proto::PeerId,
1595 buffer_id: BufferId,
1596 handle: OpenLspBufferHandle,
1597 ) {
1598 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
1599 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
1600 buffer.lsp_handle = Some(handle);
1601 return;
1602 }
1603 }
1604 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
1605 }
1606
    /// Handles a guest's `SynchronizeBuffers` request on the host.
    ///
    /// Resets the guest's shared-buffer set, then for every buffer the guest
    /// reports: re-registers it as shared, adds the host's version to the
    /// response, and pushes the guest's missing state back to it — the file,
    /// saved/reloaded metadata, and any operations newer than the guest's
    /// reported version.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate for this guest; entries are re-added below
        // for each buffer it still has open.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        unstaged_changes: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest hasn't seen yet.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Ship the missing operations to the guest in bounded chunks,
                // off the main thread.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
1696
1697 pub fn handle_create_buffer_for_peer(
1698 &mut self,
1699 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1700 replica_id: u16,
1701 capability: Capability,
1702 cx: &mut Context<Self>,
1703 ) -> Result<()> {
1704 let Some(remote) = self.as_remote_mut() else {
1705 return Err(anyhow!("buffer store is not a remote"));
1706 };
1707
1708 if let Some(buffer) =
1709 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1710 {
1711 self.add_buffer(buffer, cx)?;
1712 }
1713
1714 Ok(())
1715 }
1716
    /// Handles an `UpdateBufferFile` message: swaps the buffer's backing
    /// file, emits `BufferChangedFilePath` when the path actually changed,
    /// and relays the message downstream when this store is shared.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            // Clone the payload: `file` is consumed below, but the original
            // envelope is still needed intact for the downstream relay.
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // `old_file` is `Some` only when the path changed (or the
                // buffer previously had no file).
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
1764
    /// Handles a guest's `SaveBuffer` request on the host.
    ///
    /// Waits until the buffer has caught up to the version the guest saw,
    /// saves it (under a new path when `new_path` is given), and replies with
    /// the resulting saved version and mtime. Errors when the buffer is
    /// unknown or the project isn't shared.
    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until all edits the guest had seen have been applied.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
1805
1806 pub async fn handle_close_buffer(
1807 this: Entity<Self>,
1808 envelope: TypedEnvelope<proto::CloseBuffer>,
1809 mut cx: AsyncAppContext,
1810 ) -> Result<()> {
1811 let peer_id = envelope.sender_id;
1812 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1813 this.update(&mut cx, |this, _| {
1814 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
1815 if shared.remove(&buffer_id).is_some() {
1816 if shared.is_empty() {
1817 this.shared_buffers.remove(&peer_id);
1818 }
1819 return;
1820 }
1821 }
1822 debug_panic!(
1823 "peer_id {} closed buffer_id {} which was either not open or already closed",
1824 peer_id,
1825 buffer_id
1826 )
1827 })
1828 }
1829
1830 pub async fn handle_buffer_saved(
1831 this: Entity<Self>,
1832 envelope: TypedEnvelope<proto::BufferSaved>,
1833 mut cx: AsyncAppContext,
1834 ) -> Result<()> {
1835 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1836 let version = deserialize_version(&envelope.payload.version);
1837 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1838 this.update(&mut cx, move |this, cx| {
1839 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1840 buffer.update(cx, |buffer, cx| {
1841 buffer.did_save(version, mtime, cx);
1842 });
1843 }
1844
1845 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1846 downstream_client
1847 .send(proto::BufferSaved {
1848 project_id: *project_id,
1849 buffer_id: buffer_id.into(),
1850 mtime: envelope.payload.mtime,
1851 version: envelope.payload.version,
1852 })
1853 .log_err();
1854 }
1855 })
1856 }
1857
1858 pub async fn handle_buffer_reloaded(
1859 this: Entity<Self>,
1860 envelope: TypedEnvelope<proto::BufferReloaded>,
1861 mut cx: AsyncAppContext,
1862 ) -> Result<()> {
1863 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1864 let version = deserialize_version(&envelope.payload.version);
1865 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1866 let line_ending = deserialize_line_ending(
1867 proto::LineEnding::from_i32(envelope.payload.line_ending)
1868 .ok_or_else(|| anyhow!("missing line ending"))?,
1869 );
1870 this.update(&mut cx, |this, cx| {
1871 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1872 buffer.update(cx, |buffer, cx| {
1873 buffer.did_reload(version, line_ending, mtime, cx);
1874 });
1875 }
1876
1877 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1878 downstream_client
1879 .send(proto::BufferReloaded {
1880 project_id: *project_id,
1881 buffer_id: buffer_id.into(),
1882 mtime: envelope.payload.mtime,
1883 version: envelope.payload.version,
1884 line_ending: envelope.payload.line_ending,
1885 })
1886 .log_err();
1887 }
1888 })
1889 }
1890
    /// Handles a `BlameBuffer` RPC: waits for the buffer to reach the
    /// requested version, runs the blame against that version, and returns
    /// the serialized result.
    pub async fn handle_blame_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        // Don't blame until the buffer contains all edits the requester saw.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }
1911
1912 pub async fn handle_get_permalink_to_line(
1913 this: Entity<Self>,
1914 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
1915 mut cx: AsyncAppContext,
1916 ) -> Result<proto::GetPermalinkToLineResponse> {
1917 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1918 // let version = deserialize_version(&envelope.payload.version);
1919 let selection = {
1920 let proto_selection = envelope
1921 .payload
1922 .selection
1923 .context("no selection to get permalink for defined")?;
1924 proto_selection.start as u32..proto_selection.end as u32
1925 };
1926 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1927 let permalink = this
1928 .update(&mut cx, |this, cx| {
1929 this.get_permalink_to_line(&buffer, selection, cx)
1930 })?
1931 .await?;
1932 Ok(proto::GetPermalinkToLineResponse {
1933 permalink: permalink.to_string(),
1934 })
1935 }
1936
    /// Handles a `GetStagedText` RPC: ensures an unstaged change set exists
    /// for the buffer, records it on the requesting peer's shared-buffer
    /// entry (keeping it alive for that peer), and returns its base text.
    pub async fn handle_get_staged_text(
        this: Entity<Self>,
        request: TypedEnvelope<proto::GetStagedText>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::GetStagedTextResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let change_set = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_unstaged_changes(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        this.update(&mut cx, |this, _| {
            // Pin the change set on the peer's shared entry so it stays alive
            // as long as the peer has the buffer open.
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.unstaged_changes = Some(change_set.clone());
            }
        })?;
        // `None` base text means the file has no staged counterpart.
        let staged_text = change_set.read_with(&cx, |change_set, _| {
            change_set.base_text.as_ref().map(|buffer| buffer.text())
        })?;
        Ok(proto::GetStagedTextResponse { staged_text })
    }
1965
    /// Handles an `UpdateDiffBase` message by replacing — or clearing, when
    /// `staged_text` is absent — the base text of the buffer's change set.
    /// Buffers that aren't open or have no live change set are ignored.
    pub async fn handle_update_diff_base(
        this: Entity<Self>,
        request: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let Some((buffer, change_set)) = this.update(&mut cx, |this, _| {
            if let OpenBuffer::Complete {
                unstaged_changes,
                buffer,
            } = this.opened_buffers.get(&buffer_id)?
            {
                // Both the buffer and its change set must still be alive.
                Some((buffer.upgrade()?, unstaged_changes.as_ref()?.upgrade()?))
            } else {
                None
            }
        })?
        else {
            return Ok(());
        };
        change_set.update(&mut cx, |change_set, cx| {
            if let Some(staged_text) = request.payload.staged_text {
                let _ = change_set.set_base_text(staged_text, buffer.read(cx).text_snapshot(), cx);
            } else {
                change_set.unset_base_text(buffer.read(cx).text_snapshot(), cx)
            }
        })?;
        Ok(())
    }
1995
1996 pub fn reload_buffers(
1997 &self,
1998 buffers: HashSet<Entity<Buffer>>,
1999 push_to_history: bool,
2000 cx: &mut Context<Self>,
2001 ) -> Task<Result<ProjectTransaction>> {
2002 if buffers.is_empty() {
2003 return Task::ready(Ok(ProjectTransaction::default()));
2004 }
2005 match &self.state {
2006 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2007 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2008 }
2009 }
2010
    /// Handles a guest's `ReloadBuffers` request: reloads the listed buffers
    /// from disk and returns the resulting transaction, serialized for the
    /// requesting peer.
    async fn handle_reload_buffers(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            // `push_to_history: false` — the guest manages its own undo stack.
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }
2034
    /// Ensures `buffer` has been sent to `peer_id`: sends the buffer's full
    /// state followed by its serialized operations in bounded chunks.
    ///
    /// Resolves immediately when the buffer was already shared with the peer
    /// or this store has no downstream client.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            // Already sent to this peer.
            return Task::ready(Ok(()));
        }
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                unstaged_changes: None,
                lsp_handle: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            // The first message carries the buffer's current state...
            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // ...then the serialized operations follow in chunks, with the
            // final chunk flagged via `is_last`.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2101
    /// Clears all per-peer shared-buffer bookkeeping (e.g. when unsharing the
    /// project).
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2105
    /// Drops the shared-buffer bookkeeping for a single peer (e.g. when it
    /// disconnects).
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2109
2110 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2111 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2112 self.shared_buffers.insert(new_peer_id, buffers);
2113 }
2114 }
2115
    /// Whether any buffers are currently shared with remote peers.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2119
    /// Creates a new local buffer containing `text`, with the given language
    /// (defaulting to plain text), registering it with the store and the
    /// local path/entry-id indices.
    ///
    /// # Panics
    /// Panics if called on a remote buffer store.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut Context<Self>,
    ) -> Entity<Buffer> {
        let buffer = cx.new(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        // Index the buffer by path (and by worktree entry, when it has one)
        // so later opens of the same file reuse this buffer.
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }
2153
2154 pub fn deserialize_project_transaction(
2155 &mut self,
2156 message: proto::ProjectTransaction,
2157 push_to_history: bool,
2158 cx: &mut Context<Self>,
2159 ) -> Task<Result<ProjectTransaction>> {
2160 if let Some(this) = self.as_remote_mut() {
2161 this.deserialize_project_transaction(message, push_to_history, cx)
2162 } else {
2163 debug_panic!("not a remote buffer store");
2164 Task::ready(Err(anyhow!("not a remote buffer store")))
2165 }
2166 }
2167
2168 pub fn wait_for_remote_buffer(
2169 &mut self,
2170 id: BufferId,
2171 cx: &mut Context<BufferStore>,
2172 ) -> Task<Result<Entity<Buffer>>> {
2173 if let Some(this) = self.as_remote_mut() {
2174 this.wait_for_remote_buffer(id, cx)
2175 } else {
2176 debug_panic!("not a remote buffer store");
2177 Task::ready(Err(anyhow!("not a remote buffer store")))
2178 }
2179 }
2180
2181 pub fn serialize_project_transaction_for_peer(
2182 &mut self,
2183 project_transaction: ProjectTransaction,
2184 peer_id: proto::PeerId,
2185 cx: &mut Context<Self>,
2186 ) -> proto::ProjectTransaction {
2187 let mut serialized_transaction = proto::ProjectTransaction {
2188 buffer_ids: Default::default(),
2189 transactions: Default::default(),
2190 };
2191 for (buffer, transaction) in project_transaction.0 {
2192 self.create_buffer_for_peer(&buffer, peer_id, cx)
2193 .detach_and_log_err(cx);
2194 serialized_transaction
2195 .buffer_ids
2196 .push(buffer.read(cx).remote_id().into());
2197 serialized_transaction
2198 .transactions
2199 .push(language::proto::serialize_transaction(&transaction));
2200 }
2201 serialized_transaction
2202 }
2203}
2204
impl BufferChangeSet {
    /// Creates a change set for `buffer` with no base text, so the initial
    /// diff is empty. Also subscribes to the buffer: when its language
    /// changes, the stored base-text snapshot is rebuilt so the base text is
    /// parsed with the new language.
    pub fn new(buffer: &Entity<Buffer>, cx: &mut Context<Self>) -> Self {
        cx.subscribe(buffer, |this, buffer, event, cx| match event {
            BufferEvent::LanguageChanged => {
                this.language = buffer.read(cx).language().cloned();
                if let Some(base_text) = &this.base_text {
                    // Re-snapshot the existing base text under the new
                    // language; the diff itself is left untouched here.
                    let snapshot = language::Buffer::build_snapshot(
                        base_text.as_rope().clone(),
                        this.language.clone(),
                        this.language_registry.clone(),
                        cx,
                    );
                    // NOTE(review): this overwrites `recalculate_diff_task`,
                    // replacing any in-flight diff recalculation — confirm
                    // that is intended.
                    this.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
                        let base_text = cx.background_executor().spawn(snapshot).await;
                        this.update(&mut cx, |this, cx| {
                            this.base_text = Some(base_text);
                            cx.notify();
                        })
                    }));
                }
            }
            _ => {}
        })
        .detach();

        let buffer = buffer.read(cx);

        Self {
            buffer_id: buffer.remote_id(),
            base_text: None,
            diff_to_buffer: git::diff::BufferDiff::new(buffer),
            recalculate_diff_task: None,
            diff_updated_futures: Vec::new(),
            language: buffer.language().cloned(),
            language_registry: buffer.language_registry(),
        }
    }

    /// Test helper: builds a change set and immediately installs `base_text`
    /// as the base. The receiver returned by `set_base_text` is dropped, so
    /// the diff recalculation is started but not awaited.
    #[cfg(any(test, feature = "test-support"))]
    pub fn new_with_base_text(
        base_text: String,
        buffer: &Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Self {
        let mut this = Self::new(&buffer, cx);
        let _ = this.set_base_text(base_text, buffer.read(cx).text_snapshot(), cx);
        this
    }

    /// Returns the diff hunks that intersect `range`, in buffer order.
    pub fn diff_hunks_intersecting_range<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range(range, buffer_snapshot)
    }

    /// Like `diff_hunks_intersecting_range`, but yields the hunks in reverse
    /// order.
    pub fn diff_hunks_intersecting_range_rev<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range_rev(range, buffer_snapshot)
    }

    /// Test helper: the current base text as a `String`, if any.
    #[cfg(any(test, feature = "test-support"))]
    pub fn base_text_string(&self) -> Option<String> {
        self.base_text.as_ref().map(|buffer| buffer.text())
    }

    /// Replaces the base text (normalizing its line endings first) and starts
    /// an asynchronous diff recalculation against `buffer_snapshot`. The
    /// returned receiver fires once the diff has been updated.
    pub fn set_base_text(
        &mut self,
        mut base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        LineEnding::normalize(&mut base_text);
        self.recalculate_diff_internal(base_text, buffer_snapshot, true, cx)
    }

    /// Clears the base text and resets the diff to an empty one, dropping any
    /// pending recalculation task. No-op when there is no base text.
    pub fn unset_base_text(
        &mut self,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) {
        if self.base_text.is_some() {
            self.base_text = None;
            self.diff_to_buffer = BufferDiff::new(&buffer_snapshot);
            self.recalculate_diff_task.take();
            cx.notify();
        }
    }

    /// Recomputes the diff between the current base text and
    /// `buffer_snapshot`. When there is no base text, returns a receiver
    /// whose sender has already been dropped, so waiters are released
    /// immediately.
    pub fn recalculate_diff(
        &mut self,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        if let Some(base_text) = self.base_text.clone() {
            self.recalculate_diff_internal(base_text.text(), buffer_snapshot, false, cx)
        } else {
            oneshot::channel().1
        }
    }

    /// Spawns the diff recalculation task. When `base_text_changed` is true,
    /// a new base-text snapshot is also built (parsed with the current
    /// language) before the diff is computed. Every receiver handed out while
    /// a recalculation was pending is notified once the new diff lands.
    fn recalculate_diff_internal(
        &mut self,
        base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        base_text_changed: bool,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            let new_base_text = if base_text_changed {
                let base_text_rope: Rope = base_text.as_str().into();
                let snapshot = this.update(&mut cx, |this, cx| {
                    language::Buffer::build_snapshot(
                        base_text_rope,
                        this.language.clone(),
                        this.language_registry.clone(),
                        cx,
                    )
                })?;
                Some(cx.background_executor().spawn(snapshot).await)
            } else {
                None
            };
            // Compute the diff on the background executor, against a clone of
            // the buffer snapshot captured by the closure.
            let diff = cx
                .background_executor()
                .spawn({
                    let buffer_snapshot = buffer_snapshot.clone();
                    async move { BufferDiff::build(&base_text, &buffer_snapshot) }
                })
                .await;
            this.update(&mut cx, |this, cx| {
                if let Some(new_base_text) = new_base_text {
                    this.base_text = Some(new_base_text)
                }
                this.diff_to_buffer = diff;
                this.recalculate_diff_task.take();
                // Wake everything waiting for a diff update; a send error just
                // means that receiver was dropped.
                for tx in this.diff_updated_futures.drain(..) {
                    tx.send(()).ok();
                }
                cx.notify();
            })?;
            Ok(())
        }));
        rx
    }

    /// Test helper: recomputes the diff synchronously, blocking on the
    /// base-text snapshot construction when `base_text_changed` is true.
    #[cfg(any(test, feature = "test-support"))]
    pub fn recalculate_diff_sync(
        &mut self,
        mut base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        base_text_changed: bool,
        cx: &mut Context<Self>,
    ) {
        LineEnding::normalize(&mut base_text);
        let diff = BufferDiff::build(&base_text, &buffer_snapshot);
        if base_text_changed {
            self.base_text = Some(
                cx.background_executor()
                    .clone()
                    .block(Buffer::build_snapshot(
                        base_text.into(),
                        self.language.clone(),
                        self.language_registry.clone(),
                        cx,
                    )),
            );
        }
        self.diff_to_buffer = diff;
        self.recalculate_diff_task.take();
        cx.notify();
    }
}
2386
2387impl OpenBuffer {
2388 fn upgrade(&self) -> Option<Entity<Buffer>> {
2389 match self {
2390 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2391 OpenBuffer::Operations(_) => None,
2392 }
2393 }
2394}
2395
2396fn is_not_found_error(error: &anyhow::Error) -> bool {
2397 error
2398 .root_cause()
2399 .downcast_ref::<io::Error>()
2400 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2401}
2402
2403fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2404 let Some(blame) = blame else {
2405 return proto::BlameBufferResponse {
2406 blame_response: None,
2407 };
2408 };
2409
2410 let entries = blame
2411 .entries
2412 .into_iter()
2413 .map(|entry| proto::BlameEntry {
2414 sha: entry.sha.as_bytes().into(),
2415 start_line: entry.range.start,
2416 end_line: entry.range.end,
2417 original_line_number: entry.original_line_number,
2418 author: entry.author.clone(),
2419 author_mail: entry.author_mail.clone(),
2420 author_time: entry.author_time,
2421 author_tz: entry.author_tz.clone(),
2422 committer: entry.committer.clone(),
2423 committer_mail: entry.committer_mail.clone(),
2424 committer_time: entry.committer_time,
2425 committer_tz: entry.committer_tz.clone(),
2426 summary: entry.summary.clone(),
2427 previous: entry.previous.clone(),
2428 filename: entry.filename.clone(),
2429 })
2430 .collect::<Vec<_>>();
2431
2432 let messages = blame
2433 .messages
2434 .into_iter()
2435 .map(|(oid, message)| proto::CommitMessage {
2436 oid: oid.as_bytes().into(),
2437 message,
2438 })
2439 .collect::<Vec<_>>();
2440
2441 let permalinks = blame
2442 .permalinks
2443 .into_iter()
2444 .map(|(oid, url)| proto::CommitPermalink {
2445 oid: oid.as_bytes().into(),
2446 permalink: url.to_string(),
2447 })
2448 .collect::<Vec<_>>();
2449
2450 proto::BlameBufferResponse {
2451 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2452 entries,
2453 messages,
2454 permalinks,
2455 remote_url: blame.remote_url,
2456 }),
2457 }
2458}
2459
2460fn deserialize_blame_buffer_response(
2461 response: proto::BlameBufferResponse,
2462) -> Option<git::blame::Blame> {
2463 let response = response.blame_response?;
2464 let entries = response
2465 .entries
2466 .into_iter()
2467 .filter_map(|entry| {
2468 Some(git::blame::BlameEntry {
2469 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2470 range: entry.start_line..entry.end_line,
2471 original_line_number: entry.original_line_number,
2472 committer: entry.committer,
2473 committer_time: entry.committer_time,
2474 committer_tz: entry.committer_tz,
2475 committer_mail: entry.committer_mail,
2476 author: entry.author,
2477 author_mail: entry.author_mail,
2478 author_time: entry.author_time,
2479 author_tz: entry.author_tz,
2480 summary: entry.summary,
2481 previous: entry.previous,
2482 filename: entry.filename,
2483 })
2484 })
2485 .collect::<Vec<_>>();
2486
2487 let messages = response
2488 .messages
2489 .into_iter()
2490 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2491 .collect::<HashMap<_, _>>();
2492
2493 let permalinks = response
2494 .permalinks
2495 .into_iter()
2496 .filter_map(|permalink| {
2497 Some((
2498 git::Oid::from_bytes(&permalink.oid).ok()?,
2499 Url::from_str(&permalink.permalink).ok()?,
2500 ))
2501 })
2502 .collect::<HashMap<_, _>>();
2503
2504 Some(Blame {
2505 entries,
2506 permalinks,
2507 messages,
2508 remote_url: response.remote_url,
2509 })
2510}
2511
2512fn get_permalink_in_rust_registry_src(
2513 provider_registry: Arc<GitHostingProviderRegistry>,
2514 path: PathBuf,
2515 selection: Range<u32>,
2516) -> Result<url::Url> {
2517 #[derive(Deserialize)]
2518 struct CargoVcsGit {
2519 sha1: String,
2520 }
2521
2522 #[derive(Deserialize)]
2523 struct CargoVcsInfo {
2524 git: CargoVcsGit,
2525 path_in_vcs: String,
2526 }
2527
2528 #[derive(Deserialize)]
2529 struct CargoPackage {
2530 repository: String,
2531 }
2532
2533 #[derive(Deserialize)]
2534 struct CargoToml {
2535 package: CargoPackage,
2536 }
2537
2538 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
2539 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
2540 Some((dir, json))
2541 }) else {
2542 bail!("No .cargo_vcs_info.json found in parent directories")
2543 };
2544 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
2545 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
2546 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
2547 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
2548 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
2549 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
2550 let permalink = provider.build_permalink(
2551 remote,
2552 BuildPermalinkParams {
2553 sha: &cargo_vcs_info.git.sha1,
2554 path: &path.to_string_lossy(),
2555 selection: Some(selection),
2556 },
2557 );
2558 Ok(permalink)
2559}