1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use client::Client;
10use collections::{hash_map, HashMap, HashSet};
11use fs::Fs;
12use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
13use git::{blame::Blame, diff::BufferDiff, repository::RepoPath};
14use gpui::{
15 AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
16 Task, WeakModel,
17};
18use http_client::Url;
19use language::{
20 proto::{
21 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
22 split_operations,
23 },
24 Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
25};
26use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
27use serde::Deserialize;
28use smol::channel::Receiver;
29use std::{
30 io,
31 ops::Range,
32 path::{Path, PathBuf},
33 pin::pin,
34 str::FromStr as _,
35 sync::Arc,
36 time::Instant,
37};
38use text::{BufferId, LineEnding, Rope};
39use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
40use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
41
/// A set of open buffers.
pub struct BufferStore {
    /// Local- or remote-project-specific state and behavior.
    state: BufferStoreState,
    #[allow(clippy::type_complexity)]
    // In-flight buffer loads keyed by path; `Shared` so concurrent opens of
    // the same path resolve to one load task.
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Model<Buffer>, Arc<anyhow::Error>>>>>,
    #[allow(clippy::type_complexity)]
    // In-flight unstaged-change-set loads, keyed by buffer id.
    loading_change_sets:
        HashMap<BufferId, Shared<Task<Result<Model<BufferChangeSet>, Arc<anyhow::Error>>>>>,
    worktree_store: Model<WorktreeStore>,
    /// Every buffer that has been opened, keyed by its remote id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Client and project id used to forward updates when this project is
    /// shared downstream; `None` when not shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// Buffers that have been shared with each remote peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
55
/// A buffer, plus associated handles, that has been shared with a peer.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Model<Buffer>,
    /// The buffer's unstaged git changes, if they were opened for this peer.
    unstaged_changes: Option<Model<BufferChangeSet>>,
    /// LSP buffer handle, if one was opened for this buffer.
    lsp_handle: Option<OpenLspBufferHandle>,
}
62
/// Tracks the diff between a buffer and a base text (e.g. the staged copy of
/// the file in the git index).
#[derive(Debug)]
pub struct BufferChangeSet {
    pub buffer_id: BufferId,
    /// Buffer holding the base text being diffed against; `None` when the
    /// base text is unset (e.g. the file is not in the index).
    pub base_text: Option<Model<Buffer>>,
    pub diff_to_buffer: git::diff::BufferDiff,
    /// Currently-running diff recalculation, if any.
    pub recalculate_diff_task: Option<Task<Result<()>>>,
    /// Senders notified when the next diff recalculation completes.
    pub diff_updated_futures: Vec<oneshot::Sender<()>>,
    // Version stamp for `base_text`; presumably changed when the base text is
    // replaced — TODO confirm against `set_base_text`.
    pub base_text_version: usize,
}
72
/// Backend for a [`BufferStore`]: a local project or a remote (collab) one.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
77
/// Buffer-store state used when the project is hosted remotely.
struct RemoteBufferStore {
    /// Buffers received from peers over collab, retained here so they are
    /// not dropped prematurely (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Model<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    /// Buffers whose state is still streaming in, keyed by id.
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    /// Channels waiting for a given remote buffer to finish replicating.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    worktree_store: Model<WorktreeStore>,
}
87
/// Buffer-store state used when the project's worktrees are local.
struct LocalBufferStore {
    /// Buffer ids indexed by project path, for path-based lookup.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    /// Buffer ids indexed by worktree entry id.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Model<WorktreeStore>,
    /// Subscription to worktree-store events; dropped with this store.
    _subscription: Subscription,
}
94
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully-loaded buffer, held weakly so that dropping all strong handles
    /// elsewhere releases it.
    Complete {
        buffer: WeakModel<Buffer>,
        unstaged_changes: Option<WeakModel<BufferChangeSet>>,
    },
    /// Operations received for a buffer before the buffer itself arrived.
    Operations(Vec<Operation>),
}
102
/// Events emitted by a [`BufferStore`].
pub enum BufferStoreEvent {
    BufferAdded(Model<Buffer>),
    BufferDropped(BufferId),
    /// A buffer's on-disk file changed path; `old_file` is the previous file.
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
111
/// A set of buffer transactions, keyed by the buffer they apply to.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}
116
117impl RemoteBufferStore {
118 fn load_staged_text(
119 &self,
120 buffer_id: BufferId,
121 cx: &AppContext,
122 ) -> Task<Result<Option<String>>> {
123 let project_id = self.project_id;
124 let client = self.upstream_client.clone();
125 cx.background_executor().spawn(async move {
126 Ok(client
127 .request(proto::GetStagedText {
128 project_id,
129 buffer_id: buffer_id.to_proto(),
130 })
131 .await?
132 .staged_text)
133 })
134 }
    /// Returns a task that resolves once the buffer with `id` has fully
    /// replicated to this client, or immediately if it is already open.
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        // Register the listener *before* checking for the buffer, so a buffer
        // that completes in between cannot be missed.
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            // Otherwise wait for `handle_create_buffer_for_peer` to fire the
            // listener registered above.
            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }
157
    /// Sends a `SaveBuffer` request upstream (optionally saving under
    /// `new_path`), then marks the buffer saved with the version and mtime
    /// that the host reports back.
    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            // The host replies with the version/mtime it actually saved.
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }
188
    /// Handles a `CreateBufferForPeer` message. Buffers are streamed to this
    /// client as an initial `State` message followed by `Chunk`s of
    /// operations; the buffer is returned once the final chunk is applied.
    /// On any failure, waiters registered via `wait_for_remote_buffer` are
    /// notified of the error.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<BufferStore>,
    ) -> Result<Option<Model<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    // Resolve the buffer's file against our worktrees, if any.
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        // Park the buffer until its chunks finish arriving.
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Deserialization failed: fail any waiters.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // A bad chunk poisons the load: drop the partial buffer
                    // and fail any waiters.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    // Resolve everyone waiting on this buffer.
                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }
