1use crate::{
2 lsp_store::OpenLspBufferHandle,
3 search::SearchQuery,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5 ProjectItem as _, ProjectPath,
6};
7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
8use anyhow::{anyhow, Context as _, Result};
9use client::Client;
10use collections::{hash_map, HashMap, HashSet};
11use fs::Fs;
12use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
13use git::{blame::Blame, diff::BufferDiff};
14use gpui::{
15 AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
16 Task, WeakModel,
17};
18use http_client::Url;
19use language::{
20 proto::{
21 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
22 split_operations,
23 },
24 Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
25};
26use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
27use smol::channel::Receiver;
28use std::{io, ops::Range, path::Path, str::FromStr as _, sync::Arc, time::Instant};
29use text::{BufferId, LineEnding, Rope};
30use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
31use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
32
/// A set of open buffers.
pub struct BufferStore {
    // Whether this store is backed by the local filesystem or a remote peer.
    state: BufferStoreState,
    // In-flight buffer loads, keyed by path, so concurrent opens of the
    // same path share a single task.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Model<Buffer>, Arc<anyhow::Error>>>>>,
    // In-flight unstaged-change-set loads, keyed by buffer id.
    #[allow(clippy::type_complexity)]
    loading_change_sets:
        HashMap<BufferId, Shared<Task<Result<Model<BufferChangeSet>, Arc<anyhow::Error>>>>>,
    worktree_store: Model<WorktreeStore>,
    // Every buffer this store knows about, complete or still receiving
    // operations.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    // Client and project id used to relay updates downstream when the
    // project is shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // Buffers that have been sent to each remote peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
46
/// A buffer that has been shared with a remote peer, along with the
/// resources that should stay alive while the peer holds it.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Model<Buffer>,
    // Change set shared alongside the buffer, if one was opened.
    unstaged_changes: Option<Model<BufferChangeSet>>,
    // Keeps the language server's view of the buffer open, if any.
    lsp_handle: Option<OpenLspBufferHandle>,
}
53
/// The diff between a buffer's current contents and some base text —
/// typically the file's staged (git index) contents.
#[derive(Debug)]
pub struct BufferChangeSet {
    // Id of the buffer this change set describes.
    pub buffer_id: BufferId,
    // The base text being diffed against, if any.
    pub base_text: Option<Model<Buffer>>,
    // Diff from `base_text` to the buffer's current contents.
    pub diff_to_buffer: git::diff::BufferDiff,
    // In-flight diff recalculation, if one is currently running.
    pub recalculate_diff_task: Option<Task<Result<()>>>,
    // Senders notified when the next diff recalculation completes.
    pub diff_updated_futures: Vec<oneshot::Sender<()>>,
    // Incremented whenever the base text is replaced.
    pub base_text_version: usize,
}
63
/// Backing implementation for a `BufferStore`.
enum BufferStoreState {
    /// Buffers are loaded from and saved to the local filesystem.
    Local(LocalBufferStore),
    /// Buffer operations are proxied over RPC to an upstream project host.
    Remote(RemoteBufferStore),
}
68
/// Buffer-store state used when the project lives on a remote host.
struct RemoteBufferStore {
    // Buffers sent to us by collab peers, retained to avoid releasing
    // them while the peer may still reference them.
    shared_with_me: HashSet<Model<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    // Buffers whose initial state has arrived but whose operation chunks
    // are still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    // Channels waiting for a given remote buffer to finish loading.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    worktree_store: Model<WorktreeStore>,
}
78
/// Buffer-store state used when the project is on the local filesystem.
struct LocalBufferStore {
    // Maps project paths to the ids of buffers opened from them.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    // Maps worktree entry ids to buffer ids, for rename/delete tracking.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Model<WorktreeStore>,
    // Keeps the worktree-store subscription alive for this store's lifetime.
    _subscription: Subscription,
}
85
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully-loaded buffer, held weakly so that dropping all strong
    /// handles releases it.
    Complete {
        buffer: WeakModel<Buffer>,
        unstaged_changes: Option<WeakModel<BufferChangeSet>>,
    },
    /// Operations queued for a buffer whose contents are not yet available.
    Operations(Vec<Operation>),
}
93
/// Events emitted by a `BufferStore`.
pub enum BufferStoreEvent {
    /// A buffer was added to the store.
    BufferAdded(Model<Buffer>),
    /// A buffer was dropped and removed from the store.
    BufferDropped(BufferId),
    /// A buffer's associated file changed path (e.g. save-as or a rename
    /// observed on disk).
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        // The file the buffer had before the change, if any.
        old_file: Option<Arc<dyn language::File>>,
    },
}
102
/// The per-buffer transactions produced by a single project-wide
/// operation, keyed by buffer.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}
107
108impl RemoteBufferStore {
109 fn load_staged_text(
110 &self,
111 buffer_id: BufferId,
112 cx: &AppContext,
113 ) -> Task<Result<Option<String>>> {
114 let project_id = self.project_id;
115 let client = self.upstream_client.clone();
116 cx.background_executor().spawn(async move {
117 Ok(client
118 .request(proto::GetStagedText {
119 project_id,
120 buffer_id: buffer_id.to_proto(),
121 })
122 .await?
123 .staged_text)
124 })
125 }
    /// Returns a task that resolves once the buffer with the given id has
    /// been fully received from the remote host.
    ///
    /// The listener is registered *before* checking whether the buffer is
    /// already open, so a buffer that completes loading in between cannot
    /// be missed.
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            // If the buffer is already open, resolve immediately.
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            // Otherwise wait for the buffer to finish streaming in.
            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }
