1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, bail, Context as _, Result};
9use client::Client;
10use collections::{hash_map, HashMap, HashSet};
11use fs::Fs;
12use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
13use git::{blame::Blame, diff::BufferDiff, repository::RepoPath};
14use gpui::{
15 AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
16 Task, WeakModel,
17};
18use http_client::Url;
19use language::{
20 proto::{
21 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
22 split_operations,
23 },
24 Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
25};
26use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
27use serde::Deserialize;
28use smol::channel::Receiver;
29use std::{
30 io,
31 ops::Range,
32 path::{Path, PathBuf},
33 str::FromStr as _,
34 sync::Arc,
35 time::Instant,
36};
37use text::{BufferId, LineEnding, Rope};
38use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
39use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
40
/// A set of open buffers.
pub struct BufferStore {
    /// Whether this store is backed by the local filesystem or a remote peer.
    state: BufferStoreState,
    /// In-flight buffer loads, shared so concurrent opens of the same path
    /// await a single underlying task.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Model<Buffer>, Arc<anyhow::Error>>>>>,
    /// In-flight unstaged-change-set loads, keyed by buffer id, shared for
    /// the same deduplication reason as `loading_buffers`.
    #[allow(clippy::type_complexity)]
    loading_change_sets:
        HashMap<BufferId, Shared<Task<Result<Model<BufferChangeSet>, Arc<anyhow::Error>>>>>,
    worktree_store: Model<WorktreeStore>,
    /// All buffers known to this store, keyed by their remote id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Client and project id used to relay updates downstream when this
    /// project is being shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// Buffers that have been shared with each remote peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
54
/// A buffer, plus associated per-buffer state, that has been shared with a
/// remote peer.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Model<Buffer>,
    /// The unstaged change set shared alongside the buffer, if any.
    unstaged_changes: Option<Model<BufferChangeSet>>,
    /// LSP buffer handle, held for as long as the buffer is shared.
    lsp_handle: Option<OpenLspBufferHandle>,
}
61
/// The diff between a buffer's current contents and some base text,
/// e.g. the staged (git index) version of its file.
#[derive(Debug)]
pub struct BufferChangeSet {
    pub buffer_id: BufferId,
    /// The base text being diffed against; `None` when there is no base
    /// (e.g. no staged text exists for the file).
    pub base_text: Option<Model<Buffer>>,
    pub diff_to_buffer: git::diff::BufferDiff,
    /// In-flight recomputation of `diff_to_buffer`, if any.
    pub recalculate_diff_task: Option<Task<Result<()>>>,
    /// Senders notified when the in-flight diff recalculation completes.
    pub diff_updated_futures: Vec<oneshot::Sender<()>>,
    /// Version counter for `base_text`.
    pub base_text_version: usize,
}
71
/// Backing state for a [`BufferStore`]: either filesystem-backed (local) or
/// RPC-backed (remote).
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
76
/// State for a buffer store that proxies buffers from an upstream (remote)
/// project over RPC.
struct RemoteBufferStore {
    /// Buffers sent by peers that are retained here to avoid races
    /// (collab connections only — see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Model<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    /// Buffers whose initial `State` message has arrived but whose operation
    /// chunks are still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    /// Channels to notify when a given remote buffer finishes loading
    /// (or fails to load).
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    worktree_store: Model<WorktreeStore>,
}
86
/// State for a buffer store backed by the local filesystem.
struct LocalBufferStore {
    /// Maps project paths to the ids of buffers opened from those paths.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    /// Maps worktree entry ids to buffer ids, used to track buffers across
    /// renames and deletions of their underlying entries.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Model<WorktreeStore>,
    /// Subscription to worktree-store events; dropped with this store.
    _subscription: Subscription,
}
93
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully-loaded buffer, held weakly so it can be dropped when no
    /// longer referenced elsewhere.
    Complete {
        buffer: WeakModel<Buffer>,
        unstaged_changes: Option<WeakModel<BufferChangeSet>>,
    },
    /// Buffered operations for a buffer that isn't fully loaded yet —
    /// presumably applied once the buffer arrives; TODO confirm against the
    /// rest of the file.
    Operations(Vec<Operation>),
}
101
/// Events emitted by a [`BufferStore`].
pub enum BufferStoreEvent {
    /// A buffer was added to the store.
    BufferAdded(Model<Buffer>),
    /// A buffer was dropped and removed from the store.
    BufferDropped(BufferId),
    /// A buffer's underlying file changed paths (e.g. save-as or a rename
    /// observed in the worktree).
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        /// The file the buffer previously pointed at, if any.
        old_file: Option<Arc<dyn language::File>>,
    },
}
110
/// A transaction spanning multiple buffers: maps each affected buffer to the
/// transaction applied to it.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}
115
116impl RemoteBufferStore {
117 fn load_staged_text(
118 &self,
119 buffer_id: BufferId,
120 cx: &AppContext,
121 ) -> Task<Result<Option<String>>> {
122 let project_id = self.project_id;
123 let client = self.upstream_client.clone();
124 cx.background_executor().spawn(async move {
125 Ok(client
126 .request(proto::GetStagedText {
127 project_id,
128 buffer_id: buffer_id.to_proto(),
129 })
130 .await?
131 .staged_text)
132 })
133 }
    /// Returns a task that resolves once the buffer with the given id is
    /// available locally, either because it's already open or once it
    /// finishes streaming in from the remote peer.
    ///
    /// NOTE(review): the listener is registered *before* the open-buffer
    /// check — presumably so a buffer that completes in between is not
    /// missed; confirm against `handle_create_buffer_for_peer`.
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            // Fast path: the buffer is already open in the store.
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            // Otherwise wait for the listener channel to be resolved.
            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }
156
    /// Saves a buffer by issuing a `SaveBuffer` request upstream, then marks
    /// the local replica saved with the version and mtime acknowledged by
    /// the remote.
    ///
    /// Passing `new_path` requests the remote save the buffer at that path
    /// (i.e. "save as").
    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            // Use the version and mtime reported back by the remote — they
            // reflect what was actually written.
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }
187
    /// Handles an incoming `CreateBufferForPeer` message. Buffers are
    /// streamed to this peer in two phases: first a `State` variant carrying
    /// the buffer's contents and (optionally) its file, then one or more
    /// `Chunk` variants carrying operations, the final one flagged with
    /// `is_last`.
    ///
    /// Returns `Ok(Some(buffer))` once the last chunk has been applied, and
    /// `Ok(None)` while the buffer is still incomplete. Failures while
    /// constructing or updating the buffer are forwarded to any listeners
    /// registered via `wait_for_remote_buffer`.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<BufferStore>,
    ) -> Result<Option<Model<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    // Resolve the buffer's file against our worktrees, if
                    // the state includes one.
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Propagate the failure to everyone waiting on this
                        // buffer.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                // Deserialize this chunk's operations and apply them to the
                // partially-loaded buffer.
                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // Abandon the partially-loaded buffer and notify waiters
                    // of the failure.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    // The buffer is now complete; resolve all waiters.
                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }
282
283 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
284 self.loading_remote_buffers_by_id
285 .keys()
286 .copied()
287 .collect::<Vec<_>>()
288 }
289
    /// Reconstructs a [`ProjectTransaction`] from its wire representation,
    /// waiting for every referenced buffer — and the edits each transaction
    /// mentions — to arrive before returning.
    ///
    /// When `push_to_history` is true, each transaction is also pushed onto
    /// its buffer's undo history.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            // `buffer_ids` and `transactions` are parallel lists on the wire.
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                // Make sure the edits referenced by the transaction have
                // actually been applied to the buffer.
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }
327
328 fn open_buffer(
329 &self,
330 path: Arc<Path>,
331 worktree: Model<Worktree>,
332 cx: &mut ModelContext<BufferStore>,
333 ) -> Task<Result<Model<Buffer>>> {
334 let worktree_id = worktree.read(cx).id().to_proto();
335 let project_id = self.project_id;
336 let client = self.upstream_client.clone();
337 let path_string = path.clone().to_string_lossy().to_string();
338 cx.spawn(move |this, mut cx| async move {
339 let response = client
340 .request(proto::OpenBufferByPath {
341 project_id,
342 worktree_id,
343 path: path_string,
344 })
345 .await?;
346 let buffer_id = BufferId::new(response.buffer_id)?;
347
348 let buffer = this
349 .update(&mut cx, {
350 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
351 })?
352 .await?;
353
354 Ok(buffer)
355 })
356 }
357
358 fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
359 let create = self.upstream_client.request(proto::OpenNewBuffer {
360 project_id: self.project_id,
361 });
362 cx.spawn(|this, mut cx| async move {
363 let response = create.await?;
364 let buffer_id = BufferId::new(response.buffer_id)?;
365
366 this.update(&mut cx, |this, cx| {
367 this.wait_for_remote_buffer(buffer_id, cx)
368 })?
369 .await
370 })
371 }
372
373 fn reload_buffers(
374 &self,
375 buffers: HashSet<Model<Buffer>>,
376 push_to_history: bool,
377 cx: &mut ModelContext<BufferStore>,
378 ) -> Task<Result<ProjectTransaction>> {
379 let request = self.upstream_client.request(proto::ReloadBuffers {
380 project_id: self.project_id,
381 buffer_ids: buffers
382 .iter()
383 .map(|buffer| buffer.read(cx).remote_id().to_proto())
384 .collect(),
385 });
386
387 cx.spawn(|this, mut cx| async move {
388 let response = request
389 .await?
390 .transaction
391 .ok_or_else(|| anyhow!("missing transaction"))?;
392 this.update(&mut cx, |this, cx| {
393 this.deserialize_project_transaction(response, push_to_history, cx)
394 })?
395 .await
396 })
397 }
398}
399
400impl LocalBufferStore {
401 fn load_staged_text(
402 &self,
403 buffer: &Model<Buffer>,
404 cx: &AppContext,
405 ) -> Task<Result<Option<String>>> {
406 let Some(file) = buffer.read(cx).file() else {
407 return Task::ready(Ok(None));
408 };
409 let worktree_id = file.worktree_id(cx);
410 let path = file.path().clone();
411 let Some(worktree) = self
412 .worktree_store
413 .read(cx)
414 .worktree_for_id(worktree_id, cx)
415 else {
416 return Task::ready(Err(anyhow!("no such worktree")));
417 };
418
419 worktree.read(cx).load_staged_file(path.as_ref(), cx)
420 }
421
    /// Writes a buffer's contents to `path` within `worktree`, then updates
    /// the buffer's file and saved state, notifying any downstream client.
    ///
    /// `has_changed_file` should be true when the buffer is being saved to a
    /// different file than it was loaded from (e.g. "save as"); it is forced
    /// on for buffers whose file has never existed on disk.
    fn save_local_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        worktree: Model<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Snapshot the text, line ending, and version being written.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        // A buffer that has never existed on disk gets its file created by
        // this save, so treat that as a file change as well.
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Tell any downstream client about the new file (when it
            // changed) and about the save. Send failures are logged, not
            // propagated.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            // Finally, update the buffer itself.
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
479
    /// Subscribes to `worktree`'s events so that entry changes and git
    /// repository changes (local worktrees only) are reflected in the open
    /// buffers. The subscription is detached rather than stored.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        Self::local_worktree_git_repos_changed(
                            this,
                            worktree.clone(),
                            updated_repos,
                            cx,
                        )
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }
505
506 fn local_worktree_entries_changed(
507 this: &mut BufferStore,
508 worktree_handle: &Model<Worktree>,
509 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
510 cx: &mut ModelContext<BufferStore>,
511 ) {
512 let snapshot = worktree_handle.read(cx).snapshot();
513 for (path, entry_id, _) in changes {
514 Self::local_worktree_entry_changed(
515 this,
516 *entry_id,
517 path,
518 worktree_handle,
519 &snapshot,
520 cx,
521 );
522 }
523 }
524
    /// Recomputes the diff bases of all open buffers affected by a set of
    /// git repository changes in `worktree_handle`.
    ///
    /// The affected change sets are collected synchronously; the new index
    /// texts are then loaded on the background executor before being applied
    /// and forwarded to any downstream client.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Gather the change sets of live buffers whose files sit under one
        // of the changed repositories' working directories.
        let buffer_change_sets = this
            .opened_buffers
            .values()
            .filter_map(|buffer| {
                if let OpenBuffer::Complete {
                    buffer,
                    unstaged_changes,
                } = buffer
                {
                    let buffer = buffer.upgrade()?.read(cx);
                    let file = File::from_dyn(buffer.file())?;
                    if file.worktree != worktree_handle {
                        return None;
                    }
                    // Skip buffers outside every changed repository.
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                    let unstaged_changes = unstaged_changes.as_ref()?.upgrade()?;
                    let snapshot = buffer.text_snapshot();
                    Some((unstaged_changes, snapshot, file.path.clone()))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        if buffer_change_sets.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Load the staged text for each affected buffer off the main
            // thread.
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    buffer_change_sets
                        .into_iter()
                        .filter_map(|(change_set, buffer_snapshot, path)| {
                            let local_repo = snapshot.local_repo_for_path(&path)?;
                            let relative_path = local_repo.relativize(&path).ok()?;
                            let base_text = local_repo.repo().load_index_text(&relative_path);
                            Some((change_set, buffer_snapshot, base_text))
                        })
                        .collect::<Vec<_>>()
                })
                .await;

            this.update(&mut cx, |this, cx| {
                for (change_set, buffer_snapshot, staged_text) in diff_bases_by_buffer {
                    // Apply (or clear) the new base text on each change set.
                    change_set.update(cx, |change_set, cx| {
                        if let Some(staged_text) = staged_text.clone() {
                            let _ =
                                change_set.set_base_text(staged_text, buffer_snapshot.clone(), cx);
                        } else {
                            change_set.unset_base_text(buffer_snapshot.clone(), cx);
                        }
                    });

                    // Mirror the new diff base to any downstream client.
                    if let Some((client, project_id)) = &this.downstream_client.clone() {
                        client
                            .send(proto::UpdateDiffBase {
                                project_id: *project_id,
                                buffer_id: buffer_snapshot.remote_id().to_proto(),
                                staged_text,
                            })
                            .log_err();
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }
606
    /// Reconciles a single buffer with a change to its worktree entry:
    /// computes the buffer's new `File` (path, entry id, disk state),
    /// maintains the path/entry indices, notifies any downstream client, and
    /// emits `BufferChangedFilePath` when the path changed.
    ///
    /// Returns `None` whenever there is nothing to reconcile; the return
    /// value is otherwise unused.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Find the buffer for this entry, falling back to a path lookup.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        // If the buffer model has been dropped, discard the stale entry.
        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            this.opened_buffers.remove(&buffer_id);
            None
        };

        // With no live buffer, just clean up the indices and stop.
        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Locate the entry in the new snapshot — by id first, then by
            // path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // The entry is gone from the snapshot: the file was deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index and record the path-change event.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Mirror the new file to any downstream client.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        // Emit the collected events outside of the buffer update.
        for event in events {
            cx.emit(event);
        }

        None
    }
734
    /// Records a buffer's (new) file in the path and entry-id indices.
    ///
    /// Returns `None` when the buffer has no local file, or when its entry
    /// id is already registered — in which case nothing is updated.
    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    // Already indexed under this entry; nothing to do.
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }
760
761 fn save_buffer(
762 &self,
763 buffer: Model<Buffer>,
764 cx: &mut ModelContext<BufferStore>,
765 ) -> Task<Result<()>> {
766 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
767 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
768 };
769 let worktree = file.worktree.clone();
770 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
771 }
772
773 fn save_buffer_as(
774 &self,
775 buffer: Model<Buffer>,
776 path: ProjectPath,
777 cx: &mut ModelContext<BufferStore>,
778 ) -> Task<Result<()>> {
779 let Some(worktree) = self
780 .worktree_store
781 .read(cx)
782 .worktree_for_id(path.worktree_id, cx)
783 else {
784 return Task::ready(Err(anyhow!("no such worktree")));
785 };
786 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
787 }
788
    /// Opens the file at `path` within `worktree` as a buffer.
    ///
    /// The buffer's model slot is reserved up front so its id (derived from
    /// the model's entity id) is known before the file load completes. If
    /// the file doesn't exist on disk, an empty buffer with
    /// `DiskState::New` is created instead, so saving it will create the
    /// file. The new buffer is registered in the store's path and entry-id
    /// indices.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Construct the text buffer on the background executor.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // Missing file: open an empty in-memory buffer that will be
                // written to `path` on its first save.
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                // Index the buffer by path and (when present) by entry id.
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
857
858 fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
859 cx.spawn(|buffer_store, mut cx| async move {
860 let buffer = cx.new_model(|cx| {
861 Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
862 })?;
863 buffer_store.update(&mut cx, |buffer_store, cx| {
864 buffer_store.add_buffer(buffer.clone(), cx).log_err();
865 })?;
866 Ok(buffer)
867 })
868 }
869
    /// Reloads each of the given buffers from disk, collecting the resulting
    /// transactions into a single [`ProjectTransaction`].
    ///
    /// When `push_to_history` is false, each reload's transaction is removed
    /// from its buffer's undo history.
    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                // A reload only produces a transaction when the buffer
                // actually changed.
                buffer.update(&mut cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.handle(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
895}
896
897impl BufferStore {
    /// Registers this store's handlers for buffer-related RPC messages and
    /// requests on the given client.
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
        client.add_model_request_handler(Self::handle_get_staged_text);
        client.add_model_message_handler(Self::handle_update_diff_base);
    }
909
    /// Creates a buffer store backed by the local filesystem, subscribing to
    /// the worktree store so that newly-added worktrees are watched for
    /// changes.
    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            worktree_store,
        }
    }