283
284 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
285 self.loading_remote_buffers_by_id
286 .keys()
287 .copied()
288 .collect::<Vec<_>>()
289 }
290
    /// Reconstructs a [`ProjectTransaction`] from its wire representation,
    /// waiting for each referenced buffer — and the edits each transaction
    /// refers to — to arrive before returning.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            // `buffer_ids` and `transactions` are parallel arrays on the wire.
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                // Ensure the transaction's edits have been applied to the
                // buffer before exposing it to the caller.
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }
328
329 fn open_buffer(
330 &self,
331 path: Arc<Path>,
332 worktree: Model<Worktree>,
333 cx: &mut ModelContext<BufferStore>,
334 ) -> Task<Result<Model<Buffer>>> {
335 let worktree_id = worktree.read(cx).id().to_proto();
336 let project_id = self.project_id;
337 let client = self.upstream_client.clone();
338 let path_string = path.clone().to_string_lossy().to_string();
339 cx.spawn(move |this, mut cx| async move {
340 let response = client
341 .request(proto::OpenBufferByPath {
342 project_id,
343 worktree_id,
344 path: path_string,
345 })
346 .await?;
347 let buffer_id = BufferId::new(response.buffer_id)?;
348
349 let buffer = this
350 .update(&mut cx, {
351 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
352 })?
353 .await?;
354
355 Ok(buffer)
356 })
357 }
358
359 fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
360 let create = self.upstream_client.request(proto::OpenNewBuffer {
361 project_id: self.project_id,
362 });
363 cx.spawn(|this, mut cx| async move {
364 let response = create.await?;
365 let buffer_id = BufferId::new(response.buffer_id)?;
366
367 this.update(&mut cx, |this, cx| {
368 this.wait_for_remote_buffer(buffer_id, cx)
369 })?
370 .await
371 })
372 }
373
    /// Asks the upstream host to reload `buffers` from disk, then applies the
    /// resulting project transaction locally.
    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        let request = self.upstream_client.request(proto::ReloadBuffers {
            project_id: self.project_id,
            buffer_ids: buffers
                .iter()
                .map(|buffer| buffer.read(cx).remote_id().to_proto())
                .collect(),
        });

        cx.spawn(|this, mut cx| async move {
            let response = request
                .await?
                .transaction
                .ok_or_else(|| anyhow!("missing transaction"))?;
            this.update(&mut cx, |this, cx| {
                this.deserialize_project_transaction(response, push_to_history, cx)
            })?
            .await
        })
    }
399}
400
401impl LocalBufferStore {
402 fn load_staged_text(
403 &self,
404 buffer: &Model<Buffer>,
405 cx: &AppContext,
406 ) -> Task<Result<Option<String>>> {
407 let Some(file) = buffer.read(cx).file() else {
408 return Task::ready(Ok(None));
409 };
410 let worktree_id = file.worktree_id(cx);
411 let path = file.path().clone();
412 let Some(worktree) = self
413 .worktree_store
414 .read(cx)
415 .worktree_for_id(worktree_id, cx)
416 else {
417 return Task::ready(Err(anyhow!("no such worktree")));
418 };
419
420 worktree.read(cx).load_staged_file(path.as_ref(), cx)
421 }
422
    /// Writes `buffer`'s contents to `path` within `worktree`, then notifies
    /// the buffer (and any downstream clients) that the save completed.
    ///
    /// `has_changed_file` is forced to `true` when the buffer's file is new
    /// on disk, so downstream clients also receive the new file metadata.
    fn save_local_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        worktree: Model<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Capture everything the async save needs before suspending.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            this.update(&mut cx, |this, cx| {
                // Forward the save (and any new file metadata) downstream.
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
480
    /// Watches `worktree` for entry and git-repository changes and routes
    /// them to the corresponding handlers. Only local worktrees are observed.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        Self::local_worktree_git_repos_changed(
                            this,
                            worktree.clone(),
                            updated_repos,
                            cx,
                        )
                    }
                    _ => {}
                }
            }
        })
        // The subscription lives for the worktree's lifetime.
        .detach();
    }