148
149 fn save_remote_buffer(
150 &self,
151 buffer_handle: Model<Buffer>,
152 new_path: Option<proto::ProjectPath>,
153 cx: &ModelContext<BufferStore>,
154 ) -> Task<Result<()>> {
155 let buffer = buffer_handle.read(cx);
156 let buffer_id = buffer.remote_id().into();
157 let version = buffer.version();
158 let rpc = self.upstream_client.clone();
159 let project_id = self.project_id;
160 cx.spawn(move |_, mut cx| async move {
161 let response = rpc
162 .request(proto::SaveBuffer {
163 project_id,
164 buffer_id,
165 new_path,
166 version: serialize_version(&version),
167 })
168 .await?;
169 let version = deserialize_version(&response.version);
170 let mtime = response.mtime.map(|mtime| mtime.into());
171
172 buffer_handle.update(&mut cx, |buffer, cx| {
173 buffer.did_save(version.clone(), mtime, cx);
174 })?;
175
176 Ok(())
177 })
178 }
179
    /// Handles a `CreateBufferForPeer` message from the upstream host.
    ///
    /// Buffers arrive as an initial `State` variant (buffer metadata plus
    /// its file, if any), followed by one or more `Chunk` variants
    /// carrying serialized operations. Returns the buffer once the final
    /// chunk has been applied, or `None` while it is still incomplete.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<BufferStore>,
    ) -> Result<Option<Model<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                // Resolve the buffer's file against our worktrees, then
                // deserialize the buffer itself.
                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Deserialization failed: propagate the error to
                        // everyone waiting on this buffer.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                // Deserialize and apply this chunk's operations.
                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // A bad chunk poisons the whole buffer: discard it and
                    // fail all waiting listeners.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    // Resolve everyone waiting on this buffer.
                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }
274
275 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
276 self.loading_remote_buffers_by_id
277 .keys()
278 .copied()
279 .collect::<Vec<_>>()
280 }
281
    /// Reconstructs a `ProjectTransaction` from its wire representation,
    /// waiting for each referenced buffer (and for the edits of each
    /// transaction) to arrive before returning.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                // Make sure all of the transaction's edits have arrived in
                // the buffer before exposing the transaction to the caller.
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }
319
320 fn open_buffer(
321 &self,
322 path: Arc<Path>,
323 worktree: Model<Worktree>,
324 cx: &mut ModelContext<BufferStore>,
325 ) -> Task<Result<Model<Buffer>>> {
326 let worktree_id = worktree.read(cx).id().to_proto();
327 let project_id = self.project_id;
328 let client = self.upstream_client.clone();
329 let path_string = path.clone().to_string_lossy().to_string();
330 cx.spawn(move |this, mut cx| async move {
331 let response = client
332 .request(proto::OpenBufferByPath {
333 project_id,
334 worktree_id,
335 path: path_string,
336 })
337 .await?;
338 let buffer_id = BufferId::new(response.buffer_id)?;
339
340 let buffer = this
341 .update(&mut cx, {
342 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
343 })?
344 .await?;
345
346 Ok(buffer)
347 })
348 }
349
350 fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
351 let create = self.upstream_client.request(proto::OpenNewBuffer {
352 project_id: self.project_id,
353 });
354 cx.spawn(|this, mut cx| async move {
355 let response = create.await?;
356 let buffer_id = BufferId::new(response.buffer_id)?;
357
358 this.update(&mut cx, |this, cx| {
359 this.wait_for_remote_buffer(buffer_id, cx)
360 })?
361 .await
362 })
363 }
364
    /// Asks the upstream host to reload the given buffers from disk and
    /// deserializes the resulting project-wide transaction.
    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        let request = self.upstream_client.request(proto::ReloadBuffers {
            project_id: self.project_id,
            buffer_ids: buffers
                .iter()
                .map(|buffer| buffer.read(cx).remote_id().to_proto())
                .collect(),
        });

        cx.spawn(|this, mut cx| async move {
            let response = request
                .await?
                .transaction
                .ok_or_else(|| anyhow!("missing transaction"))?;
            // Wait for the referenced buffers and edits to arrive locally.
            this.update(&mut cx, |this, cx| {
                this.deserialize_project_transaction(response, push_to_history, cx)
            })?
            .await
        })
    }