932
    /// Creates a buffer store that proxies buffers from the remote project
    /// identified by `remote_id`, reachable via `upstream_client`.
    pub fn remote(
        worktree_store: Model<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut ModelContext<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
956
957 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
958 match &mut self.state {
959 BufferStoreState::Local(state) => Some(state),
960 _ => None,
961 }
962 }
963
964 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
965 match &mut self.state {
966 BufferStoreState::Remote(state) => Some(state),
967 _ => None,
968 }
969 }
970
971 fn as_remote(&self) -> Option<&RemoteBufferStore> {
972 match &self.state {
973 BufferStoreState::Remote(state) => Some(state),
974 _ => None,
975 }
976 }
977
    /// Opens the buffer at `project_path`.
    ///
    /// Returns the existing buffer if it's already open. Concurrent opens of
    /// the same path are deduplicated through `loading_buffers`, so the
    /// underlying load runs once and its result is shared among callers.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                // Dispatch to the local or remote implementation.
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // The shared task yields `Arc<anyhow::Error>`; flatten it back into
        // a plain `anyhow::Error` for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1023
    /// Opens (or returns the already-open) change set describing the
    /// difference between `buffer` and its staged (git index) text.
    ///
    /// Concurrent requests for the same buffer are deduplicated through
    /// `loading_change_sets`.
    pub fn open_unstaged_changes(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(change_set) = self.get_unstaged_changes(buffer_id) {
            return Task::ready(Ok(change_set));
        }

        let task = match self.loading_change_sets.entry(buffer_id) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                // Load the staged text locally or via RPC.
                let load = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.load_staged_text(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_unstaged_changes_internal(this, load.await, buffer, cx)
                                .await
                                .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Flatten the shared task's `Arc<anyhow::Error>` for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1058
    /// Test-only: installs a pre-built change set for a buffer, as if it had
    /// been loaded through `open_unstaged_changes`.
    #[cfg(any(test, feature = "test-support"))]
    pub fn set_change_set(&mut self, buffer_id: BufferId, change_set: Model<BufferChangeSet>) {
        self.loading_change_sets
            .insert(buffer_id, Task::ready(Ok(change_set)).shared());
    }
1064
    /// Builds a [`BufferChangeSet`] for `buffer` from a loaded staged text.
    ///
    /// On load failure, the pending entry in `loading_change_sets` is
    /// cleared (allowing retries) and the error is propagated. On success,
    /// the new change set is recorded (weakly) on the buffer's `OpenBuffer`
    /// entry. A `text` of `None` means the file has no staged text; the
    /// change set is then created without a base.
    pub async fn open_unstaged_changes_internal(
        this: WeakModel<Self>,
        text: Result<Option<String>>,
        buffer: Model<Buffer>,
        mut cx: AsyncAppContext,
    ) -> Result<Model<BufferChangeSet>> {
        let text = match text {
            Err(e) => {
                // Clear the loading entry so a later call can retry.
                this.update(&mut cx, |this, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    this.loading_change_sets.remove(&buffer_id);
                })?;
                return Err(e);
            }
            Ok(text) => text,
        };

        let change_set = buffer.update(&mut cx, |buffer, cx| {
            cx.new_model(|_| BufferChangeSet::new(buffer))
        })?;

        if let Some(text) = text {
            // Failures computing the initial diff are deliberately ignored.
            change_set
                .update(&mut cx, |change_set, cx| {
                    let snapshot = buffer.read(cx).text_snapshot();
                    change_set.set_base_text(text, snapshot, cx)
                })?
                .await
                .ok();
        }

        this.update(&mut cx, |this, cx| {
            let buffer_id = buffer.read(cx).remote_id();
            this.loading_change_sets.remove(&buffer_id);
            // Remember the change set (weakly) on the open-buffer entry.
            if let Some(OpenBuffer::Complete {
                unstaged_changes, ..
            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                *unstaged_changes = Some(change_set.downgrade());
            }
        })?;

        Ok(change_set)
    }
1109
1110 pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
1111 match &self.state {
1112 BufferStoreState::Local(this) => this.create_buffer(cx),
1113 BufferStoreState::Remote(this) => this.create_buffer(cx),
1114 }
1115 }
1116
    /// Saves the buffer to its existing file, dispatching to the local or
    /// remote implementation.
    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        match &mut self.state {
            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
        }
    }