506
507 fn local_worktree_entries_changed(
508 this: &mut BufferStore,
509 worktree_handle: &Model<Worktree>,
510 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
511 cx: &mut ModelContext<BufferStore>,
512 ) {
513 let snapshot = worktree_handle.read(cx).snapshot();
514 for (path, entry_id, _) in changes {
515 Self::local_worktree_entry_changed(
516 this,
517 *entry_id,
518 path,
519 worktree_handle,
520 &snapshot,
521 cx,
522 );
523 }
524 }
525
    /// Recomputes the staged base text of every open buffer that lives under
    /// one of the changed git repositories in `worktree_handle`, and forwards
    /// the new base text to downstream clients.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Collect the change sets of affected buffers: still alive, in this
        // worktree, and under a changed repo's working directory.
        let buffer_change_sets = this
            .opened_buffers
            .values()
            .filter_map(|buffer| {
                if let OpenBuffer::Complete {
                    buffer,
                    unstaged_changes,
                } = buffer
                {
                    let buffer = buffer.upgrade()?.read(cx);
                    let file = File::from_dyn(buffer.file())?;
                    if file.worktree != worktree_handle {
                        return None;
                    }
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                    let unstaged_changes = unstaged_changes.as_ref()?.upgrade()?;
                    let snapshot = buffer.text_snapshot();
                    Some((unstaged_changes, snapshot, file.path.clone()))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        if buffer_change_sets.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Load the new index contents off the main thread.
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    buffer_change_sets
                        .into_iter()
                        .filter_map(|(change_set, buffer_snapshot, path)| {
                            let local_repo = snapshot.local_repo_for_path(&path)?;
                            let relative_path = local_repo.relativize(&path).ok()?;
                            let base_text = local_repo.repo().load_index_text(&relative_path);
                            Some((change_set, buffer_snapshot, base_text))
                        })
                        .collect::<Vec<_>>()
                })
                .await;

            this.update(&mut cx, |this, cx| {
                for (change_set, buffer_snapshot, staged_text) in diff_bases_by_buffer {
                    change_set.update(cx, |change_set, cx| {
                        if let Some(staged_text) = staged_text.clone() {
                            let _ =
                                change_set.set_base_text(staged_text, buffer_snapshot.clone(), cx);
                        } else {
                            // The file left the index: clear the base text.
                            change_set.unset_base_text(buffer_snapshot.clone(), cx);
                        }
                    });

                    // Propagate the new base text to downstream clients.
                    if let Some((client, project_id)) = &this.downstream_client.clone() {
                        client
                            .send(proto::UpdateDiffBase {
                                project_id: *project_id,
                                buffer_id: buffer_snapshot.remote_id().to_proto(),
                                staged_text,
                            })
                            .log_err();
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }
607
    /// Reconciles a single buffer with a changed worktree entry: refreshes
    /// the buffer's `File` (path, entry id, disk state), re-keys the
    /// path/entry-id indices, and notifies downstream clients. Returns `None`
    /// early whenever there is nothing to update.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Find the affected buffer by entry id, falling back to its path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer has been dropped; discard the stale entry.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer: clean up both indices and bail out.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Locate the entry in the new snapshot: by id first, then path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // The entry disappeared: the file was deleted on disk.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index and queue a file-path-change event.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Keep downstream clients' copy of the file in sync.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        // Emit outside the buffer update to avoid nested model updates.
        for event in events {
            cx.emit(event);
        }

        None
    }
735
736 fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
737 let file = File::from_dyn(buffer.read(cx).file())?;
738
739 let remote_id = buffer.read(cx).remote_id();
740 if let Some(entry_id) = file.entry_id {
741 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
742 Some(_) => {
743 return None;
744 }
745 None => {
746 self.local_buffer_ids_by_entry_id
747 .insert(entry_id, remote_id);
748 }
749 }
750 };
751 self.local_buffer_ids_by_path.insert(
752 ProjectPath {
753 worktree_id: file.worktree_id(cx),
754 path: file.path.clone(),
755 },
756 remote_id,
757 );
758
759 Some(())
760 }
761
762 fn save_buffer(
763 &self,
764 buffer: Model<Buffer>,
765 cx: &mut ModelContext<BufferStore>,
766 ) -> Task<Result<()>> {
767 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
768 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
769 };
770 let worktree = file.worktree.clone();
771 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
772 }
773
774 fn save_buffer_as(
775 &self,
776 buffer: Model<Buffer>,
777 path: ProjectPath,
778 cx: &mut ModelContext<BufferStore>,
779 ) -> Task<Result<()>> {
780 let Some(worktree) = self
781 .worktree_store
782 .read(cx)
783 .worktree_for_id(path.worktree_id, cx)
784 else {
785 return Task::ready(Err(anyhow!("no such worktree")));
786 };
787 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
788 }
789
    /// Loads the file at `path` from disk into a new buffer. If the file does
    /// not exist, produces an empty buffer whose file is [`DiskState::New`].
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the model slot up front so the buffer id (derived from
            // the entity id) is known before the file finishes loading.
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Build the text buffer off the main thread.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // Opening a nonexistent path yields a new, unsaved buffer.
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                // Index the new buffer by path and entry id so later worktree
                // changes can find it.
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
858
859 fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
860 cx.spawn(|buffer_store, mut cx| async move {
861 let buffer = cx.new_model(|cx| {
862 Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
863 })?;
864 buffer_store.update(&mut cx, |buffer_store, cx| {
865 buffer_store.add_buffer(buffer.clone(), cx).log_err();
866 })?;
867 Ok(buffer)
868 })
869 }
870
    /// Reloads each buffer from disk, collecting the resulting transactions
    /// into a [`ProjectTransaction`]. When `push_to_history` is false, each
    /// reload transaction is forgotten so it cannot be undone.
    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    // `None` means the reload produced no edits.
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.handle(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
896}
897
898impl BufferStore {
    /// Registers the RPC handlers that `BufferStore` models respond to.
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
        client.add_model_request_handler(Self::handle_get_staged_text);
        client.add_model_message_handler(Self::handle_update_diff_base);
    }
910
    /// Creates a buffer store for a local project, subscribing to worktree
    /// additions so open buffers can track file and git changes.
    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                // Subscribe to each new worktree as it is added to the store.
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            worktree_store,
        }
    }