390}
391
392impl LocalBufferStore {
393 fn load_staged_text(
394 &self,
395 buffer: &Model<Buffer>,
396 cx: &AppContext,
397 ) -> Task<Result<Option<String>>> {
398 let Some(file) = buffer.read(cx).file() else {
399 return Task::ready(Err(anyhow!("buffer has no file")));
400 };
401 let worktree_id = file.worktree_id(cx);
402 let path = file.path().clone();
403 let Some(worktree) = self
404 .worktree_store
405 .read(cx)
406 .worktree_for_id(worktree_id, cx)
407 else {
408 return Task::ready(Err(anyhow!("no such worktree")));
409 };
410
411 worktree.read(cx).load_staged_file(path.as_ref(), cx)
412 }
413
    /// Writes a buffer's contents to the given path within a worktree,
    /// then notifies any downstream collaborators and marks the buffer
    /// saved.
    ///
    /// `has_changed_file` is forced to `true` for buffers whose file was
    /// never on disk, so the new file metadata is broadcast as well.
    fn save_local_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        worktree: Model<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Snapshot everything we need before the async save begins.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Tell downstream collaborators about the save (and the new
            // file metadata, if it changed) before updating the buffer.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
471
    /// Subscribes to a worktree's events so that entry changes and git
    /// repository changes are reflected in open buffers. Only local
    /// worktrees are handled here.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        Self::local_worktree_git_repos_changed(
                            this,
                            worktree.clone(),
                            updated_repos,
                            cx,
                        )
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }
497
498 fn local_worktree_entries_changed(
499 this: &mut BufferStore,
500 worktree_handle: &Model<Worktree>,
501 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
502 cx: &mut ModelContext<BufferStore>,
503 ) {
504 let snapshot = worktree_handle.read(cx).snapshot();
505 for (path, entry_id, _) in changes {
506 Self::local_worktree_entry_changed(
507 this,
508 *entry_id,
509 path,
510 worktree_handle,
511 &snapshot,
512 cx,
513 );
514 }
515 }
516
    /// Recomputes the git diff bases of all open buffers that live inside
    /// one of the changed repositories of the given worktree, and relays
    /// the new base texts to downstream collaborators.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Collect the change sets of live buffers in this worktree whose
        // path falls under one of the changed repositories.
        let buffer_change_sets = this
            .opened_buffers
            .values()
            .filter_map(|buffer| {
                if let OpenBuffer::Complete {
                    buffer,
                    unstaged_changes,
                } = buffer
                {
                    let buffer = buffer.upgrade()?.read(cx);
                    let file = File::from_dyn(buffer.file())?;
                    if file.worktree != worktree_handle {
                        return None;
                    }
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                    let unstaged_changes = unstaged_changes.as_ref()?.upgrade()?;
                    let snapshot = buffer.text_snapshot();
                    Some((unstaged_changes, snapshot, file.path.clone()))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        if buffer_change_sets.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Load the new index text for each buffer on the background
            // executor, since this touches the filesystem.
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    buffer_change_sets
                        .into_iter()
                        .filter_map(|(change_set, buffer_snapshot, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            let base_text = local_repo_entry.repo().load_index_text(&relative_path);
                            Some((change_set, buffer_snapshot, base_text))
                        })
                        .collect::<Vec<_>>()
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Apply the new base texts, then forward them downstream.
                for (change_set, buffer_snapshot, staged_text) in diff_bases_by_buffer {
                    change_set.update(cx, |change_set, cx| {
                        if let Some(staged_text) = staged_text.clone() {
                            let _ =
                                change_set.set_base_text(staged_text, buffer_snapshot.clone(), cx);
                        } else {
                            change_set.unset_base_text(buffer_snapshot.clone(), cx);
                        }
                    });

                    if let Some((client, project_id)) = &this.downstream_client.clone() {
                        client
                            .send(proto::UpdateDiffBase {
                                project_id: *project_id,
                                buffer_id: buffer_snapshot.remote_id().to_proto(),
                                staged_text,
                            })
                            .log_err();
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }
598
    /// Reconciles a single changed worktree entry with the buffer (if any)
    /// that was opened from it: updates the buffer's file metadata, the
    /// store's path/entry-id indices, and notifies downstream clients.
    /// Emits `BufferChangedFilePath` when the path changed.
    ///
    /// The `Option<()>` return is only used for `?`-style early exit;
    /// every path ultimately returns `None`.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Look the buffer up by entry id first, falling back to its path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer has been dropped; discard its stale entry.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer: remove it from both indices and stop.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Find the entry in the new snapshot, by id or by path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // The entry no longer exists: mark the file deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index under the file's new location.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index as well.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Let downstream collaborators know about the new file metadata.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }
726
727 fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
728 let file = File::from_dyn(buffer.read(cx).file())?;
729
730 let remote_id = buffer.read(cx).remote_id();
731 if let Some(entry_id) = file.entry_id {
732 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
733 Some(_) => {
734 return None;
735 }
736 None => {
737 self.local_buffer_ids_by_entry_id
738 .insert(entry_id, remote_id);
739 }
740 }
741 };
742 self.local_buffer_ids_by_path.insert(
743 ProjectPath {
744 worktree_id: file.worktree_id(cx),
745 path: file.path.clone(),
746 },
747 remote_id,
748 );
749
750 Some(())
751 }
752
753 fn save_buffer(
754 &self,
755 buffer: Model<Buffer>,
756 cx: &mut ModelContext<BufferStore>,
757 ) -> Task<Result<()>> {
758 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
759 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
760 };
761 let worktree = file.worktree.clone();
762 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
763 }
764
765 fn save_buffer_as(
766 &self,
767 buffer: Model<Buffer>,
768 path: ProjectPath,
769 cx: &mut ModelContext<BufferStore>,
770 ) -> Task<Result<()>> {
771 let Some(worktree) = self
772 .worktree_store
773 .read(cx)
774 .worktree_for_id(path.worktree_id, cx)
775 else {
776 return Task::ready(Err(anyhow!("no such worktree")));
777 };
778 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
779 }
780
    /// Loads the file at `path` in `worktree` into a new buffer.
    ///
    /// The buffer's model is reserved up front so its entity id can serve
    /// as the buffer id before the file has finished loading. If the file
    /// doesn't exist on disk, an empty buffer is created with
    /// `DiskState::New`, so saving it will create the file.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the model now so the buffer id is stable before the
            // load completes.
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // Missing file: open an empty in-memory buffer that will be
                // written to this path on first save.
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                // Index the new buffer by path and entry id so renames and
                // deletions can be routed to it later.
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
849
850 fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
851 cx.spawn(|buffer_store, mut cx| async move {
852 let buffer = cx.new_model(|cx| {
853 Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
854 })?;
855 buffer_store.update(&mut cx, |buffer_store, cx| {
856 buffer_store.add_buffer(buffer.clone(), cx).log_err();
857 })?;
858 Ok(buffer)
859 })
860 }
861
    /// Reloads each buffer from disk, collecting the resulting
    /// transactions into a single `ProjectTransaction`. When
    /// `push_to_history` is false, each reload transaction is forgotten so
    /// it can't be undone from the buffer's history.
    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.handle(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
887}
888
889impl BufferStore {
    /// Registers the RPC message and request handlers through which remote
    /// peers drive this buffer store.
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
        client.add_model_request_handler(Self::handle_get_staged_text);
        client.add_model_message_handler(Self::handle_update_diff_base);
    }
901
    /// Creates a buffer store that loads and saves buffers on the local
    /// filesystem, subscribing to the worktree store so that buffers in
    /// newly-added worktrees are kept up to date.
    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                // Subscribe to each worktree as it is added, so file events
                // flow into the open buffers.
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            worktree_store,
        }
    }