1127
1128 pub fn save_buffer_as(
1129 &mut self,
1130 buffer: Model<Buffer>,
1131 path: ProjectPath,
1132 cx: &mut ModelContext<Self>,
1133 ) -> Task<Result<()>> {
1134 let old_file = buffer.read(cx).file().cloned();
1135 let task = match &self.state {
1136 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1137 BufferStoreState::Remote(this) => {
1138 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1139 }
1140 };
1141 cx.spawn(|this, mut cx| async move {
1142 task.await?;
1143 this.update(&mut cx, |_, cx| {
1144 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1145 })
1146 })
1147 }
1148
    /// Computes a git blame for `buffer`, optionally at a specific `version`
    /// of its contents.
    ///
    /// For local worktrees the blame runs on the background executor against
    /// the file's repository; for remote worktrees the request is proxied to
    /// the host over RPC. Returns `Ok(None)` when the file is not inside a
    /// git repository.
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Resolve everything that needs `cx` up front, so the actual
                // blame can run on a background thread.
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        // Not in a repository: blame is simply unavailable.
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    // Blame the requested historical content, or the current
                    // buffer contents when no version was given.
                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1210
    /// Builds a permanent URL to the given line range of `buffer` on its git
    /// hosting provider, using the repository's `origin` remote and the
    /// current HEAD commit.
    ///
    /// For remote worktrees the request is proxied to the host over RPC.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Model<Buffer>,
        selection: Range<u32>,
        cx: &AppContext,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // File path relative to the repository root.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    // Determine which hosting provider (GitHub, GitLab, ...)
                    // the remote URL belongs to.
                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1307
    /// Registers `buffer` in `opened_buffers`, wiring up event forwarding and
    /// drop notification.
    ///
    /// If operations for this buffer id arrived before the buffer itself
    /// (stored as [`OpenBuffer::Operations`]), they are applied now. Returns
    /// an error if a live local buffer with the same id is already registered.
    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        // Replica id 0 is this machine; anything else came from a collaborator.
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer.downgrade(),
            unstaged_changes: None,
        };

        // Emit BufferDropped when the buffer model is released, so dependent
        // state elsewhere can be cleaned up.
        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    // Apply operations that were queued before the buffer existed.
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        // Remote buffers may be re-announced; treat as a no-op.
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }
1351
1352 pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
1353 self.opened_buffers
1354 .values()
1355 .filter_map(|buffer| buffer.upgrade())
1356 }
1357
1358 pub fn loading_buffers(
1359 &self,
1360 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Model<Buffer>>>)> {
1361 self.loading_buffers.iter().map(|(path, task)| {
1362 let task = task.clone();
1363 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1364 })
1365 }
1366
1367 pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
1368 self.buffers().find_map(|buffer| {
1369 let file = File::from_dyn(buffer.read(cx).file())?;
1370 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1371 Some(buffer)
1372 } else {
1373 None
1374 }
1375 })
1376 }
1377
1378 pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1379 self.opened_buffers.get(&buffer_id)?.upgrade()
1380 }
1381
1382 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
1383 self.get(buffer_id)
1384 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1385 }
1386
1387 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1388 self.get(buffer_id).or_else(|| {
1389 self.as_remote()
1390 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1391 })
1392 }
1393
1394 pub fn get_unstaged_changes(&self, buffer_id: BufferId) -> Option<Model<BufferChangeSet>> {
1395 if let OpenBuffer::Complete {
1396 unstaged_changes, ..
1397 } = self.opened_buffers.get(&buffer_id)?
1398 {
1399 unstaged_changes.as_ref()?.upgrade()
1400 } else {
1401 None
1402 }
1403 }
1404
1405 pub fn buffer_version_info(
1406 &self,
1407 cx: &AppContext,
1408 ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1409 let buffers = self
1410 .buffers()
1411 .map(|buffer| {
1412 let buffer = buffer.read(cx);
1413 proto::BufferVersion {
1414 id: buffer.remote_id().into(),
1415 version: language::proto::serialize_version(&buffer.version),
1416 }
1417 })
1418 .collect();
1419 let incomplete_buffer_ids = self
1420 .as_remote()
1421 .map(|remote| remote.incomplete_buffer_ids())
1422 .unwrap_or_default();
1423 (buffers, incomplete_buffer_ids)
1424 }
1425
1426 pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
1427 for open_buffer in self.opened_buffers.values_mut() {
1428 if let Some(buffer) = open_buffer.upgrade() {
1429 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1430 }
1431 }
1432
1433 for buffer in self.buffers() {
1434 buffer.update(cx, |buffer, cx| {
1435 buffer.set_capability(Capability::ReadOnly, cx)
1436 });
1437 }
1438
1439 if let Some(remote) = self.as_remote_mut() {
1440 // Wake up all futures currently waiting on a buffer to get opened,
1441 // to give them a chance to fail now that we've disconnected.
1442 remote.remote_buffer_listeners.clear()
1443 }
1444 }
1445
1446 pub fn shared(
1447 &mut self,
1448 remote_id: u64,
1449 downstream_client: AnyProtoClient,
1450 _cx: &mut AppContext,
1451 ) {
1452 self.downstream_client = Some((downstream_client, remote_id));
1453 }
1454
1455 pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
1456 self.downstream_client.take();
1457 self.forget_shared_buffers();
1458 }
1459
1460 pub fn discard_incomplete(&mut self) {
1461 self.opened_buffers
1462 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1463 }
1464
    /// Streams buffers that may contain matches for `query`, up to `limit`.
    ///
    /// Already-open buffers without a file entry ("unnamed" buffers) are
    /// yielded first and count against the limit. The worktree store then
    /// supplies candidate paths, which are opened in batches of at most
    /// `MAX_CONCURRENT_BUFFER_OPENS`.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                // Already open; the worktree search should skip this entry.
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A closed receiver means the search was cancelled.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1518
    /// Kicks off diff recalculation for each buffer's unstaged change set,
    /// returning a future that resolves once all of them have finished.
    ///
    /// Change sets whose models have been dropped are forgotten here.
    pub fn recalculate_buffer_diffs(
        &mut self,
        buffers: Vec<Model<Buffer>>,
        cx: &mut ModelContext<Self>,
    ) -> impl Future<Output = ()> {
        let mut futures = Vec::new();
        for buffer in buffers {
            let buffer = buffer.read(cx).text_snapshot();
            if let Some(OpenBuffer::Complete {
                unstaged_changes, ..
            }) = self.opened_buffers.get_mut(&buffer.remote_id())
            {
                if let Some(unstaged_changes) = unstaged_changes
                    .as_ref()
                    .and_then(|changes| changes.upgrade())
                {
                    unstaged_changes.update(cx, |unstaged_changes, cx| {
                        futures.push(unstaged_changes.recalculate_diff(buffer.clone(), cx));
                    });
                } else {
                    // The change set was dropped; clear the stale weak handle.
                    unstaged_changes.take();
                }
            }
        }
        async move {
            futures::future::join_all(futures).await;
        }
    }