933
    /// Creates a buffer store that mirrors a project hosted by
    /// `upstream_client` under project id `remote_id`.
    pub fn remote(
        worktree_store: Model<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut ModelContext<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
957
958 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
959 match &mut self.state {
960 BufferStoreState::Local(state) => Some(state),
961 _ => None,
962 }
963 }
964
965 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
966 match &mut self.state {
967 BufferStoreState::Remote(state) => Some(state),
968 _ => None,
969 }
970 }
971
972 fn as_remote(&self) -> Option<&RemoteBufferStore> {
973 match &self.state {
974 BufferStoreState::Remote(state) => Some(state),
975 _ => None,
976 }
977 }
978
    /// Returns the buffer at `project_path`, opening it if necessary.
    /// Concurrent calls for the same path share a single load task.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        // Fast path: the buffer is already open.
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // Another caller is already loading this path; join its task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            // `Shared` requires a cloneable error type.
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1024
    /// Returns the unstaged-changes set for `buffer`, loading its staged text
    /// if needed. Concurrent calls for the same buffer share one load task.
    pub fn open_unstaged_changes(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        // Fast path: a change set already exists for this buffer.
        if let Some(change_set) = self.get_unstaged_changes(buffer_id) {
            return Task::ready(Ok(change_set));
        }

        let task = match self.loading_change_sets.entry(buffer_id) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let load = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.load_staged_text(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_unstaged_changes_internal(this, load.await, buffer, cx)
                                .await
                                .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1059
1060 #[cfg(any(test, feature = "test-support"))]
1061 pub fn set_change_set(&mut self, buffer_id: BufferId, change_set: Model<BufferChangeSet>) {
1062 self.loading_change_sets
1063 .insert(buffer_id, Task::ready(Ok(change_set)).shared());
1064 }
1065
    /// Builds a [`BufferChangeSet`] for `buffer` from the loaded staged text,
    /// clearing the pending-load entry whether loading succeeded or failed.
    pub async fn open_unstaged_changes_internal(
        this: WeakModel<Self>,
        text: Result<Option<String>>,
        buffer: Model<Buffer>,
        mut cx: AsyncAppContext,
    ) -> Result<Model<BufferChangeSet>> {
        let text = match text {
            Err(e) => {
                // Drop the loading entry so a later call can retry.
                this.update(&mut cx, |this, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    this.loading_change_sets.remove(&buffer_id);
                })?;
                return Err(e);
            }
            Ok(text) => text,
        };

        let change_set = buffer.update(&mut cx, |buffer, cx| {
            cx.new_model(|_| BufferChangeSet::new(buffer))
        })?;

        // `None` staged text means the file isn't in the index; leave the
        // change set without a base text in that case.
        if let Some(text) = text {
            change_set
                .update(&mut cx, |change_set, cx| {
                    let snapshot = buffer.read(cx).text_snapshot();
                    change_set.set_base_text(text, snapshot, cx)
                })?
                .await
                .ok();
        }

        this.update(&mut cx, |this, cx| {
            let buffer_id = buffer.read(cx).remote_id();
            this.loading_change_sets.remove(&buffer_id);
            // Attach the change set to the open-buffer entry (weakly).
            if let Some(OpenBuffer::Complete {
                unstaged_changes, ..
            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                *unstaged_changes = Some(change_set.downgrade());
            }
        })?;

        Ok(change_set)
    }
1110
    /// Creates a new, empty buffer via the local or remote backend.
    pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
        match &self.state {
            BufferStoreState::Local(this) => this.create_buffer(cx),
            BufferStoreState::Remote(this) => this.create_buffer(cx),
        }
    }
1117
1118 pub fn save_buffer(
1119 &mut self,
1120 buffer: Model<Buffer>,
1121 cx: &mut ModelContext<Self>,
1122 ) -> Task<Result<()>> {
1123 match &mut self.state {
1124 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1125 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1126 }
1127 }
1128
1129 pub fn save_buffer_as(
1130 &mut self,
1131 buffer: Model<Buffer>,
1132 path: ProjectPath,
1133 cx: &mut ModelContext<Self>,
1134 ) -> Task<Result<()>> {
1135 let old_file = buffer.read(cx).file().cloned();
1136 let task = match &self.state {
1137 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1138 BufferStoreState::Remote(this) => {
1139 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1140 }
1141 };
1142 cx.spawn(|this, mut cx| async move {
1143 task.await?;
1144 this.update(&mut cx, |_, cx| {
1145 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1146 })
1147 })
1148 }
1149
    /// Computes Git blame information for `buffer`, optionally at a specific
    /// buffer `version` (defaults to the current contents).
    ///
    /// For local worktrees the blame runs on the background executor against
    /// the containing repository; returns `Ok(None)` when the file is not in
    /// a Git repository. For remote worktrees the request is forwarded to the
    /// host over RPC.
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything the blame needs on the main thread, so the
                // background task doesn't touch worktree or buffer state.
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1211
    /// Builds a permalink URL to the given line `selection` of `buffer` on its
    /// Git hosting provider.
    ///
    /// Local worktrees: resolves the repository's `origin` remote and HEAD SHA
    /// and asks the matching hosting provider to construct the URL. As a
    /// special case, Rust files outside any repository (e.g. Cargo registry
    /// sources opened via go-to-definition) get a permalink derived from crate
    /// metadata. Remote worktrees forward the request to the host over RPC.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Model<Buffer>,
        selection: Range<u32>,
        cx: &AppContext,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // Path must be relative to the repository root, not the worktree.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1308
1309 fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
1310 let remote_id = buffer.read(cx).remote_id();
1311 let is_remote = buffer.read(cx).replica_id() != 0;
1312 let open_buffer = OpenBuffer::Complete {
1313 buffer: buffer.downgrade(),
1314 unstaged_changes: None,
1315 };
1316
1317 let handle = cx.handle().downgrade();
1318 buffer.update(cx, move |_, cx| {
1319 cx.on_release(move |buffer, cx| {
1320 handle
1321 .update(cx, |_, cx| {
1322 cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1323 })
1324 .ok();
1325 })
1326 .detach()
1327 });
1328
1329 match self.opened_buffers.entry(remote_id) {
1330 hash_map::Entry::Vacant(entry) => {
1331 entry.insert(open_buffer);
1332 }
1333 hash_map::Entry::Occupied(mut entry) => {
1334 if let OpenBuffer::Operations(operations) = entry.get_mut() {
1335 buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1336 } else if entry.get().upgrade().is_some() {
1337 if is_remote {
1338 return Ok(());
1339 } else {
1340 debug_panic!("buffer {} was already registered", remote_id);
1341 Err(anyhow!("buffer {} was already registered", remote_id))?;
1342 }
1343 }
1344 entry.insert(open_buffer);
1345 }
1346 }
1347
1348 cx.subscribe(&buffer, Self::on_buffer_event).detach();
1349 cx.emit(BufferStoreEvent::BufferAdded(buffer));
1350 Ok(())
1351 }
1352
1353 pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
1354 self.opened_buffers
1355 .values()
1356 .filter_map(|buffer| buffer.upgrade())
1357 }
1358
1359 pub fn loading_buffers(
1360 &self,
1361 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Model<Buffer>>>)> {
1362 self.loading_buffers.iter().map(|(path, task)| {
1363 let task = task.clone();
1364 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1365 })
1366 }
1367
1368 pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
1369 self.buffers().find_map(|buffer| {
1370 let file = File::from_dyn(buffer.read(cx).file())?;
1371 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1372 Some(buffer)
1373 } else {
1374 None
1375 }
1376 })
1377 }
1378
1379 pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1380 self.opened_buffers.get(&buffer_id)?.upgrade()
1381 }
1382
1383 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
1384 self.get(buffer_id)
1385 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1386 }
1387
1388 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1389 self.get(buffer_id).or_else(|| {
1390 self.as_remote()
1391 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1392 })
1393 }
1394
1395 pub fn get_unstaged_changes(&self, buffer_id: BufferId) -> Option<Model<BufferChangeSet>> {
1396 if let OpenBuffer::Complete {
1397 unstaged_changes, ..
1398 } = self.opened_buffers.get(&buffer_id)?
1399 {
1400 unstaged_changes.as_ref()?.upgrade()
1401 } else {
1402 None
1403 }
1404 }
1405
1406 pub fn buffer_version_info(
1407 &self,
1408 cx: &AppContext,
1409 ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1410 let buffers = self
1411 .buffers()
1412 .map(|buffer| {
1413 let buffer = buffer.read(cx);
1414 proto::BufferVersion {
1415 id: buffer.remote_id().into(),
1416 version: language::proto::serialize_version(&buffer.version),
1417 }
1418 })
1419 .collect();
1420 let incomplete_buffer_ids = self
1421 .as_remote()
1422 .map(|remote| remote.incomplete_buffer_ids())
1423 .unwrap_or_default();
1424 (buffers, incomplete_buffer_ids)
1425 }
1426
1427 pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
1428 for open_buffer in self.opened_buffers.values_mut() {
1429 if let Some(buffer) = open_buffer.upgrade() {
1430 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1431 }
1432 }
1433
1434 for buffer in self.buffers() {
1435 buffer.update(cx, |buffer, cx| {
1436 buffer.set_capability(Capability::ReadOnly, cx)
1437 });
1438 }
1439
1440 if let Some(remote) = self.as_remote_mut() {
1441 // Wake up all futures currently waiting on a buffer to get opened,
1442 // to give them a chance to fail now that we've disconnected.
1443 remote.remote_buffer_listeners.clear()
1444 }
1445 }
1446
1447 pub fn shared(
1448 &mut self,
1449 remote_id: u64,
1450 downstream_client: AnyProtoClient,
1451 _cx: &mut AppContext,
1452 ) {
1453 self.downstream_client = Some((downstream_client, remote_id));
1454 }
1455
1456 pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
1457 self.downstream_client.take();
1458 self.forget_shared_buffers();
1459 }
1460
1461 pub fn discard_incomplete(&mut self) {
1462 self.opened_buffers
1463 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1464 }
1465
    /// Streams buffers that may contain matches for `query`, up to `limit`.
    ///
    /// Already-open unnamed buffers are sent first (they count against the
    /// limit); the worktree store then supplies candidate paths, which are
    /// opened in batches and forwarded on the returned channel. The receiver
    /// being dropped cancels the search.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                // Named open buffers are excluded from the path scan below and
                // searched through their in-memory contents instead.
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Bounds how many buffers are opened concurrently per batch.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                // Kick off all opens in the batch before awaiting any of them.
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            // Receiver dropped: caller is no longer interested.
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1520
    /// Recomputes the unstaged-changes diff for each of the given buffers.
    ///
    /// Buffers without a live change set have their stale weak handle cleared
    /// instead. The returned future resolves once all recalculations finish.
    pub fn recalculate_buffer_diffs(
        &mut self,
        buffers: Vec<Model<Buffer>>,
        cx: &mut ModelContext<Self>,
    ) -> impl Future<Output = ()> {
        let mut futures = Vec::new();
        for buffer in buffers {
            let buffer = buffer.read(cx).text_snapshot();
            if let Some(OpenBuffer::Complete {
                unstaged_changes, ..
            }) = self.opened_buffers.get_mut(&buffer.remote_id())
            {
                if let Some(unstaged_changes) = unstaged_changes
                    .as_ref()
                    .and_then(|changes| changes.upgrade())
                {
                    unstaged_changes.update(cx, |unstaged_changes, cx| {
                        futures.push(unstaged_changes.recalculate_diff(buffer.clone(), cx));
                    });
                } else {
                    // The change set was dropped; clear the dangling weak ref.
                    unstaged_changes.take();
                }
            }
        }
        async move {
            futures::future::join_all(futures).await;
        }
    }