924
    /// Creates a buffer store that proxies all buffer operations over RPC
    /// to the remote project identified by `remote_id`.
    pub fn remote(
        worktree_store: Model<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut ModelContext<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
948
949 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
950 match &mut self.state {
951 BufferStoreState::Local(state) => Some(state),
952 _ => None,
953 }
954 }
955
956 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
957 match &mut self.state {
958 BufferStoreState::Remote(state) => Some(state),
959 _ => None,
960 }
961 }
962
963 fn as_remote(&self) -> Option<&RemoteBufferStore> {
964 match &self.state {
965 BufferStoreState::Remote(state) => Some(state),
966 _ => None,
967 }
968 }
969
    /// Opens the buffer at the given project path, deduplicating
    /// concurrent requests for the same path via `loading_buffers`.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        // Already open: return it immediately.
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // A load for this path is already in flight; share its result.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Unwrap the shared `Arc` error into a fresh error for this caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1015
    /// Opens (or returns the already-open) unstaged change set for a
    /// buffer, deduplicating concurrent requests via `loading_change_sets`.
    pub fn open_unstaged_changes(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(change_set) = self.get_unstaged_changes(buffer_id) {
            return Task::ready(Ok(change_set));
        }

        let task = match self.loading_change_sets.entry(buffer_id) {
            // A load is already in flight; share its result.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let load = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.load_staged_text(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_unstaged_changes_internal(this, load.await, buffer, cx)
                                .await
                                .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Unwrap the shared `Arc` error into a fresh error for this caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1050
1051 #[cfg(any(test, feature = "test-support"))]
1052 pub fn set_change_set(&mut self, buffer_id: BufferId, change_set: Model<BufferChangeSet>) {
1053 self.loading_change_sets
1054 .insert(buffer_id, Task::ready(Ok(change_set)).shared());
1055 }
1056
    /// Builds a `BufferChangeSet` for a buffer once its staged text has
    /// been loaded, clearing the `loading_change_sets` entry on both the
    /// success and failure paths.
    pub async fn open_unstaged_changes_internal(
        this: WeakModel<Self>,
        text: Result<Option<String>>,
        buffer: Model<Buffer>,
        mut cx: AsyncAppContext,
    ) -> Result<Model<BufferChangeSet>> {
        let text = match text {
            Err(e) => {
                // The load failed: clear the in-flight entry so a later
                // call can retry, then propagate the error.
                this.update(&mut cx, |this, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    this.loading_change_sets.remove(&buffer_id);
                })?;
                return Err(e);
            }
            Ok(text) => text,
        };

        let change_set = buffer.update(&mut cx, |buffer, cx| {
            cx.new_model(|_| BufferChangeSet::new(buffer))
        })?;

        if let Some(text) = text {
            change_set
                .update(&mut cx, |change_set, cx| {
                    let snapshot = buffer.read(cx).text_snapshot();
                    change_set.set_base_text(text, snapshot, cx)
                })?
                .await
                .ok();
        }

        this.update(&mut cx, |this, cx| {
            let buffer_id = buffer.read(cx).remote_id();
            this.loading_change_sets.remove(&buffer_id);
            // Remember the change set on the open buffer so later git
            // updates can find and refresh it.
            if let Some(OpenBuffer::Complete {
                unstaged_changes, ..
            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                *unstaged_changes = Some(change_set.downgrade());
            }
        })?;

        Ok(change_set)
    }
1101
1102 pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
1103 match &self.state {
1104 BufferStoreState::Local(this) => this.create_buffer(cx),
1105 BufferStoreState::Remote(this) => this.create_buffer(cx),
1106 }
1107 }
1108
1109 pub fn save_buffer(
1110 &mut self,
1111 buffer: Model<Buffer>,
1112 cx: &mut ModelContext<Self>,
1113 ) -> Task<Result<()>> {
1114 match &mut self.state {
1115 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1116 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1117 }
1118 }
1119
1120 pub fn save_buffer_as(
1121 &mut self,
1122 buffer: Model<Buffer>,
1123 path: ProjectPath,
1124 cx: &mut ModelContext<Self>,
1125 ) -> Task<Result<()>> {
1126 let old_file = buffer.read(cx).file().cloned();
1127 let task = match &self.state {
1128 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1129 BufferStoreState::Remote(this) => {
1130 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1131 }
1132 };
1133 cx.spawn(|this, mut cx| async move {
1134 task.await?;
1135 this.update(&mut cx, |_, cx| {
1136 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1137 })
1138 })
1139 }
1140
    /// Computes git blame for `buffer`, optionally at a historical `version`.
    ///
    /// Returns `Ok(None)` when the file is not inside a git repository.
    /// For local worktrees, blame runs on a background thread; for remote
    /// worktrees, the request is forwarded to the host over RPC.
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything the background task needs while we still
                // have access to the app context.
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    // Blame the requested snapshot of the text, not whatever
                    // happens to be on disk.
                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1202
    /// Builds a permalink URL to the given line `selection` of `buffer` on its
    /// git hosting provider.
    ///
    /// Local worktrees resolve the `origin` remote and HEAD SHA themselves;
    /// remote worktrees ask the host over RPC.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Model<Buffer>,
        selection: Range<u32>,
        cx: &AppContext,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let Some(repo) = worktree.local_git_repo(file.path()) else {
                    return Task::ready(Err(anyhow!("no repository for buffer found")));
                };

                let path = file.path().clone();

                cx.spawn(|cx| async move {
                    // Permalinks are always built against the "origin" remote.
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    // Pin the link to the current HEAD commit so it stays
                    // stable as the branch moves.
                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .context("failed to convert buffer path to string")?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1274
    /// Registers `buffer` in `opened_buffers`, wiring up event forwarding and
    /// a drop notification.
    ///
    /// If operations were queued under this buffer's id before it finished
    /// opening, they are applied now. Registering the same id twice is an
    /// error for local buffers but tolerated for remote replicas.
    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        // Replica id 0 is the authoritative copy; anything else is remote.
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer.downgrade(),
            unstaged_changes: None,
        };

        // Emit BufferDropped when the buffer model is released, so listeners
        // can clean up their per-buffer state.
        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    // Flush operations that arrived before the buffer opened.
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        // Double registration of a local buffer is a bug.
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }
1318
1319 pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
1320 self.opened_buffers
1321 .values()
1322 .filter_map(|buffer| buffer.upgrade())
1323 }
1324
    /// Returns each project path currently being loaded, paired with a future
    /// that resolves to its buffer.
    ///
    /// The shared task's `Arc<anyhow::Error>` is flattened into a fresh error
    /// via its `Display` output, so the original error chain is not preserved.
    /// NOTE(review): consider carrying the source error through instead.
    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Model<Buffer>>>)> {
        self.loading_buffers.iter().map(|(path, task)| {
            let task = task.clone();
            (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
        })
    }
1333
1334 pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
1335 self.buffers().find_map(|buffer| {
1336 let file = File::from_dyn(buffer.read(cx).file())?;
1337 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1338 Some(buffer)
1339 } else {
1340 None
1341 }
1342 })
1343 }
1344
1345 pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1346 self.opened_buffers.get(&buffer_id)?.upgrade()
1347 }
1348
1349 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
1350 self.get(buffer_id)
1351 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1352 }
1353
1354 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1355 self.get(buffer_id).or_else(|| {
1356 self.as_remote()
1357 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1358 })
1359 }
1360
1361 pub fn get_unstaged_changes(&self, buffer_id: BufferId) -> Option<Model<BufferChangeSet>> {
1362 if let OpenBuffer::Complete {
1363 unstaged_changes, ..
1364 } = self.opened_buffers.get(&buffer_id)?
1365 {
1366 unstaged_changes.as_ref()?.upgrade()
1367 } else {
1368 None
1369 }
1370 }
1371
1372 pub fn buffer_version_info(
1373 &self,
1374 cx: &AppContext,
1375 ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1376 let buffers = self
1377 .buffers()
1378 .map(|buffer| {
1379 let buffer = buffer.read(cx);
1380 proto::BufferVersion {
1381 id: buffer.remote_id().into(),
1382 version: language::proto::serialize_version(&buffer.version),
1383 }
1384 })
1385 .collect();
1386 let incomplete_buffer_ids = self
1387 .as_remote()
1388 .map(|remote| remote.incomplete_buffer_ids())
1389 .unwrap_or_default();
1390 (buffers, incomplete_buffer_ids)
1391 }
1392
    /// Reacts to losing the connection to the project host: unblocks pending
    /// waits and downgrades all buffers to read-only.
    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                // Cancel any `wait_for_*` futures blocked on remote state.
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }

        // Without a host, no edits can be synced; make everything read-only.
        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        if let Some(remote) = self.as_remote_mut() {
            // Wake up all futures currently waiting on a buffer to get opened,
            // to give them a chance to fail now that we've disconnected.
            remote.remote_buffer_listeners.clear()
        }
    }