1547
    /// Reacts to events emitted by an open buffer.
    ///
    /// `FileHandleChanged` is forwarded to the local state (if any);
    /// `Reloaded` is relayed to the downstream collaborator when shared.
    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Only relevant when this project is shared downstream.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }
1578
1579 pub async fn handle_update_buffer(
1580 this: Model<Self>,
1581 envelope: TypedEnvelope<proto::UpdateBuffer>,
1582 mut cx: AsyncAppContext,
1583 ) -> Result<proto::Ack> {
1584 let payload = envelope.payload.clone();
1585 let buffer_id = BufferId::new(payload.buffer_id)?;
1586 let ops = payload
1587 .operations
1588 .into_iter()
1589 .map(language::proto::deserialize_operation)
1590 .collect::<Result<Vec<_>, _>>()?;
1591 this.update(&mut cx, |this, cx| {
1592 match this.opened_buffers.entry(buffer_id) {
1593 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1594 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1595 OpenBuffer::Complete { buffer, .. } => {
1596 if let Some(buffer) = buffer.upgrade() {
1597 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1598 }
1599 }
1600 },
1601 hash_map::Entry::Vacant(e) => {
1602 e.insert(OpenBuffer::Operations(ops));
1603 }
1604 }
1605 Ok(proto::Ack {})
1606 })?
1607 }
1608
1609 pub fn register_shared_lsp_handle(
1610 &mut self,
1611 peer_id: proto::PeerId,
1612 buffer_id: BufferId,
1613 handle: OpenLspBufferHandle,
1614 ) {
1615 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
1616 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
1617 buffer.lsp_handle = Some(handle);
1618 return;
1619 }
1620 }
1621 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
1622 }
1623
    /// Handles a `SynchronizeBuffers` RPC from a (re)joining guest.
    ///
    /// Resets the guest's shared-buffer set, then for each buffer the guest
    /// reports: re-shares it, records the host's version in the response, and
    /// pushes file/reload state plus any operations the guest is missing.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate for this guest; entries are re-added below.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        unstaged_changes: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Operations the guest is missing, relative to its version.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations in size-bounded chunks.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
1713
1714 pub fn handle_create_buffer_for_peer(
1715 &mut self,
1716 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1717 replica_id: u16,
1718 capability: Capability,
1719 cx: &mut ModelContext<Self>,
1720 ) -> Result<()> {
1721 let Some(remote) = self.as_remote_mut() else {
1722 return Err(anyhow!("buffer store is not a remote"));
1723 };
1724
1725 if let Some(buffer) =
1726 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1727 {
1728 self.add_buffer(buffer, cx)?;
1729 }
1730
1731 Ok(())
1732 }
1733
    /// Handles an `UpdateBufferFile` RPC: swaps in the buffer's new file
    /// handle and re-broadcasts the message to any downstream collaborator.
    ///
    /// Emits `BufferChangedFilePath` only when the file's path actually
    /// changed.
    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            // The payload is cloned because `envelope.payload.file` is needed
            // again below when relaying downstream.
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // Remember the old file handle only if the path changed.
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Forward the update to any downstream collaborator.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
1781
    /// Handles a `SaveBuffer` RPC from a collaborator.
    ///
    /// Waits for this replica to reach the requested buffer version, performs
    /// the save (to `new_path` when provided), and replies with the saved
    /// version and mtime. Fails if the project is not shared.
    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until we've seen all the edits the requester has.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