1549
1550 fn on_buffer_event(
1551 &mut self,
1552 buffer: Model<Buffer>,
1553 event: &BufferEvent,
1554 cx: &mut ModelContext<Self>,
1555 ) {
1556 match event {
1557 BufferEvent::FileHandleChanged => {
1558 if let Some(local) = self.as_local_mut() {
1559 local.buffer_changed_file(buffer, cx);
1560 }
1561 }
1562 BufferEvent::Reloaded => {
1563 let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
1564 return;
1565 };
1566 let buffer = buffer.read(cx);
1567 downstream_client
1568 .send(proto::BufferReloaded {
1569 project_id: *project_id,
1570 buffer_id: buffer.remote_id().to_proto(),
1571 version: serialize_version(&buffer.version()),
1572 mtime: buffer.saved_mtime().map(|t| t.into()),
1573 line_ending: serialize_line_ending(buffer.line_ending()) as i32,
1574 })
1575 .log_err();
1576 }
1577 _ => {}
1578 }
1579 }
1580
    /// RPC handler for `UpdateBuffer`: applies incoming CRDT operations.
    ///
    /// Operations for a buffer that has not finished opening yet are queued in
    /// an `OpenBuffer::Operations` entry and applied when it opens.
    pub async fn handle_update_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    // Buffer still opening: append to its queued operations.
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Complete { buffer, .. } => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                        }
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    // Operations arrived before the buffer: queue them.
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }
1610
1611 pub fn register_shared_lsp_handle(
1612 &mut self,
1613 peer_id: proto::PeerId,
1614 buffer_id: BufferId,
1615 handle: OpenLspBufferHandle,
1616 ) {
1617 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
1618 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
1619 buffer.lsp_handle = Some(handle);
1620 return;
1621 }
1622 }
1623 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
1624 }
1625
    /// RPC handler for `SynchronizeBuffers`: re-establishes a guest's view of
    /// the buffers it reports after a reconnect.
    ///
    /// For each known buffer, re-shares it with the guest, reports its current
    /// version, pushes file/reload metadata, and streams any operations the
    /// guest is missing (relative to the version it sent) in chunks on the
    /// background executor.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate: the guest re-declares what it has open.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        unstaged_changes: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest hasn't seen.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations in chunks off the main thread.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
1715
1716 pub fn handle_create_buffer_for_peer(
1717 &mut self,
1718 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1719 replica_id: u16,
1720 capability: Capability,
1721 cx: &mut ModelContext<Self>,
1722 ) -> Result<()> {
1723 let Some(remote) = self.as_remote_mut() else {
1724 return Err(anyhow!("buffer store is not a remote"));
1725 };
1726
1727 if let Some(buffer) =
1728 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1729 {
1730 self.add_buffer(buffer, cx)?;
1731 }
1732
1733 Ok(())
1734 }
1735
    /// RPC handler for `UpdateBufferFile`: applies a new file to a buffer and
    /// forwards the message further downstream when this project is re-shared.
    ///
    /// Emits `BufferChangedFilePath` when the buffer's path actually changed
    /// (or when the buffer previously had no file).
    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // Returns Some(old file) only when the path changed, so the
                // event below fires only for real renames/moves.
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay to any client downstream of us.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
1783
    /// RPC handler for `SaveBuffer`: saves a buffer on behalf of a guest.
    ///
    /// Waits until the buffer has caught up to the version the guest saw,
    /// performs a plain save or a save-as (when `new_path` is given), and
    /// replies with the saved version and mtime.
    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until all the guest's edits have been applied locally.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