1412
    /// Begins sharing this store's buffers with a downstream client under the
    /// given `remote_id` (the shared project's id).
    pub fn shared(
        &mut self,
        remote_id: u64,
        downstream_client: AnyProtoClient,
        _cx: &mut AppContext,
    ) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1421
1422 pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
1423 self.downstream_client.take();
1424 self.forget_shared_buffers();
1425 }
1426
1427 pub fn discard_incomplete(&mut self) {
1428 self.opened_buffers
1429 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1430 }
1431
    /// Streams buffers that may match `query`, for project search.
    ///
    /// Unsaved ("unnamed") buffers are sent first and counted against `limit`;
    /// the worktree store then supplies candidate paths, which are opened in
    /// batches of `MAX_CONCURRENT_BUFFER_OPENS` and forwarded on the returned
    /// channel as they load.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                // Already-open buffers get searched in memory; tell the
                // worktree scan to skip their entries.
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Bound how many buffers are being opened from disk at once.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A closed receiver means the search was abandoned.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1485
    /// Recomputes the unstaged diff for each of `buffers`, returning a future
    /// that resolves once every recalculation has finished.
    pub fn recalculate_buffer_diffs(
        &mut self,
        buffers: Vec<Model<Buffer>>,
        cx: &mut ModelContext<Self>,
    ) -> impl Future<Output = ()> {
        let mut futures = Vec::new();
        for buffer in buffers {
            let buffer = buffer.read(cx).text_snapshot();
            if let Some(OpenBuffer::Complete {
                unstaged_changes, ..
            }) = self.opened_buffers.get_mut(&buffer.remote_id())
            {
                if let Some(unstaged_changes) = unstaged_changes
                    .as_ref()
                    .and_then(|changes| changes.upgrade())
                {
                    unstaged_changes.update(cx, |unstaged_changes, cx| {
                        futures.push(unstaged_changes.recalculate_diff(buffer.clone(), cx));
                    });
                } else {
                    // The change set was dropped; clear the dangling weak handle.
                    unstaged_changes.take();
                }
            }
        }
        async move {
            futures::future::join_all(futures).await;
        }
    }
1514
1515 fn on_buffer_event(
1516 &mut self,
1517 buffer: Model<Buffer>,
1518 event: &BufferEvent,
1519 cx: &mut ModelContext<Self>,
1520 ) {
1521 match event {
1522 BufferEvent::FileHandleChanged => {
1523 if let Some(local) = self.as_local_mut() {
1524 local.buffer_changed_file(buffer, cx);
1525 }
1526 }
1527 BufferEvent::Reloaded => {
1528 let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
1529 return;
1530 };
1531 let buffer = buffer.read(cx);
1532 downstream_client
1533 .send(proto::BufferReloaded {
1534 project_id: *project_id,
1535 buffer_id: buffer.remote_id().to_proto(),
1536 version: serialize_version(&buffer.version()),
1537 mtime: buffer.saved_mtime().map(|t| t.into()),
1538 line_ending: serialize_line_ending(buffer.line_ending()) as i32,
1539 })
1540 .log_err();
1541 }
1542 _ => {}
1543 }
1544 }
1545
1546 pub async fn handle_update_buffer(
1547 this: Model<Self>,
1548 envelope: TypedEnvelope<proto::UpdateBuffer>,
1549 mut cx: AsyncAppContext,
1550 ) -> Result<proto::Ack> {
1551 let payload = envelope.payload.clone();
1552 let buffer_id = BufferId::new(payload.buffer_id)?;
1553 let ops = payload
1554 .operations
1555 .into_iter()
1556 .map(language::proto::deserialize_operation)
1557 .collect::<Result<Vec<_>, _>>()?;
1558 this.update(&mut cx, |this, cx| {
1559 match this.opened_buffers.entry(buffer_id) {
1560 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1561 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1562 OpenBuffer::Complete { buffer, .. } => {
1563 if let Some(buffer) = buffer.upgrade() {
1564 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1565 }
1566 }
1567 },
1568 hash_map::Entry::Vacant(e) => {
1569 e.insert(OpenBuffer::Operations(ops));
1570 }
1571 }
1572 Ok(proto::Ack {})
1573 })?
1574 }
1575
1576 pub fn register_shared_lsp_handle(
1577 &mut self,
1578 peer_id: proto::PeerId,
1579 buffer_id: BufferId,
1580 handle: OpenLspBufferHandle,
1581 ) {
1582 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
1583 if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
1584 buffer.lsp_handle = Some(handle);
1585 return;
1586 }
1587 }
1588 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
1589 }
1590
    /// Handles a guest's buffer-synchronization request after (re)connecting.
    ///
    /// Resets the guest's shared-buffer set, then for every buffer the guest
    /// reports open: re-registers it as shared, reports our version in the
    /// response, re-sends file metadata and saved state, and streams any
    /// operations the guest is missing relative to its reported version.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate: the guest tells us which buffers it still
        // has open.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        unstaged_changes: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest hasn't seen yet.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // todo!(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations in bounded chunks off the
                // main thread.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