1822
1823 pub async fn handle_close_buffer(
1824 this: Model<Self>,
1825 envelope: TypedEnvelope<proto::CloseBuffer>,
1826 mut cx: AsyncAppContext,
1827 ) -> Result<()> {
1828 let peer_id = envelope.sender_id;
1829 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1830 this.update(&mut cx, |this, _| {
1831 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
1832 if shared.remove(&buffer_id).is_some() {
1833 if shared.is_empty() {
1834 this.shared_buffers.remove(&peer_id);
1835 }
1836 return;
1837 }
1838 }
1839 debug_panic!(
1840 "peer_id {} closed buffer_id {} which was either not open or already closed",
1841 peer_id,
1842 buffer_id
1843 )
1844 })
1845 }
1846
    /// Handles a `BufferSaved` notification from upstream: records the save
    /// on the local replica and relays the message downstream when shared.
    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        // `mtime` is cloned here because the raw payload field is forwarded
        // downstream below.
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        this.update(&mut cx, move |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferSaved {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                    })
                    .log_err();
            }
        })
    }
1874
    /// Handles a `BufferReloaded` notification from upstream: applies the
    /// reload to the local replica and relays the message downstream when
    /// shared.
    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        // Cloned because the raw payload fields are forwarded downstream.
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }
1907
1908 pub async fn handle_blame_buffer(
1909 this: Model<Self>,
1910 envelope: TypedEnvelope<proto::BlameBuffer>,
1911 mut cx: AsyncAppContext,
1912 ) -> Result<proto::BlameBufferResponse> {
1913 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1914 let version = deserialize_version(&envelope.payload.version);
1915 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1916 buffer
1917 .update(&mut cx, |buffer, _| {
1918 buffer.wait_for_version(version.clone())
1919 })?
1920 .await?;
1921 let blame = this
1922 .update(&mut cx, |this, cx| {
1923 this.blame_buffer(&buffer, Some(version), cx)
1924 })?
1925 .await?;
1926 Ok(serialize_blame_buffer_response(blame))
1927 }
1928
1929 pub async fn handle_get_permalink_to_line(
1930 this: Model<Self>,
1931 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
1932 mut cx: AsyncAppContext,
1933 ) -> Result<proto::GetPermalinkToLineResponse> {
1934 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1935 // let version = deserialize_version(&envelope.payload.version);
1936 let selection = {
1937 let proto_selection = envelope
1938 .payload
1939 .selection
1940 .context("no selection to get permalink for defined")?;
1941 proto_selection.start as u32..proto_selection.end as u32
1942 };
1943 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1944 let permalink = this
1945 .update(&mut cx, |this, cx| {
1946 this.get_permalink_to_line(&buffer, selection, cx)
1947 })?
1948 .await?;
1949 Ok(proto::GetPermalinkToLineResponse {
1950 permalink: permalink.to_string(),
1951 })
1952 }
1953
    /// Handles a `GetStagedText` RPC: opens (or reuses) the buffer's unstaged
    /// change set, pins it on the requesting peer's shared-buffer entry, and
    /// returns the staged base text, if any.
    pub async fn handle_get_staged_text(
        this: Model<Self>,
        request: TypedEnvelope<proto::GetStagedText>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::GetStagedTextResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let change_set = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_unstaged_changes(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        // Keep the change set alive while the peer has the buffer open by
        // storing a strong handle on the shared-buffer entry.
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.unstaged_changes = Some(change_set.clone());
            }
        })?;
        let staged_text = change_set.read_with(&cx, |change_set, cx| {
            change_set
                .base_text
                .as_ref()
                .map(|buffer| buffer.read(cx).text())
        })?;
        Ok(proto::GetStagedTextResponse { staged_text })
    }
1985
    /// Handles an `UpdateDiffBase` RPC: replaces or clears the base (staged)
    /// text of the buffer's change set.
    ///
    /// Silently succeeds when the buffer or its change set is no longer
    /// alive, since there is nothing left to update.
    pub async fn handle_update_diff_base(
        this: Model<Self>,
        request: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let Some((buffer, change_set)) = this.update(&mut cx, |this, _| {
            if let OpenBuffer::Complete {
                unstaged_changes,
                buffer,
            } = this.opened_buffers.get(&buffer_id)?
            {
                Some((buffer.upgrade()?, unstaged_changes.as_ref()?.upgrade()?))
            } else {
                None
            }
        })?
        else {
            return Ok(());
        };
        change_set.update(&mut cx, |change_set, cx| {
            // A missing `staged_text` means the file is no longer staged.
            if let Some(staged_text) = request.payload.staged_text {
                let _ = change_set.set_base_text(staged_text, buffer.read(cx).text_snapshot(), cx);
            } else {
                change_set.unset_base_text(buffer.read(cx).text_snapshot(), cx)
            }
        })?;
        Ok(())
    }
2015
2016 pub fn reload_buffers(
2017 &self,
2018 buffers: HashSet<Model<Buffer>>,
2019 push_to_history: bool,
2020 cx: &mut ModelContext<Self>,
2021 ) -> Task<Result<ProjectTransaction>> {
2022 if buffers.is_empty() {
2023 return Task::ready(Ok(ProjectTransaction::default()));
2024 }
2025 match &self.state {
2026 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2027 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2028 }
2029 }
2030
2031 async fn handle_reload_buffers(
2032 this: Model<Self>,
2033 envelope: TypedEnvelope<proto::ReloadBuffers>,
2034 mut cx: AsyncAppContext,
2035 ) -> Result<proto::ReloadBuffersResponse> {
2036 let sender_id = envelope.original_sender_id().unwrap_or_default();
2037 let reload = this.update(&mut cx, |this, cx| {
2038 let mut buffers = HashSet::default();
2039 for buffer_id in &envelope.payload.buffer_ids {
2040 let buffer_id = BufferId::new(*buffer_id)?;
2041 buffers.insert(this.get_existing(buffer_id)?);
2042 }
2043 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2044 })??;
2045
2046 let project_transaction = reload.await?;
2047 let project_transaction = this.update(&mut cx, |this, cx| {
2048 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2049 })?;
2050 Ok(proto::ReloadBuffersResponse {
2051 transaction: Some(project_transaction),
2052 })
2053 }
2054
    /// Sends `buffer`'s full state to `peer_id`, unless it was already shared
    /// with that peer.
    ///
    /// The initial `State` message is followed by the buffer's operation
    /// history, chunked to stay under message-size limits; the final chunk is
    /// flagged with `is_last`.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            // Already shared with this peer; nothing to send.
            return Task::ready(Ok(()));
        }
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                unstaged_changes: None,
                lsp_handle: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before the task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream the operation history if the initial state was
            // delivered successfully.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2121