1824
1825 pub async fn handle_close_buffer(
1826 this: Model<Self>,
1827 envelope: TypedEnvelope<proto::CloseBuffer>,
1828 mut cx: AsyncAppContext,
1829 ) -> Result<()> {
1830 let peer_id = envelope.sender_id;
1831 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1832 this.update(&mut cx, |this, _| {
1833 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
1834 if shared.remove(&buffer_id).is_some() {
1835 if shared.is_empty() {
1836 this.shared_buffers.remove(&peer_id);
1837 }
1838 return;
1839 }
1840 }
1841 debug_panic!(
1842 "peer_id {} closed buffer_id {} which was either not open or already closed",
1843 peer_id,
1844 buffer_id
1845 )
1846 })
1847 }
1848
1849 pub async fn handle_buffer_saved(
1850 this: Model<Self>,
1851 envelope: TypedEnvelope<proto::BufferSaved>,
1852 mut cx: AsyncAppContext,
1853 ) -> Result<()> {
1854 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1855 let version = deserialize_version(&envelope.payload.version);
1856 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1857 this.update(&mut cx, move |this, cx| {
1858 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1859 buffer.update(cx, |buffer, cx| {
1860 buffer.did_save(version, mtime, cx);
1861 });
1862 }
1863
1864 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1865 downstream_client
1866 .send(proto::BufferSaved {
1867 project_id: *project_id,
1868 buffer_id: buffer_id.into(),
1869 mtime: envelope.payload.mtime,
1870 version: envelope.payload.version,
1871 })
1872 .log_err();
1873 }
1874 })
1875 }
1876
1877 pub async fn handle_buffer_reloaded(
1878 this: Model<Self>,
1879 envelope: TypedEnvelope<proto::BufferReloaded>,
1880 mut cx: AsyncAppContext,
1881 ) -> Result<()> {
1882 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1883 let version = deserialize_version(&envelope.payload.version);
1884 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1885 let line_ending = deserialize_line_ending(
1886 proto::LineEnding::from_i32(envelope.payload.line_ending)
1887 .ok_or_else(|| anyhow!("missing line ending"))?,
1888 );
1889 this.update(&mut cx, |this, cx| {
1890 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1891 buffer.update(cx, |buffer, cx| {
1892 buffer.did_reload(version, line_ending, mtime, cx);
1893 });
1894 }
1895
1896 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1897 downstream_client
1898 .send(proto::BufferReloaded {
1899 project_id: *project_id,
1900 buffer_id: buffer_id.into(),
1901 mtime: envelope.payload.mtime,
1902 version: envelope.payload.version,
1903 line_ending: envelope.payload.line_ending,
1904 })
1905 .log_err();
1906 }
1907 })
1908 }
1909
1910 pub async fn handle_blame_buffer(
1911 this: Model<Self>,
1912 envelope: TypedEnvelope<proto::BlameBuffer>,
1913 mut cx: AsyncAppContext,
1914 ) -> Result<proto::BlameBufferResponse> {
1915 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1916 let version = deserialize_version(&envelope.payload.version);
1917 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1918 buffer
1919 .update(&mut cx, |buffer, _| {
1920 buffer.wait_for_version(version.clone())
1921 })?
1922 .await?;
1923 let blame = this
1924 .update(&mut cx, |this, cx| {
1925 this.blame_buffer(&buffer, Some(version), cx)
1926 })?
1927 .await?;
1928 Ok(serialize_blame_buffer_response(blame))
1929 }
1930
1931 pub async fn handle_get_permalink_to_line(
1932 this: Model<Self>,
1933 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
1934 mut cx: AsyncAppContext,
1935 ) -> Result<proto::GetPermalinkToLineResponse> {
1936 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1937 // let version = deserialize_version(&envelope.payload.version);
1938 let selection = {
1939 let proto_selection = envelope
1940 .payload
1941 .selection
1942 .context("no selection to get permalink for defined")?;
1943 proto_selection.start as u32..proto_selection.end as u32
1944 };
1945 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1946 let permalink = this
1947 .update(&mut cx, |this, cx| {
1948 this.get_permalink_to_line(&buffer, selection, cx)
1949 })?
1950 .await?;
1951 Ok(proto::GetPermalinkToLineResponse {
1952 permalink: permalink.to_string(),
1953 })
1954 }
1955
    /// RPC handler for `GetStagedText`: opens (or reuses) the buffer's
    /// unstaged change set, pins it on the requester's shared-buffer entry so
    /// it stays alive for that peer, and returns the base text if any.
    pub async fn handle_get_staged_text(
        this: Model<Self>,
        request: TypedEnvelope<proto::GetStagedText>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::GetStagedTextResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let change_set = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_unstaged_changes(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        this.update(&mut cx, |this, _| {
            // Hold a strong handle on the peer's shared entry so the change
            // set isn't dropped while the peer still needs it.
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.unstaged_changes = Some(change_set.clone());
            }
        })?;
        // `None` means the file has no staged/base text (e.g. untracked).
        let staged_text = change_set.read_with(&cx, |change_set, cx| {
            change_set
                .base_text
                .as_ref()
                .map(|buffer| buffer.read(cx).text())
        })?;
        Ok(proto::GetStagedTextResponse { staged_text })
    }
1987
    /// RPC handler for `UpdateDiffBase`: replaces or clears the base text of a
    /// buffer's unstaged change set.
    ///
    /// Silently succeeds when the buffer is unknown or has no live change set
    /// — there is nothing to update in that case.
    pub async fn handle_update_diff_base(
        this: Model<Self>,
        request: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let Some((buffer, change_set)) = this.update(&mut cx, |this, _| {
            if let OpenBuffer::Complete {
                unstaged_changes,
                buffer,
            } = this.opened_buffers.get(&buffer_id)?
            {
                Some((buffer.upgrade()?, unstaged_changes.as_ref()?.upgrade()?))
            } else {
                None
            }
        })?
        else {
            return Ok(());
        };
        change_set.update(&mut cx, |change_set, cx| {
            if let Some(staged_text) = request.payload.staged_text {
                // The returned recalculation future is intentionally dropped;
                // the diff updates asynchronously.
                let _ = change_set.set_base_text(staged_text, buffer.read(cx).text_snapshot(), cx);
            } else {
                change_set.unset_base_text(buffer.read(cx).text_snapshot(), cx)
            }
        })?;
        Ok(())
    }