1680
1681 pub fn handle_create_buffer_for_peer(
1682 &mut self,
1683 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1684 replica_id: u16,
1685 capability: Capability,
1686 cx: &mut ModelContext<Self>,
1687 ) -> Result<()> {
1688 let Some(remote) = self.as_remote_mut() else {
1689 return Err(anyhow!("buffer store is not a remote"));
1690 };
1691
1692 if let Some(buffer) =
1693 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1694 {
1695 self.add_buffer(buffer, cx)?;
1696 }
1697
1698 Ok(())
1699 }
1700
    /// Handles a file-metadata update for a buffer (e.g. rename or move),
    /// applying it locally and forwarding it to any downstream client.
    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            // The payload is cloned because `envelope.payload.file` is also
            // forwarded downstream verbatim below.
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // Only report a path change when the path actually differs
                // (or the buffer previously had no file at all).
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
1748
    /// Handles a guest's request to save a buffer, optionally under a new path.
    ///
    /// Waits until this host has caught up to the guest's reported `version`
    /// before saving, so the saved contents include the guest's latest edits.
    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until we've seen the version the guest requested.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            // "Save as": the guest requested a rename as part of the save.
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
1789
1790 pub async fn handle_close_buffer(
1791 this: Model<Self>,
1792 envelope: TypedEnvelope<proto::CloseBuffer>,
1793 mut cx: AsyncAppContext,
1794 ) -> Result<()> {
1795 let peer_id = envelope.sender_id;
1796 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1797 this.update(&mut cx, |this, _| {
1798 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
1799 if shared.remove(&buffer_id).is_some() {
1800 if shared.is_empty() {
1801 this.shared_buffers.remove(&peer_id);
1802 }
1803 return;
1804 }
1805 }
1806 debug_panic!(
1807 "peer_id {} closed buffer_id {} which was either not open or already closed",
1808 peer_id,
1809 buffer_id
1810 )
1811 })
1812 }
1813
    /// Applies a host-side save notification to the local replica and
    /// re-broadcasts it to any downstream client.
    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        this.update(&mut cx, move |this, cx| {
            // The buffer may still be loading; apply the save state anyway.
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferSaved {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                    })
                    .log_err();
            }
        })
    }
1841
    /// Applies a host-side reload notification (new version, mtime, and line
    /// ending) to the local replica and re-broadcasts it downstream.
    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        // Reject messages carrying an out-of-range line-ending enum value.
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }
1874
    /// RPC handler: blames `buffer_id` at the requested version, waiting for
    /// that version to arrive locally before running blame.
    pub async fn handle_blame_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }
1895
    /// RPC handler: resolves a hosting-provider permalink for a line range of
    /// the given buffer and returns it as a string.
    pub async fn handle_get_permalink_to_line(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::GetPermalinkToLineResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        // let version = deserialize_version(&envelope.payload.version);
        let selection = {
            let proto_selection = envelope
                .payload
                .selection
                .context("no selection to get permalink for defined")?;
            proto_selection.start as u32..proto_selection.end as u32
        };
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        let permalink = this
            .update(&mut cx, |this, cx| {
                this.get_permalink_to_line(&buffer, selection, cx)
            })?
            .await?;
        Ok(proto::GetPermalinkToLineResponse {
            permalink: permalink.to_string(),
        })
    }
1920
    /// RPC handler: ensures the unstaged change set for a buffer is open and
    /// returns its staged base text (if any).
    ///
    /// A strong handle to the change set is stashed on the requesting peer's
    /// shared-buffer entry so it stays alive while the peer needs it.
    pub async fn handle_get_staged_text(
        this: Model<Self>,
        request: TypedEnvelope<proto::GetStagedText>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::GetStagedTextResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let change_set = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_unstaged_changes(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            // The buffer should already have been shared with this peer.
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.unstaged_changes = Some(change_set.clone());
            }
        })?;
        let staged_text = change_set.read_with(&cx, |change_set, cx| {
            change_set
                .base_text
                .as_ref()
                .map(|buffer| buffer.read(cx).text())
        })?;
        Ok(proto::GetStagedTextResponse { staged_text })
    }
1952
    /// Applies a staged-text ("diff base") update pushed from the host.
    ///
    /// Silently succeeds when the buffer or its change set has been dropped —
    /// there is nothing left to update in that case.
    pub async fn handle_update_diff_base(
        this: Model<Self>,
        request: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let Some((buffer, change_set)) = this.update(&mut cx, |this, _| {
            if let OpenBuffer::Complete {
                unstaged_changes,
                buffer,
            } = this.opened_buffers.get(&buffer_id)?
            {
                Some((buffer.upgrade()?, unstaged_changes.as_ref()?.upgrade()?))
            } else {
                None
            }
        })?
        else {
            return Ok(());
        };
        change_set.update(&mut cx, |change_set, cx| {
            if let Some(staged_text) = request.payload.staged_text {
                // The recalculation future is intentionally dropped here.
                let _ = change_set.set_base_text(staged_text, buffer.read(cx).text_snapshot(), cx);
            } else {
                change_set.unset_base_text(buffer.read(cx).text_snapshot(), cx)
            }
        })?;
        Ok(())
    }