2122 pub fn forget_shared_buffers(&mut self) {
2123 self.shared_buffers.clear();
2124 }
2125
2126 pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
2127 self.shared_buffers.remove(peer_id);
2128 }
2129
2130 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2131 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2132 self.shared_buffers.insert(new_peer_id, buffers);
2133 }
2134 }
2135
2136 pub fn has_shared_buffers(&self) -> bool {
2137 !self.shared_buffers.is_empty()
2138 }
2139
    /// Creates a local buffer containing `text`, registers it, and indexes it
    /// by path (and entry id, when present) for file-change bookkeeping.
    ///
    /// # Panics
    /// Panics if this store is not local.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            // Index the buffer so filesystem events can be routed back to it.
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }
2173
2174 pub fn deserialize_project_transaction(
2175 &mut self,
2176 message: proto::ProjectTransaction,
2177 push_to_history: bool,
2178 cx: &mut ModelContext<Self>,
2179 ) -> Task<Result<ProjectTransaction>> {
2180 if let Some(this) = self.as_remote_mut() {
2181 this.deserialize_project_transaction(message, push_to_history, cx)
2182 } else {
2183 debug_panic!("not a remote buffer store");
2184 Task::ready(Err(anyhow!("not a remote buffer store")))
2185 }
2186 }
2187
2188 pub fn wait_for_remote_buffer(
2189 &mut self,
2190 id: BufferId,
2191 cx: &mut ModelContext<BufferStore>,
2192 ) -> Task<Result<Model<Buffer>>> {
2193 if let Some(this) = self.as_remote_mut() {
2194 this.wait_for_remote_buffer(id, cx)
2195 } else {
2196 debug_panic!("not a remote buffer store");
2197 Task::ready(Err(anyhow!("not a remote buffer store")))
2198 }
2199 }
2200
2201 pub fn serialize_project_transaction_for_peer(
2202 &mut self,
2203 project_transaction: ProjectTransaction,
2204 peer_id: proto::PeerId,
2205 cx: &mut ModelContext<Self>,
2206 ) -> proto::ProjectTransaction {
2207 let mut serialized_transaction = proto::ProjectTransaction {
2208 buffer_ids: Default::default(),
2209 transactions: Default::default(),
2210 };
2211 for (buffer, transaction) in project_transaction.0 {
2212 self.create_buffer_for_peer(&buffer, peer_id, cx)
2213 .detach_and_log_err(cx);
2214 serialized_transaction
2215 .buffer_ids
2216 .push(buffer.read(cx).remote_id().into());
2217 serialized_transaction
2218 .transactions
2219 .push(language::proto::serialize_transaction(&transaction));
2220 }
2221 serialized_transaction
2222 }
2223}
2224
impl BufferChangeSet {
    /// Creates an empty change set for `buffer`: no base text and an empty diff.
    pub fn new(buffer: &text::BufferSnapshot) -> Self {
        Self {
            buffer_id: buffer.remote_id(),
            base_text: None,
            diff_to_buffer: git::diff::BufferDiff::new(buffer),
            recalculate_diff_task: None,
            diff_updated_futures: Vec::new(),
            base_text_version: 0,
        }
    }

    /// Test-only constructor that immediately starts diffing `buffer` against
    /// `base_text`. The completion receiver is intentionally discarded; callers
    /// that need to await the diff should use `set_base_text` directly.
    #[cfg(any(test, feature = "test-support"))]
    pub fn new_with_base_text(
        base_text: String,
        buffer: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let mut this = Self::new(&buffer);
        let _ = this.set_base_text(base_text, buffer, cx);
        this
    }