2017
2018 pub fn reload_buffers(
2019 &self,
2020 buffers: HashSet<Model<Buffer>>,
2021 push_to_history: bool,
2022 cx: &mut ModelContext<Self>,
2023 ) -> Task<Result<ProjectTransaction>> {
2024 if buffers.is_empty() {
2025 return Task::ready(Ok(ProjectTransaction::default()));
2026 }
2027 match &self.state {
2028 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2029 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2030 }
2031 }
2032
2033 async fn handle_reload_buffers(
2034 this: Model<Self>,
2035 envelope: TypedEnvelope<proto::ReloadBuffers>,
2036 mut cx: AsyncAppContext,
2037 ) -> Result<proto::ReloadBuffersResponse> {
2038 let sender_id = envelope.original_sender_id().unwrap_or_default();
2039 let reload = this.update(&mut cx, |this, cx| {
2040 let mut buffers = HashSet::default();
2041 for buffer_id in &envelope.payload.buffer_ids {
2042 let buffer_id = BufferId::new(*buffer_id)?;
2043 buffers.insert(this.get_existing(buffer_id)?);
2044 }
2045 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2046 })??;
2047
2048 let project_transaction = reload.await?;
2049 let project_transaction = this.update(&mut cx, |this, cx| {
2050 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2051 })?;
2052 Ok(proto::ReloadBuffersResponse {
2053 transaction: Some(project_transaction),
2054 })
2055 }
2056
    /// Shares `buffer` with `peer_id`, streaming its full state followed by
    /// its operation history in chunks.
    ///
    /// Idempotent: returns immediately if the buffer is already shared with
    /// that peer, or if this project has no downstream client.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            // Already shared with this peer; nothing to send.
            return Task::ready(Ok(()));
        }
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                unstaged_changes: None,
                lsp_handle: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before the task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            // Send the buffer's snapshot first, then stream its operations.
            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            // The receiver uses `is_last` to know when the
                            // buffer is complete.
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2123
    /// Forgets every buffer shared with every peer.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2127
    /// Forgets all buffers shared with the given peer.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2131
2132 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2133 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2134 self.shared_buffers.insert(new_peer_id, buffers);
2135 }
2136 }
2137
    /// Whether any buffers are currently shared with any peer.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2141
    /// Creates and registers a local buffer with the given initial `text` and
    /// optional `language` (plain text when `None`).
    ///
    /// # Panics
    /// Panics if called on a remote buffer store — this is a local-only method.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        // Index the buffer by path and entry id so file-system events can be
        // routed back to it.
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }
2175
2176 pub fn deserialize_project_transaction(
2177 &mut self,
2178 message: proto::ProjectTransaction,
2179 push_to_history: bool,
2180 cx: &mut ModelContext<Self>,
2181 ) -> Task<Result<ProjectTransaction>> {
2182 if let Some(this) = self.as_remote_mut() {
2183 this.deserialize_project_transaction(message, push_to_history, cx)
2184 } else {
2185 debug_panic!("not a remote buffer store");
2186 Task::ready(Err(anyhow!("not a remote buffer store")))
2187 }
2188 }
2189
2190 pub fn wait_for_remote_buffer(
2191 &mut self,
2192 id: BufferId,
2193 cx: &mut ModelContext<BufferStore>,
2194 ) -> Task<Result<Model<Buffer>>> {
2195 if let Some(this) = self.as_remote_mut() {
2196 this.wait_for_remote_buffer(id, cx)
2197 } else {
2198 debug_panic!("not a remote buffer store");
2199 Task::ready(Err(anyhow!("not a remote buffer store")))
2200 }
2201 }
2202
2203 pub fn serialize_project_transaction_for_peer(
2204 &mut self,
2205 project_transaction: ProjectTransaction,
2206 peer_id: proto::PeerId,
2207 cx: &mut ModelContext<Self>,
2208 ) -> proto::ProjectTransaction {
2209 let mut serialized_transaction = proto::ProjectTransaction {
2210 buffer_ids: Default::default(),
2211 transactions: Default::default(),
2212 };
2213 for (buffer, transaction) in project_transaction.0 {
2214 self.create_buffer_for_peer(&buffer, peer_id, cx)
2215 .detach_and_log_err(cx);
2216 serialized_transaction
2217 .buffer_ids
2218 .push(buffer.read(cx).remote_id().into());
2219 serialized_transaction
2220 .transactions
2221 .push(language::proto::serialize_transaction(&transaction));
2222 }
2223 serialized_transaction
2224 }
2225}
2226
impl BufferChangeSet {
    /// Creates an empty change set for `buffer`: no base text, and an empty
    /// diff against the buffer's current contents.
    pub fn new(buffer: &text::BufferSnapshot) -> Self {
        Self {
            buffer_id: buffer.remote_id(),
            base_text: None,
            diff_to_buffer: git::diff::BufferDiff::new(buffer),
            recalculate_diff_task: None,
            diff_updated_futures: Vec::new(),
            base_text_version: 0,
        }
    }

    /// Test helper: creates a change set and immediately kicks off a diff
    /// against `base_text`, discarding the completion receiver.
    #[cfg(any(test, feature = "test-support"))]
    pub fn new_with_base_text(
        base_text: String,
        buffer: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let mut this = Self::new(&buffer);
        let _ = this.set_base_text(base_text, buffer, cx);
        this
    }