1982
1983 pub fn reload_buffers(
1984 &self,
1985 buffers: HashSet<Model<Buffer>>,
1986 push_to_history: bool,
1987 cx: &mut ModelContext<Self>,
1988 ) -> Task<Result<ProjectTransaction>> {
1989 if buffers.is_empty() {
1990 return Task::ready(Ok(ProjectTransaction::default()));
1991 }
1992 match &self.state {
1993 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
1994 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
1995 }
1996 }
1997
    /// RPC handler: reloads the requested buffers from disk and returns the
    /// resulting edits as a transaction serialized for the requesting peer.
    async fn handle_reload_buffers(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }
2021
    /// Sends the full state of `buffer` to `peer_id`, if it hasn't been sent
    /// already.
    ///
    /// The buffer's state message is sent first, then its operation history is
    /// streamed in chunks on a background task; the final chunk carries
    /// `is_last` so the receiver knows the buffer is complete.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            // Already shared with this peer; nothing to do.
            return Task::ready(Ok(()));
        }
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                unstaged_changes: None,
                lsp_handle: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before the task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream operations if the initial state was delivered.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2088
    /// Drops all per-peer shared-buffer bookkeeping (e.g. when unsharing).
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2092
    /// Drops the shared-buffer bookkeeping for a single peer (e.g. on leave).
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2096
2097 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2098 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2099 self.shared_buffers.insert(new_peer_id, buffers);
2100 }
2101 }
2102
    /// Reports whether any buffer is currently shared with any peer.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2106
    /// Creates a buffer from in-memory `text` and registers it in this store.
    ///
    /// # Panics
    /// Panics (via `expect`) if called on a non-local buffer store.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        // NOTE(review): a freshly created local buffer normally has no file;
        // this keeps the lookup tables consistent in case it does.
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }
2140
2141 pub fn deserialize_project_transaction(
2142 &mut self,
2143 message: proto::ProjectTransaction,
2144 push_to_history: bool,
2145 cx: &mut ModelContext<Self>,
2146 ) -> Task<Result<ProjectTransaction>> {
2147 if let Some(this) = self.as_remote_mut() {
2148 this.deserialize_project_transaction(message, push_to_history, cx)
2149 } else {
2150 debug_panic!("not a remote buffer store");
2151 Task::ready(Err(anyhow!("not a remote buffer store")))
2152 }
2153 }
2154
2155 pub fn wait_for_remote_buffer(
2156 &mut self,
2157 id: BufferId,
2158 cx: &mut ModelContext<BufferStore>,
2159 ) -> Task<Result<Model<Buffer>>> {
2160 if let Some(this) = self.as_remote_mut() {
2161 this.wait_for_remote_buffer(id, cx)
2162 } else {
2163 debug_panic!("not a remote buffer store");
2164 Task::ready(Err(anyhow!("not a remote buffer store")))
2165 }
2166 }
2167
2168 pub fn serialize_project_transaction_for_peer(
2169 &mut self,
2170 project_transaction: ProjectTransaction,
2171 peer_id: proto::PeerId,
2172 cx: &mut ModelContext<Self>,
2173 ) -> proto::ProjectTransaction {
2174 let mut serialized_transaction = proto::ProjectTransaction {
2175 buffer_ids: Default::default(),
2176 transactions: Default::default(),
2177 };
2178 for (buffer, transaction) in project_transaction.0 {
2179 self.create_buffer_for_peer(&buffer, peer_id, cx)
2180 .detach_and_log_err(cx);
2181 serialized_transaction
2182 .buffer_ids
2183 .push(buffer.read(cx).remote_id().into());
2184 serialized_transaction
2185 .transactions
2186 .push(language::proto::serialize_transaction(&transaction));
2187 }
2188 serialized_transaction
2189 }
2190}
2191
impl BufferChangeSet {
    /// Creates an empty change set for the given buffer: no base text and an
    /// empty diff.
    pub fn new(buffer: &text::BufferSnapshot) -> Self {
        Self {
            buffer_id: buffer.remote_id(),
            base_text: None,
            diff_to_buffer: git::diff::BufferDiff::new(buffer),
            recalculate_diff_task: None,
            diff_updated_futures: Vec::new(),
            base_text_version: 0,
        }
    }

    /// Test-only constructor that also sets a base text, kicking off the
    /// initial diff computation. The completion receiver is discarded, so the
    /// diff may still be in flight when this returns.
    #[cfg(any(test, feature = "test-support"))]
    pub fn new_with_base_text(
        base_text: String,
        buffer: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let mut this = Self::new(&buffer);
        let _ = this.set_base_text(base_text, buffer, cx);
        this
    }