    /// Returns the diff hunks intersecting `range`, delegating to the
    /// underlying `BufferDiff`.
    pub fn diff_hunks_intersecting_range<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range(range, buffer_snapshot)
    }

    /// Like [`Self::diff_hunks_intersecting_range`], but iterates in reverse.
    pub fn diff_hunks_intersecting_range_rev<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range_rev(range, buffer_snapshot)
    }

    /// Test-only accessor for the current base text as a plain string, or
    /// `None` if no base text is set.
    #[cfg(any(test, feature = "test-support"))]
    pub fn base_text_string(&self, cx: &AppContext) -> Option<String> {
        self.base_text.as_ref().map(|buffer| buffer.read(cx).text())
    }

    /// Replaces the base text (after normalizing its line endings) and
    /// asynchronously recomputes the diff. The returned receiver resolves once
    /// the diff has been updated.
    pub fn set_base_text(
        &mut self,
        mut base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        LineEnding::normalize(&mut base_text);
        self.recalculate_diff_internal(base_text, buffer_snapshot, true, cx)
    }

    /// Clears the base text, resets the diff to empty, drops any in-flight
    /// recalculation task, and bumps `base_text_version` so observers can tell
    /// the base changed. No-op when no base text is set.
    pub fn unset_base_text(
        &mut self,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) {
        if self.base_text.is_some() {
            self.base_text = None;
            self.diff_to_buffer = BufferDiff::new(&buffer_snapshot);
            self.recalculate_diff_task.take();
            self.base_text_version += 1;
            cx.notify();
        }
    }

    /// Recomputes the diff against the existing base text. When there is no
    /// base text, returns a receiver whose sender has already been dropped, so
    /// awaiting it completes immediately with a cancellation error.
    pub fn recalculate_diff(
        &mut self,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        if let Some(base_text) = self.base_text.clone() {
            self.recalculate_diff_internal(base_text.read(cx).text(), buffer_snapshot, false, cx)
        } else {
            oneshot::channel().1
        }
    }

    /// Spawns a background diff of `base_text` against `buffer_snapshot` and
    /// returns a receiver that resolves when the diff (and, if
    /// `base_text_changed`, the stored base text and its version counter) has
    /// been updated.
    fn recalculate_diff_internal(
        &mut self,
        base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        base_text_changed: bool,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);
        // NOTE(review): assigning here replaces (drops) any previously spawned
        // task — presumably cancelling it, per gpui Task semantics; confirm.
        // Pending senders from superseded recalculations stay queued and are
        // resolved together when this newest task finishes.
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            // Compute the diff off the main thread; `base_text` is moved in and
            // handed back so it can be stored without re-reading it.
            let (base_text, diff) = cx
                .background_executor()
                .spawn(async move {
                    let diff = BufferDiff::build(&base_text, &buffer_snapshot).await;
                    (base_text, diff)
                })
                .await;
            this.update(&mut cx, |this, cx| {
                if base_text_changed {
                    this.base_text_version += 1;
                    this.base_text = Some(cx.new_model(|cx| {
                        Buffer::local_normalized(Rope::from(base_text), LineEnding::default(), cx)
                    }));
                }
                this.diff_to_buffer = diff;
                this.recalculate_diff_task.take();
                // Wake every waiter, including those queued by recalculations
                // that were superseded before completing.
                for tx in this.diff_updated_futures.drain(..) {
                    tx.send(()).ok();
                }
                cx.notify();
            })?;
            Ok(())
        }));
        rx
    }
}
2343
2344impl OpenBuffer {
2345 fn upgrade(&self) -> Option<Model<Buffer>> {
2346 match self {
2347 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2348 OpenBuffer::Operations(_) => None,
2349 }
2350 }
2351}
2352
2353fn is_not_found_error(error: &anyhow::Error) -> bool {
2354 error
2355 .root_cause()
2356 .downcast_ref::<io::Error>()
2357 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2358}
2359
2360fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2361 let Some(blame) = blame else {
2362 return proto::BlameBufferResponse {
2363 blame_response: None,
2364 };
2365 };
2366
2367 let entries = blame
2368 .entries
2369 .into_iter()
2370 .map(|entry| proto::BlameEntry {
2371 sha: entry.sha.as_bytes().into(),
2372 start_line: entry.range.start,
2373 end_line: entry.range.end,
2374 original_line_number: entry.original_line_number,
2375 author: entry.author.clone(),
2376 author_mail: entry.author_mail.clone(),
2377 author_time: entry.author_time,
2378 author_tz: entry.author_tz.clone(),
2379 committer: entry.committer.clone(),
2380 committer_mail: entry.committer_mail.clone(),
2381 committer_time: entry.committer_time,
2382 committer_tz: entry.committer_tz.clone(),
2383 summary: entry.summary.clone(),
2384 previous: entry.previous.clone(),
2385 filename: entry.filename.clone(),
2386 })
2387 .collect::<Vec<_>>();
2388
2389 let messages = blame
2390 .messages
2391 .into_iter()
2392 .map(|(oid, message)| proto::CommitMessage {
2393 oid: oid.as_bytes().into(),
2394 message,
2395 })
2396 .collect::<Vec<_>>();
2397
2398 let permalinks = blame
2399 .permalinks
2400 .into_iter()
2401 .map(|(oid, url)| proto::CommitPermalink {
2402 oid: oid.as_bytes().into(),
2403 permalink: url.to_string(),
2404 })
2405 .collect::<Vec<_>>();
2406
2407 proto::BlameBufferResponse {
2408 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2409 entries,
2410 messages,
2411 permalinks,
2412 remote_url: blame.remote_url,
2413 }),
2414 }
2415}
2416
2417fn deserialize_blame_buffer_response(
2418 response: proto::BlameBufferResponse,
2419) -> Option<git::blame::Blame> {
2420 let response = response.blame_response?;
2421 let entries = response
2422 .entries
2423 .into_iter()
2424 .filter_map(|entry| {
2425 Some(git::blame::BlameEntry {
2426 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2427 range: entry.start_line..entry.end_line,
2428 original_line_number: entry.original_line_number,
2429 committer: entry.committer,
2430 committer_time: entry.committer_time,
2431 committer_tz: entry.committer_tz,
2432 committer_mail: entry.committer_mail,
2433 author: entry.author,
2434 author_mail: entry.author_mail,
2435 author_time: entry.author_time,
2436 author_tz: entry.author_tz,
2437 summary: entry.summary,
2438 previous: entry.previous,
2439 filename: entry.filename,
2440 })
2441 })
2442 .collect::<Vec<_>>();
2443
2444 let messages = response
2445 .messages
2446 .into_iter()
2447 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2448 .collect::<HashMap<_, _>>();
2449
2450 let permalinks = response
2451 .permalinks
2452 .into_iter()
2453 .filter_map(|permalink| {
2454 Some((
2455 git::Oid::from_bytes(&permalink.oid).ok()?,
2456 Url::from_str(&permalink.permalink).ok()?,
2457 ))
2458 })
2459 .collect::<HashMap<_, _>>();
2460
2461 Some(Blame {
2462 entries,
2463 permalinks,
2464 messages,
2465 remote_url: response.remote_url,
2466 })
2467}
2468
2469fn get_permalink_in_rust_registry_src(
2470 provider_registry: Arc<GitHostingProviderRegistry>,
2471 path: PathBuf,
2472 selection: Range<u32>,
2473) -> Result<url::Url> {
2474 #[derive(Deserialize)]
2475 struct CargoVcsGit {
2476 sha1: String,
2477 }
2478
2479 #[derive(Deserialize)]
2480 struct CargoVcsInfo {
2481 git: CargoVcsGit,
2482 path_in_vcs: String,
2483 }
2484
2485 #[derive(Deserialize)]
2486 struct CargoPackage {
2487 repository: String,
2488 }
2489
2490 #[derive(Deserialize)]
2491 struct CargoToml {
2492 package: CargoPackage,
2493 }
2494
2495 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
2496 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
2497 Some((dir, json))
2498 }) else {
2499 bail!("No .cargo_vcs_info.json found in parent directories")
2500 };
2501 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
2502 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
2503 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
2504 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
2505 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
2506 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
2507 let permalink = provider.build_permalink(
2508 remote,
2509 BuildPermalinkParams {
2510 sha: &cargo_vcs_info.git.sha1,
2511 path: &path.to_string_lossy(),
2512 selection: Some(selection),
2513 },
2514 );
2515 Ok(permalink)
2516}