    /// Iterates the diff hunks that intersect `range`, in buffer order.
    pub fn diff_hunks_intersecting_range<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range(range, buffer_snapshot)
    }

    /// Iterates the diff hunks that intersect `range`, in reverse buffer
    /// order.
    pub fn diff_hunks_intersecting_range_rev<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range_rev(range, buffer_snapshot)
    }

    /// Test helper: the current base text as a plain string, if any.
    #[cfg(any(test, feature = "test-support"))]
    pub fn base_text_string(&self, cx: &AppContext) -> Option<String> {
        self.base_text.as_ref().map(|buffer| buffer.read(cx).text())
    }

    /// Replaces the base text and recomputes the diff asynchronously.
    /// Line endings are normalized first so diffs aren't polluted by CRLF
    /// differences. The returned receiver resolves when the diff is updated.
    pub fn set_base_text(
        &mut self,
        mut base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        LineEnding::normalize(&mut base_text);
        self.recalculate_diff_internal(base_text, buffer_snapshot, true, cx)
    }

    /// Clears the base text, resets the diff to empty, cancels any in-flight
    /// recalculation, and bumps the base-text version. No-op when there is
    /// no base text.
    pub fn unset_base_text(
        &mut self,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) {
        if self.base_text.is_some() {
            self.base_text = None;
            self.diff_to_buffer = BufferDiff::new(&buffer_snapshot);
            self.recalculate_diff_task.take();
            self.base_text_version += 1;
            cx.notify();
        }
    }

    /// Recomputes the diff against the existing base text (e.g. after the
    /// buffer was edited). If there is no base text, returns a receiver
    /// whose sender is already dropped, so awaiting it yields a
    /// `Canceled` error rather than hanging.
    pub fn recalculate_diff(
        &mut self,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        if let Some(base_text) = self.base_text.clone() {
            self.recalculate_diff_internal(base_text.read(cx).text(), buffer_snapshot, false, cx)
        } else {
            oneshot::channel().1
        }
    }

    /// Spawns a background diff of `base_text` against `buffer_snapshot`,
    /// then applies the result on the main thread. Starting a new
    /// recalculation replaces (and thereby cancels) any in-flight one, but
    /// all accumulated completion senders are fired by whichever run
    /// finishes. `base_text_changed` controls whether the stored base text
    /// buffer is rebuilt and its version bumped.
    fn recalculate_diff_internal(
        &mut self,
        base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        base_text_changed: bool,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            // The diff itself runs on the background executor; `base_text` is
            // moved through so it can be reused to build the base buffer.
            let (base_text, diff) = cx
                .background_executor()
                .spawn(async move {
                    let diff = BufferDiff::build(&base_text, &buffer_snapshot).await;
                    (base_text, diff)
                })
                .await;
            this.update(&mut cx, |this, cx| {
                if base_text_changed {
                    this.base_text_version += 1;
                    this.base_text = Some(cx.new_model(|cx| {
                        Buffer::local_normalized(Rope::from(base_text), LineEnding::default(), cx)
                    }));
                }
                this.diff_to_buffer = diff;
                this.recalculate_diff_task.take();
                // Wake every caller that requested a diff update since the
                // last completion.
                for tx in this.diff_updated_futures.drain(..) {
                    tx.send(()).ok();
                }
                cx.notify();
            })?;
            Ok(())
        }));
        rx
    }
}
2345
2346impl OpenBuffer {
2347 fn upgrade(&self) -> Option<Model<Buffer>> {
2348 match self {
2349 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2350 OpenBuffer::Operations(_) => None,
2351 }
2352 }
2353}
2354
2355fn is_not_found_error(error: &anyhow::Error) -> bool {
2356 error
2357 .root_cause()
2358 .downcast_ref::<io::Error>()
2359 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2360}
2361
2362fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2363 let Some(blame) = blame else {
2364 return proto::BlameBufferResponse {
2365 blame_response: None,
2366 };
2367 };
2368
2369 let entries = blame
2370 .entries
2371 .into_iter()
2372 .map(|entry| proto::BlameEntry {
2373 sha: entry.sha.as_bytes().into(),
2374 start_line: entry.range.start,
2375 end_line: entry.range.end,
2376 original_line_number: entry.original_line_number,
2377 author: entry.author.clone(),
2378 author_mail: entry.author_mail.clone(),
2379 author_time: entry.author_time,
2380 author_tz: entry.author_tz.clone(),
2381 committer: entry.committer.clone(),
2382 committer_mail: entry.committer_mail.clone(),
2383 committer_time: entry.committer_time,
2384 committer_tz: entry.committer_tz.clone(),
2385 summary: entry.summary.clone(),
2386 previous: entry.previous.clone(),
2387 filename: entry.filename.clone(),
2388 })
2389 .collect::<Vec<_>>();
2390
2391 let messages = blame
2392 .messages
2393 .into_iter()
2394 .map(|(oid, message)| proto::CommitMessage {
2395 oid: oid.as_bytes().into(),
2396 message,
2397 })
2398 .collect::<Vec<_>>();
2399
2400 let permalinks = blame
2401 .permalinks
2402 .into_iter()
2403 .map(|(oid, url)| proto::CommitPermalink {
2404 oid: oid.as_bytes().into(),
2405 permalink: url.to_string(),
2406 })
2407 .collect::<Vec<_>>();
2408
2409 proto::BlameBufferResponse {
2410 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2411 entries,
2412 messages,
2413 permalinks,
2414 remote_url: blame.remote_url,
2415 }),
2416 }
2417}
2418
2419fn deserialize_blame_buffer_response(
2420 response: proto::BlameBufferResponse,
2421) -> Option<git::blame::Blame> {
2422 let response = response.blame_response?;
2423 let entries = response
2424 .entries
2425 .into_iter()
2426 .filter_map(|entry| {
2427 Some(git::blame::BlameEntry {
2428 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2429 range: entry.start_line..entry.end_line,
2430 original_line_number: entry.original_line_number,
2431 committer: entry.committer,
2432 committer_time: entry.committer_time,
2433 committer_tz: entry.committer_tz,
2434 committer_mail: entry.committer_mail,
2435 author: entry.author,
2436 author_mail: entry.author_mail,
2437 author_time: entry.author_time,
2438 author_tz: entry.author_tz,
2439 summary: entry.summary,
2440 previous: entry.previous,
2441 filename: entry.filename,
2442 })
2443 })
2444 .collect::<Vec<_>>();
2445
2446 let messages = response
2447 .messages
2448 .into_iter()
2449 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2450 .collect::<HashMap<_, _>>();
2451
2452 let permalinks = response
2453 .permalinks
2454 .into_iter()
2455 .filter_map(|permalink| {
2456 Some((
2457 git::Oid::from_bytes(&permalink.oid).ok()?,
2458 Url::from_str(&permalink.permalink).ok()?,
2459 ))
2460 })
2461 .collect::<HashMap<_, _>>();
2462
2463 Some(Blame {
2464 entries,
2465 permalinks,
2466 messages,
2467 remote_url: response.remote_url,
2468 })
2469}
2470
2471fn get_permalink_in_rust_registry_src(
2472 provider_registry: Arc<GitHostingProviderRegistry>,
2473 path: PathBuf,
2474 selection: Range<u32>,
2475) -> Result<url::Url> {
2476 #[derive(Deserialize)]
2477 struct CargoVcsGit {
2478 sha1: String,
2479 }
2480
2481 #[derive(Deserialize)]
2482 struct CargoVcsInfo {
2483 git: CargoVcsGit,
2484 path_in_vcs: String,
2485 }
2486
2487 #[derive(Deserialize)]
2488 struct CargoPackage {
2489 repository: String,
2490 }
2491
2492 #[derive(Deserialize)]
2493 struct CargoToml {
2494 package: CargoPackage,
2495 }
2496
2497 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
2498 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
2499 Some((dir, json))
2500 }) else {
2501 bail!("No .cargo_vcs_info.json found in parent directories")
2502 };
2503 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
2504 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
2505 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
2506 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
2507 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
2508 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
2509 let permalink = provider.build_permalink(
2510 remote,
2511 BuildPermalinkParams {
2512 sha: &cargo_vcs_info.git.sha1,
2513 path: &path.to_string_lossy(),
2514 selection: Some(selection),
2515 },
2516 );
2517 Ok(permalink)
2518}