    /// Returns the diff hunks that intersect the given anchor range, in
    /// forward order.
    pub fn diff_hunks_intersecting_range<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range(range, buffer_snapshot)
    }

    /// Returns the diff hunks that intersect the given anchor range, in
    /// reverse order.
    pub fn diff_hunks_intersecting_range_rev<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range_rev(range, buffer_snapshot)
    }

    /// Returns the current base text as a `String`, if one is set.
    #[cfg(any(test, feature = "test-support"))]
    pub fn base_text_string(&self, cx: &AppContext) -> Option<String> {
        self.base_text.as_ref().map(|buffer| buffer.read(cx).text())
    }

    /// Replaces the base text (normalizing its line endings) and recalculates
    /// the diff against it. The returned receiver resolves once the new diff
    /// has been applied.
    pub fn set_base_text(
        &mut self,
        mut base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        LineEnding::normalize(&mut base_text);
        self.recalculate_diff_internal(base_text, buffer_snapshot, true, cx)
    }

    /// Clears the base text, resets the diff to empty, and cancels any
    /// in-flight recalculation by dropping its task. No-op if there is no
    /// base text.
    pub fn unset_base_text(
        &mut self,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) {
        if self.base_text.is_some() {
            self.base_text = None;
            self.diff_to_buffer = BufferDiff::new(&buffer_snapshot);
            self.recalculate_diff_task.take();
            self.base_text_version += 1;
            cx.notify();
        }
    }

    /// Recomputes the diff between the current base text and the given buffer
    /// snapshot. If there is no base text, returns a receiver whose sender has
    /// already been dropped, so awaiting it completes immediately (with a
    /// cancellation error rather than `()`).
    pub fn recalculate_diff(
        &mut self,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        if let Some(base_text) = self.base_text.clone() {
            self.recalculate_diff_internal(base_text.read(cx).text(), buffer_snapshot, false, cx)
        } else {
            oneshot::channel().1
        }
    }

    /// Spawns a background task that diffs `base_text` against
    /// `buffer_snapshot` and installs the result.
    ///
    /// Assigning `recalculate_diff_task` drops any previously running task,
    /// cancelling it — but its waiters stay queued in `diff_updated_futures`
    /// and are notified when this newer computation lands. When
    /// `base_text_changed` is true, the base text buffer is replaced and
    /// `base_text_version` is bumped.
    fn recalculate_diff_internal(
        &mut self,
        base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        base_text_changed: bool,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            // Compute the diff off the main thread; the base text string is
            // threaded through so it can be reused below without cloning.
            let (base_text, diff) = cx
                .background_executor()
                .spawn(async move {
                    let diff = BufferDiff::build(&base_text, &buffer_snapshot).await;
                    (base_text, diff)
                })
                .await;
            this.update(&mut cx, |this, cx| {
                if base_text_changed {
                    this.base_text_version += 1;
                    this.base_text = Some(cx.new_model(|cx| {
                        Buffer::local_normalized(Rope::from(base_text), LineEnding::default(), cx)
                    }));
                }
                this.diff_to_buffer = diff;
                this.recalculate_diff_task.take();
                // Wake every caller waiting on a recalculation, including
                // those queued by superseded (dropped) tasks.
                for tx in this.diff_updated_futures.drain(..) {
                    tx.send(()).ok();
                }
                cx.notify();
            })?;
            Ok(())
        }));
        rx
    }
}
2310
2311impl OpenBuffer {
2312 fn upgrade(&self) -> Option<Model<Buffer>> {
2313 match self {
2314 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2315 OpenBuffer::Operations(_) => None,
2316 }
2317 }
2318}
2319
2320fn is_not_found_error(error: &anyhow::Error) -> bool {
2321 error
2322 .root_cause()
2323 .downcast_ref::<io::Error>()
2324 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2325}
2326
2327fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2328 let Some(blame) = blame else {
2329 return proto::BlameBufferResponse {
2330 blame_response: None,
2331 };
2332 };
2333
2334 let entries = blame
2335 .entries
2336 .into_iter()
2337 .map(|entry| proto::BlameEntry {
2338 sha: entry.sha.as_bytes().into(),
2339 start_line: entry.range.start,
2340 end_line: entry.range.end,
2341 original_line_number: entry.original_line_number,
2342 author: entry.author.clone(),
2343 author_mail: entry.author_mail.clone(),
2344 author_time: entry.author_time,
2345 author_tz: entry.author_tz.clone(),
2346 committer: entry.committer.clone(),
2347 committer_mail: entry.committer_mail.clone(),
2348 committer_time: entry.committer_time,
2349 committer_tz: entry.committer_tz.clone(),
2350 summary: entry.summary.clone(),
2351 previous: entry.previous.clone(),
2352 filename: entry.filename.clone(),
2353 })
2354 .collect::<Vec<_>>();
2355
2356 let messages = blame
2357 .messages
2358 .into_iter()
2359 .map(|(oid, message)| proto::CommitMessage {
2360 oid: oid.as_bytes().into(),
2361 message,
2362 })
2363 .collect::<Vec<_>>();
2364
2365 let permalinks = blame
2366 .permalinks
2367 .into_iter()
2368 .map(|(oid, url)| proto::CommitPermalink {
2369 oid: oid.as_bytes().into(),
2370 permalink: url.to_string(),
2371 })
2372 .collect::<Vec<_>>();
2373
2374 proto::BlameBufferResponse {
2375 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2376 entries,
2377 messages,
2378 permalinks,
2379 remote_url: blame.remote_url,
2380 }),
2381 }
2382}
2383
2384fn deserialize_blame_buffer_response(
2385 response: proto::BlameBufferResponse,
2386) -> Option<git::blame::Blame> {
2387 let response = response.blame_response?;
2388 let entries = response
2389 .entries
2390 .into_iter()
2391 .filter_map(|entry| {
2392 Some(git::blame::BlameEntry {
2393 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2394 range: entry.start_line..entry.end_line,
2395 original_line_number: entry.original_line_number,
2396 committer: entry.committer,
2397 committer_time: entry.committer_time,
2398 committer_tz: entry.committer_tz,
2399 committer_mail: entry.committer_mail,
2400 author: entry.author,
2401 author_mail: entry.author_mail,
2402 author_time: entry.author_time,
2403 author_tz: entry.author_tz,
2404 summary: entry.summary,
2405 previous: entry.previous,
2406 filename: entry.filename,
2407 })
2408 })
2409 .collect::<Vec<_>>();
2410
2411 let messages = response
2412 .messages
2413 .into_iter()
2414 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2415 .collect::<HashMap<_, _>>();
2416
2417 let permalinks = response
2418 .permalinks
2419 .into_iter()
2420 .filter_map(|permalink| {
2421 Some((
2422 git::Oid::from_bytes(&permalink.oid).ok()?,
2423 Url::from_str(&permalink.permalink).ok()?,
2424 ))
2425 })
2426 .collect::<HashMap<_, _>>();
2427
2428 Some(Blame {
2429 entries,
2430 permalinks,
2431 messages,
2432 remote_url: response.remote_url,
2433 })
2434}