1use crate::{
2 search::SearchQuery,
3 worktree_store::{WorktreeStore, WorktreeStoreEvent},
4 ProjectItem as _, ProjectPath,
5};
6use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
7use anyhow::{anyhow, Context as _, Result};
8use client::Client;
9use collections::{hash_map, HashMap, HashSet};
10use fs::Fs;
11use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
12use git::{blame::Blame, diff::BufferDiff};
13use gpui::{
14 AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
15 Task, WeakModel,
16};
17use http_client::Url;
18use language::{
19 proto::{
20 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
21 split_operations,
22 },
23 Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
24};
25use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
26use smol::channel::Receiver;
27use std::{io, ops::Range, path::Path, str::FromStr as _, sync::Arc, time::Instant};
28use text::{BufferId, LineEnding, Rope};
29use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
30use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
31
32/// A set of open buffers.
pub struct BufferStore {
    /// Whether this store operates on local worktrees or proxies to an upstream peer.
    state: BufferStoreState,
    /// In-flight buffer opens keyed by path; `Shared` so concurrent opens of the
    /// same path await a single load task.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Model<Buffer>, Arc<anyhow::Error>>>>>,
    /// In-flight unstaged-change-set loads, keyed by buffer id.
    #[allow(clippy::type_complexity)]
    loading_change_sets:
        HashMap<BufferId, Shared<Task<Result<Model<BufferChangeSet>, Arc<anyhow::Error>>>>>,
    /// The worktrees whose files back these buffers.
    worktree_store: Model<WorktreeStore>,
    /// Every buffer this store has opened, keyed by its remote id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Client (and project id) used to forward updates when this project is shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// Buffers that have been sent to each peer; holds strong handles so they
    /// stay alive for that peer (see `SharedBuffer`).
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
45
/// A strong handle to a buffer (and its unstaged change set, if loaded) that
/// has been shared with a peer, keeping both alive on that peer's behalf.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Model<Buffer>,
    unstaged_changes: Option<Model<BufferChangeSet>>,
}
51
/// The diff between a buffer's current contents and some base text
/// (typically the file's staged text from the git index).
pub struct BufferChangeSet {
    /// Id of the buffer this change set describes.
    pub buffer_id: BufferId,
    /// The base text being diffed against, if any.
    pub base_text: Option<Model<Buffer>>,
    /// The diff from the base text to the buffer's current contents.
    pub diff_to_buffer: git::diff::BufferDiff,
    /// In-flight diff recalculation, if one is running.
    pub recalculate_diff_task: Option<Task<Result<()>>>,
    /// Senders notified when the next diff recalculation completes.
    pub diff_updated_futures: Vec<oneshot::Sender<()>>,
    /// Version counter for the base text.
    pub base_text_version: usize,
}
60
/// Backing behavior for a [`BufferStore`]: operate directly on local
/// worktrees, or proxy buffer operations through an upstream client.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
65
/// State for a buffer store that proxies buffer operations to an upstream peer.
struct RemoteBufferStore {
    /// Strong handles to buffers streamed from collab peers, retained to
    /// avoid races (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Model<Buffer>>,
    /// Client used to send requests to the host of the project.
    upstream_client: AnyProtoClient,
    /// Id of the remote project on the upstream peer.
    project_id: u64,
    /// Buffers whose initial state arrived but whose chunks are still streaming.
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    /// Channels to notify when a given remote buffer finishes loading (or fails).
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    worktree_store: Model<WorktreeStore>,
}
75
/// State for a buffer store that operates directly on local worktrees.
struct LocalBufferStore {
    /// Index from project path to the id of the open buffer for that path.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    /// Index from worktree entry id to the id of the open buffer for that entry.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Model<WorktreeStore>,
    /// Keeps the worktree-store subscription alive for this store's lifetime.
    _subscription: Subscription,
}
82
/// An entry in [`BufferStore::opened_buffers`].
enum OpenBuffer {
    Complete {
        /// Weak handle; the buffer may have been dropped elsewhere.
        buffer: WeakModel<Buffer>,
        unstaged_changes: Option<WeakModel<BufferChangeSet>>,
    },
    // NOTE(review): appears to buffer operations received before the buffer
    // model itself exists — confirm against the sites that construct this.
    Operations(Vec<Operation>),
}
90
/// Events emitted by a [`BufferStore`].
pub enum BufferStoreEvent {
    /// A buffer was registered with the store.
    BufferAdded(Model<Buffer>),
    /// A buffer's model was dropped and removed from the store.
    BufferDropped(BufferId),
    /// A buffer's file association changed (e.g. save-as or a rename on disk).
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        /// The file the buffer was associated with before the change, if any.
        old_file: Option<Arc<dyn language::File>>,
    },
}
99
/// The set of buffer transactions produced by one logical project-wide
/// operation, keyed by the buffer each transaction applies to.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);
102
// Lets observers subscribe to `BufferStoreEvent`s via gpui's event system.
impl EventEmitter<BufferStoreEvent> for BufferStore {}
104
105impl RemoteBufferStore {
106 fn load_staged_text(
107 &self,
108 buffer_id: BufferId,
109 cx: &AppContext,
110 ) -> Task<Result<Option<String>>> {
111 let project_id = self.project_id;
112 let client = self.upstream_client.clone();
113 cx.background_executor().spawn(async move {
114 Ok(client
115 .request(proto::GetStagedText {
116 project_id,
117 buffer_id: buffer_id.to_proto(),
118 })
119 .await?
120 .staged_text)
121 })
122 }
    /// Returns a task that resolves once the buffer with `id` has fully
    /// arrived from the upstream peer.
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        // Register the listener *before* checking whether the buffer already
        // exists, so a buffer that completes in between is not missed.
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            // Fast path: the buffer may already be open.
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            // Otherwise wait until `handle_create_buffer_for_peer` notifies us.
            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }
145
146 fn save_remote_buffer(
147 &self,
148 buffer_handle: Model<Buffer>,
149 new_path: Option<proto::ProjectPath>,
150 cx: &ModelContext<BufferStore>,
151 ) -> Task<Result<()>> {
152 let buffer = buffer_handle.read(cx);
153 let buffer_id = buffer.remote_id().into();
154 let version = buffer.version();
155 let rpc = self.upstream_client.clone();
156 let project_id = self.project_id;
157 cx.spawn(move |_, mut cx| async move {
158 let response = rpc
159 .request(proto::SaveBuffer {
160 project_id,
161 buffer_id,
162 new_path,
163 version: serialize_version(&version),
164 })
165 .await?;
166 let version = deserialize_version(&response.version);
167 let mtime = response.mtime.map(|mtime| mtime.into());
168
169 buffer_handle.update(&mut cx, |buffer, cx| {
170 buffer.did_save(version.clone(), mtime, cx);
171 })?;
172
173 Ok(())
174 })
175 }
176
    /// Handles one message of a buffer being streamed from the host: either
    /// the initial `State` (buffer metadata plus its file association) or a
    /// `Chunk` of operations. Returns the buffer once its last chunk has been
    /// applied; `None` while the buffer is still incomplete.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<BufferStore>,
    ) -> Result<Option<Model<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    // Resolve the file's worktree before building the buffer.
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        // Park the buffer until its chunks finish arriving.
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Propagate the failure to anyone awaiting this buffer.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // A bad chunk poisons the whole load: drop the buffer and
                    // fail all waiters.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    // Wake everyone who was waiting for this buffer.
                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }
271
272 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
273 self.loading_remote_buffers_by_id
274 .keys()
275 .copied()
276 .collect::<Vec<_>>()
277 }
278
    /// Reconstructs a [`ProjectTransaction`] from its wire representation,
    /// waiting for each referenced buffer — and each transaction's edits — to
    /// arrive before returning.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            // `buffer_ids` and `transactions` are parallel arrays in the message.
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                // Don't surface the transaction until all of its edits have
                // actually been applied to the buffer.
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }
316
317 fn open_buffer(
318 &self,
319 path: Arc<Path>,
320 worktree: Model<Worktree>,
321 cx: &mut ModelContext<BufferStore>,
322 ) -> Task<Result<Model<Buffer>>> {
323 let worktree_id = worktree.read(cx).id().to_proto();
324 let project_id = self.project_id;
325 let client = self.upstream_client.clone();
326 let path_string = path.clone().to_string_lossy().to_string();
327 cx.spawn(move |this, mut cx| async move {
328 let response = client
329 .request(proto::OpenBufferByPath {
330 project_id,
331 worktree_id,
332 path: path_string,
333 })
334 .await?;
335 let buffer_id = BufferId::new(response.buffer_id)?;
336
337 let buffer = this
338 .update(&mut cx, {
339 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
340 })?
341 .await?;
342
343 Ok(buffer)
344 })
345 }
346
347 fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
348 let create = self.upstream_client.request(proto::OpenNewBuffer {
349 project_id: self.project_id,
350 });
351 cx.spawn(|this, mut cx| async move {
352 let response = create.await?;
353 let buffer_id = BufferId::new(response.buffer_id)?;
354
355 this.update(&mut cx, |this, cx| {
356 this.wait_for_remote_buffer(buffer_id, cx)
357 })?
358 .await
359 })
360 }
361
362 fn reload_buffers(
363 &self,
364 buffers: HashSet<Model<Buffer>>,
365 push_to_history: bool,
366 cx: &mut ModelContext<BufferStore>,
367 ) -> Task<Result<ProjectTransaction>> {
368 let request = self.upstream_client.request(proto::ReloadBuffers {
369 project_id: self.project_id,
370 buffer_ids: buffers
371 .iter()
372 .map(|buffer| buffer.read(cx).remote_id().to_proto())
373 .collect(),
374 });
375
376 cx.spawn(|this, mut cx| async move {
377 let response = request
378 .await?
379 .transaction
380 .ok_or_else(|| anyhow!("missing transaction"))?;
381 this.update(&mut cx, |this, cx| {
382 this.deserialize_project_transaction(response, push_to_history, cx)
383 })?
384 .await
385 })
386 }
387}
388
389impl LocalBufferStore {
390 fn load_staged_text(
391 &self,
392 buffer: &Model<Buffer>,
393 cx: &AppContext,
394 ) -> Task<Result<Option<String>>> {
395 let Some(file) = buffer.read(cx).file() else {
396 return Task::ready(Err(anyhow!("buffer has no file")));
397 };
398 let worktree_id = file.worktree_id(cx);
399 let path = file.path().clone();
400 let Some(worktree) = self
401 .worktree_store
402 .read(cx)
403 .worktree_for_id(worktree_id, cx)
404 else {
405 return Task::ready(Err(anyhow!("no such worktree")));
406 };
407
408 worktree.read(cx).load_staged_file(path.as_ref(), cx)
409 }
410
    /// Writes `buffer_handle`'s contents to `path` in `worktree`, then notifies
    /// any downstream client and finally records the save on the buffer itself.
    fn save_local_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        worktree: Model<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Snapshot everything the save needs before going async.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        // A buffer that never existed on disk always counts as a file change.
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Notify the downstream client (file change first, then the save)
            // before updating the buffer itself.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
468
469 fn subscribe_to_worktree(
470 &mut self,
471 worktree: &Model<Worktree>,
472 cx: &mut ModelContext<BufferStore>,
473 ) {
474 cx.subscribe(worktree, |this, worktree, event, cx| {
475 if worktree.read(cx).is_local() {
476 match event {
477 worktree::Event::UpdatedEntries(changes) => {
478 Self::local_worktree_entries_changed(this, &worktree, changes, cx);
479 }
480 worktree::Event::UpdatedGitRepositories(updated_repos) => {
481 Self::local_worktree_git_repos_changed(
482 this,
483 worktree.clone(),
484 updated_repos,
485 cx,
486 )
487 }
488 _ => {}
489 }
490 }
491 })
492 .detach();
493 }
494
495 fn local_worktree_entries_changed(
496 this: &mut BufferStore,
497 worktree_handle: &Model<Worktree>,
498 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
499 cx: &mut ModelContext<BufferStore>,
500 ) {
501 let snapshot = worktree_handle.read(cx).snapshot();
502 for (path, entry_id, _) in changes {
503 Self::local_worktree_entry_changed(
504 this,
505 *entry_id,
506 path,
507 worktree_handle,
508 &snapshot,
509 cx,
510 );
511 }
512 }
513
514 fn local_worktree_git_repos_changed(
515 this: &mut BufferStore,
516 worktree_handle: Model<Worktree>,
517 changed_repos: &UpdatedGitRepositoriesSet,
518 cx: &mut ModelContext<BufferStore>,
519 ) {
520 debug_assert!(worktree_handle.read(cx).is_local());
521
522 let buffer_change_sets = this
523 .opened_buffers
524 .values()
525 .filter_map(|buffer| {
526 if let OpenBuffer::Complete {
527 buffer,
528 unstaged_changes,
529 } = buffer
530 {
531 let buffer = buffer.upgrade()?.read(cx);
532 let file = File::from_dyn(buffer.file())?;
533 if file.worktree != worktree_handle {
534 return None;
535 }
536 changed_repos
537 .iter()
538 .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
539 let unstaged_changes = unstaged_changes.as_ref()?.upgrade()?;
540 let snapshot = buffer.text_snapshot();
541 Some((unstaged_changes, snapshot, file.path.clone()))
542 } else {
543 None
544 }
545 })
546 .collect::<Vec<_>>();
547
548 if buffer_change_sets.is_empty() {
549 return;
550 }
551
552 cx.spawn(move |this, mut cx| async move {
553 let snapshot =
554 worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
555 let diff_bases_by_buffer = cx
556 .background_executor()
557 .spawn(async move {
558 buffer_change_sets
559 .into_iter()
560 .filter_map(|(change_set, buffer_snapshot, path)| {
561 let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
562 let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
563 let base_text = local_repo_entry.repo().load_index_text(&relative_path);
564 Some((change_set, buffer_snapshot, base_text))
565 })
566 .collect::<Vec<_>>()
567 })
568 .await;
569
570 this.update(&mut cx, |this, cx| {
571 for (change_set, buffer_snapshot, staged_text) in diff_bases_by_buffer {
572 change_set.update(cx, |change_set, cx| {
573 if let Some(staged_text) = staged_text.clone() {
574 let _ =
575 change_set.set_base_text(staged_text, buffer_snapshot.clone(), cx);
576 } else {
577 change_set.unset_base_text(buffer_snapshot.clone(), cx);
578 }
579 });
580
581 if let Some((client, project_id)) = &this.downstream_client.clone() {
582 client
583 .send(proto::UpdateDiffBase {
584 project_id: *project_id,
585 buffer_id: buffer_snapshot.remote_id().to_proto(),
586 staged_text,
587 })
588 .log_err();
589 }
590 }
591 })
592 })
593 .detach_and_log_err(cx);
594 }
595
    /// Reconciles a single changed worktree entry with the buffer (if any)
    /// that is open for it: updates the buffer's file association and the
    /// store's path/entry-id indices, and notifies any downstream client.
    /// Returns `Option` purely for `?`-based early exit; the value is unused.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Find the open buffer for this entry: prefer the entry-id index,
        // falling back to the path index.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer model was dropped; discard its stale entry.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer: clean up both indices and stop.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Locate the entry in the new snapshot, by id first, then by path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // The entry vanished: the file was deleted from disk.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            // Keep the path index in sync with a rename/move.
            if new_file.path != old_file.path {
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            // Keep the entry-id index in sync as well.
            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Forward the new file metadata to a downstream client, if any.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        // Emit events only after the buffer update has completed.
        for event in events {
            cx.emit(event);
        }

        None
    }
723
724 fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
725 let file = File::from_dyn(buffer.read(cx).file())?;
726
727 let remote_id = buffer.read(cx).remote_id();
728 if let Some(entry_id) = file.entry_id {
729 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
730 Some(_) => {
731 return None;
732 }
733 None => {
734 self.local_buffer_ids_by_entry_id
735 .insert(entry_id, remote_id);
736 }
737 }
738 };
739 self.local_buffer_ids_by_path.insert(
740 ProjectPath {
741 worktree_id: file.worktree_id(cx),
742 path: file.path.clone(),
743 },
744 remote_id,
745 );
746
747 Some(())
748 }
749
750 fn save_buffer(
751 &self,
752 buffer: Model<Buffer>,
753 cx: &mut ModelContext<BufferStore>,
754 ) -> Task<Result<()>> {
755 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
756 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
757 };
758 let worktree = file.worktree.clone();
759 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
760 }
761
762 fn save_buffer_as(
763 &self,
764 buffer: Model<Buffer>,
765 path: ProjectPath,
766 cx: &mut ModelContext<BufferStore>,
767 ) -> Task<Result<()>> {
768 let Some(worktree) = self
769 .worktree_store
770 .read(cx)
771 .worktree_for_id(path.worktree_id, cx)
772 else {
773 return Task::ready(Err(anyhow!("no such worktree")));
774 };
775 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
776 }
777
    /// Opens the file at `path` in `worktree` as a buffer. A file that does not
    /// exist on disk yields an empty buffer marked `DiskState::New`.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the model slot up front so the buffer id (derived from
            // the entity id) is known before the file finishes loading.
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Construct the text buffer off the main thread.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // A missing file becomes a new, empty, not-yet-saved buffer.
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                // Index the buffer by path (and entry id) so worktree entry
                // changes can find it later.
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
846
847 fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
848 cx.spawn(|buffer_store, mut cx| async move {
849 let buffer = cx.new_model(|cx| {
850 Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
851 })?;
852 buffer_store.update(&mut cx, |buffer_store, cx| {
853 buffer_store.add_buffer(buffer.clone(), cx).log_err();
854 })?;
855 Ok(buffer)
856 })
857 }
858
859 fn reload_buffers(
860 &self,
861 buffers: HashSet<Model<Buffer>>,
862 push_to_history: bool,
863 cx: &mut ModelContext<BufferStore>,
864 ) -> Task<Result<ProjectTransaction>> {
865 cx.spawn(move |_, mut cx| async move {
866 let mut project_transaction = ProjectTransaction::default();
867 for buffer in buffers {
868 let transaction = buffer
869 .update(&mut cx, |buffer, cx| buffer.reload(cx))?
870 .await?;
871 buffer.update(&mut cx, |buffer, cx| {
872 if let Some(transaction) = transaction {
873 if !push_to_history {
874 buffer.forget_transaction(transaction.id);
875 }
876 project_transaction.0.insert(cx.handle(), transaction);
877 }
878 })?;
879 }
880
881 Ok(project_transaction)
882 })
883 }
884}
885
886impl BufferStore {
    /// Registers this store's RPC message and request handlers on `client`.
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
        client.add_model_request_handler(Self::handle_get_staged_text);
        client.add_model_message_handler(Self::handle_update_diff_base);
    }
898
    /// Creates a buffer store backed by local worktrees, subscribing to each
    /// worktree as it is added so buffers track file-system changes.
    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                // Subscribe to each worktree as it is added to the project.
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            worktree_store,
        }
    }
921
    /// Creates a buffer store that proxies buffer operations through
    /// `upstream_client` for the remote project identified by `remote_id`.
    pub fn remote(
        worktree_store: Model<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut ModelContext<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            loading_change_sets: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
945
946 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
947 match &mut self.state {
948 BufferStoreState::Local(state) => Some(state),
949 _ => None,
950 }
951 }
952
953 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
954 match &mut self.state {
955 BufferStoreState::Remote(state) => Some(state),
956 _ => None,
957 }
958 }
959
960 fn as_remote(&self) -> Option<&RemoteBufferStore> {
961 match &self.state {
962 BufferStoreState::Remote(state) => Some(state),
963 _ => None,
964 }
965 }
966
    /// Opens (or returns the already-open) buffer for `project_path`.
    /// Concurrent opens of the same path share a single load task.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // Already loading: await the existing shared task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Re-wrap the shared error so callers get a plain anyhow::Error.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1012
    /// Opens (or returns the already-open) unstaged-change set for `buffer`.
    /// Concurrent opens for the same buffer share a single load task.
    pub fn open_unstaged_changes(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(change_set) = self.get_unstaged_changes(buffer_id) {
            return Task::ready(Ok(change_set));
        }

        let task = match self.loading_change_sets.entry(buffer_id) {
            // Already loading: await the existing shared task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let load = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.load_staged_text(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_unstaged_changes_internal(this, load.await, buffer, cx)
                                .await
                                .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Re-wrap the shared error so callers get a plain anyhow::Error.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1047
    /// Builds a [`BufferChangeSet`] for `buffer` from loaded staged `text`,
    /// clearing the in-flight entry in `loading_change_sets` on both the
    /// success and failure paths.
    pub async fn open_unstaged_changes_internal(
        this: WeakModel<Self>,
        text: Result<Option<String>>,
        buffer: Model<Buffer>,
        mut cx: AsyncAppContext,
    ) -> Result<Model<BufferChangeSet>> {
        let text = match text {
            Err(e) => {
                // The load failed: clear the loading entry so a retry is possible.
                this.update(&mut cx, |this, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    this.loading_change_sets.remove(&buffer_id);
                })?;
                return Err(e);
            }
            Ok(text) => text,
        };

        let change_set = buffer.update(&mut cx, |buffer, cx| {
            cx.new_model(|_| BufferChangeSet::new(buffer))
        })?;

        // `None` staged text means the file isn't in the index; leave the
        // change set without a base text in that case.
        if let Some(text) = text {
            change_set
                .update(&mut cx, |change_set, cx| {
                    let snapshot = buffer.read(cx).text_snapshot();
                    change_set.set_base_text(text, snapshot, cx)
                })?
                .await
                .ok();
        }

        this.update(&mut cx, |this, cx| {
            let buffer_id = buffer.read(cx).remote_id();
            this.loading_change_sets.remove(&buffer_id);
            // Remember the change set on the open-buffer entry (weakly).
            if let Some(OpenBuffer::Complete {
                unstaged_changes, ..
            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                *unstaged_changes = Some(change_set.downgrade());
            }
        })?;

        Ok(change_set)
    }
1092
1093 pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
1094 match &self.state {
1095 BufferStoreState::Local(this) => this.create_buffer(cx),
1096 BufferStoreState::Remote(this) => this.create_buffer(cx),
1097 }
1098 }
1099
1100 pub fn save_buffer(
1101 &mut self,
1102 buffer: Model<Buffer>,
1103 cx: &mut ModelContext<Self>,
1104 ) -> Task<Result<()>> {
1105 match &mut self.state {
1106 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1107 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1108 }
1109 }
1110
1111 pub fn save_buffer_as(
1112 &mut self,
1113 buffer: Model<Buffer>,
1114 path: ProjectPath,
1115 cx: &mut ModelContext<Self>,
1116 ) -> Task<Result<()>> {
1117 let old_file = buffer.read(cx).file().cloned();
1118 let task = match &self.state {
1119 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1120 BufferStoreState::Remote(this) => {
1121 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1122 }
1123 };
1124 cx.spawn(|this, mut cx| async move {
1125 task.await?;
1126 this.update(&mut cx, |_, cx| {
1127 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1128 })
1129 })
1130 }
1131
    /// Computes git blame information for `buffer`, optionally for a specific
    /// `version` of its text.
    ///
    /// For local worktrees the blame runs against the on-disk repository on a
    /// background thread; for remote worktrees the request is forwarded to the
    /// host over RPC. Resolves to `Ok(None)` when the file is not inside a
    /// git repository.
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Resolve everything that needs the worktree/buffer here, so
                // the spawned closure below can run without app state.
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    // Blame the requested historical version when given,
                    // otherwise the buffer's current contents.
                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1193
    /// Builds a permanent URL to the given line `selection` of `buffer` on its
    /// git hosting provider, based on the `origin` remote and the current HEAD
    /// SHA.
    ///
    /// For remote worktrees the request is forwarded to the host, and the URL
    /// is parsed from the response.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Model<Buffer>,
        selection: Range<u32>,
        cx: &AppContext,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let Some(repo) = worktree.local_git_repo(file.path()) else {
                    return Task::ready(Err(anyhow!("no repository for buffer found")));
                };

                let path = file.path().clone();

                cx.spawn(|cx| async move {
                    // Permalinks are always generated against "origin".
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    // Pin the link to the current HEAD commit so it stays
                    // valid as the branch moves.
                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .context("failed to convert buffer path to string")?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1265
    /// Registers `buffer` in `opened_buffers` and starts observing it.
    ///
    /// If operations arrived for this buffer id before the buffer itself (the
    /// remote case), they are applied now. Emits `BufferAdded` immediately and
    /// `BufferDropped` later, when the buffer model is released.
    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        // Replica id 0 is the authoritative copy; anything else originated
        // from a peer.
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer.downgrade(),
            unstaged_changes: None,
        };

        // Emit `BufferDropped` when the buffer model is released, so
        // listeners can clean up per-buffer state.
        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    // Operations that arrived before the buffer existed:
                    // drain and apply them now.
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    // The buffer is already live. For remote buffers this is a
                    // benign race; for local buffers it's a bug.
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }
1309
1310 pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
1311 self.opened_buffers
1312 .values()
1313 .filter_map(|buffer| buffer.upgrade())
1314 }
1315
1316 pub fn loading_buffers(
1317 &self,
1318 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Model<Buffer>>>)> {
1319 self.loading_buffers.iter().map(|(path, task)| {
1320 let task = task.clone();
1321 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1322 })
1323 }
1324
1325 pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
1326 self.buffers().find_map(|buffer| {
1327 let file = File::from_dyn(buffer.read(cx).file())?;
1328 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1329 Some(buffer)
1330 } else {
1331 None
1332 }
1333 })
1334 }
1335
1336 pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1337 self.opened_buffers.get(&buffer_id)?.upgrade()
1338 }
1339
1340 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
1341 self.get(buffer_id)
1342 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1343 }
1344
1345 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1346 self.get(buffer_id).or_else(|| {
1347 self.as_remote()
1348 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1349 })
1350 }
1351
1352 pub fn get_unstaged_changes(&self, buffer_id: BufferId) -> Option<Model<BufferChangeSet>> {
1353 if let OpenBuffer::Complete {
1354 unstaged_changes, ..
1355 } = self.opened_buffers.get(&buffer_id)?
1356 {
1357 unstaged_changes.as_ref()?.upgrade()
1358 } else {
1359 None
1360 }
1361 }
1362
1363 pub fn buffer_version_info(
1364 &self,
1365 cx: &AppContext,
1366 ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1367 let buffers = self
1368 .buffers()
1369 .map(|buffer| {
1370 let buffer = buffer.read(cx);
1371 proto::BufferVersion {
1372 id: buffer.remote_id().into(),
1373 version: language::proto::serialize_version(&buffer.version),
1374 }
1375 })
1376 .collect();
1377 let incomplete_buffer_ids = self
1378 .as_remote()
1379 .map(|remote| remote.incomplete_buffer_ids())
1380 .unwrap_or_default();
1381 (buffers, incomplete_buffer_ids)
1382 }
1383
    /// Handles losing the connection to the host of a remote project:
    /// unblocks pending waits, marks every buffer read-only, and drops
    /// listeners waiting on buffers that will never finish opening.
    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                // Abort any pending `wait_for_*` futures; the operations they
                // are waiting on will never arrive now.
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        if let Some(remote) = self.as_remote_mut() {
            // Wake up all futures currently waiting on a buffer to get opened,
            // to give them a chance to fail now that we've disconnected.
            remote.remote_buffer_listeners.clear()
        }
    }
1403
1404 pub fn shared(
1405 &mut self,
1406 remote_id: u64,
1407 downstream_client: AnyProtoClient,
1408 _cx: &mut AppContext,
1409 ) {
1410 self.downstream_client = Some((downstream_client, remote_id));
1411 }
1412
1413 pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
1414 self.downstream_client.take();
1415 self.forget_shared_buffers();
1416 }
1417
1418 pub fn discard_incomplete(&mut self) {
1419 self.opened_buffers
1420 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1421 }
1422
    /// Streams buffers that may contain matches for `query`, up to `limit`.
    ///
    /// Unnamed buffers (no worktree entry) are sent first, then candidate
    /// paths reported by the worktrees are opened in bounded batches and
    /// forwarded on the returned channel as they finish loading.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                // Entry ids of already-open buffers are passed to the
                // worktree scan below — presumably so it can account for
                // in-memory state; confirm in WorktreeStore.
                open_buffers.insert(entry_id);
            } else {
                // Unnamed buffers have no path to scan; yield them directly
                // and count them against the limit.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Caps how many buffer opens are awaited per batch.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                // Kick off the whole batch of opens before awaiting any,
                // so the loads run concurrently.
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            // Receiver dropped; stop opening buffers.
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1476
    /// Recomputes the unstaged-changes diff for each of `buffers` that has a
    /// live change set. The returned future resolves once all recalculations
    /// are complete.
    pub fn recalculate_buffer_diffs(
        &mut self,
        buffers: Vec<Model<Buffer>>,
        cx: &mut ModelContext<Self>,
    ) -> impl Future<Output = ()> {
        let mut futures = Vec::new();
        for buffer in buffers {
            let buffer = buffer.read(cx).text_snapshot();
            if let Some(OpenBuffer::Complete {
                unstaged_changes, ..
            }) = self.opened_buffers.get_mut(&buffer.remote_id())
            {
                if let Some(unstaged_changes) = unstaged_changes
                    .as_ref()
                    .and_then(|changes| changes.upgrade())
                {
                    unstaged_changes.update(cx, |unstaged_changes, cx| {
                        futures.push(unstaged_changes.recalculate_diff(buffer.clone(), cx));
                    });
                } else {
                    // The change set model was dropped; clear the stale weak
                    // handle so it isn't consulted again.
                    unstaged_changes.take();
                }
            }
        }
        async move {
            futures::future::join_all(futures).await;
        }
    }
1505
    /// Reacts to events emitted by an individual buffer: file-handle changes
    /// are handled by the local state, and reloads are mirrored to the
    /// downstream client when this project is shared.
    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Only relevant when sharing; otherwise there's nobody to tell.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }
1536
    /// RPC handler: applies buffer operations sent by a peer.
    ///
    /// If the buffer isn't open yet, the operations are stashed in an
    /// `OpenBuffer::Operations` entry and applied once the buffer arrives
    /// (see `add_buffer`).
    pub async fn handle_update_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    // Buffer still pending: accumulate the operations.
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Complete { buffer, .. } => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                        }
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    // First sign of this buffer: stash the ops until the
                    // buffer itself is created.
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }
1566
    /// RPC handler: reconciles buffer state with a (re)connecting guest.
    ///
    /// For every buffer the guest reports as open, this re-registers it as
    /// shared with that guest, answers with the host's current version, pushes
    /// the guest the buffer's file and save state, and streams it whatever
    /// operations it is missing.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate for this guest: only the buffers it
        // reports below count as shared after the sync.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        unstaged_changes: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest hasn't seen yet.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // todo!(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations in bounded chunks in the
                // background, so large histories don't block this handler.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
1655
1656 pub fn handle_create_buffer_for_peer(
1657 &mut self,
1658 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1659 replica_id: u16,
1660 capability: Capability,
1661 cx: &mut ModelContext<Self>,
1662 ) -> Result<()> {
1663 let Some(remote) = self.as_remote_mut() else {
1664 return Err(anyhow!("buffer store is not a remote"));
1665 };
1666
1667 if let Some(buffer) =
1668 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1669 {
1670 self.add_buffer(buffer, cx)?;
1671 }
1672
1673 Ok(())
1674 }
1675
    /// RPC handler: updates the file metadata associated with a buffer (e.g.
    /// after a rename on the host).
    ///
    /// Emits `BufferChangedFilePath` when the path actually changed, and
    /// forwards the raw update to any downstream client.
    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // The closure returns the previous file only when the path
                // changed, which decides whether to emit the event below.
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Forward the update down the chain (e.g. host -> guest).
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
1723
    /// RPC handler: saves a buffer on behalf of a peer — optionally to a new
    /// path — and reports the resulting saved version and mtime.
    ///
    /// Errors if the buffer is unknown or the project is not shared.
    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Wait until this replica has caught up to the version the peer
        // asked to save, so we don't save stale content.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            // "Save as": the peer requested a different destination path.
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
1764
1765 pub async fn handle_close_buffer(
1766 this: Model<Self>,
1767 envelope: TypedEnvelope<proto::CloseBuffer>,
1768 mut cx: AsyncAppContext,
1769 ) -> Result<()> {
1770 let peer_id = envelope.sender_id;
1771 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1772 this.update(&mut cx, |this, _| {
1773 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
1774 if shared.remove(&buffer_id).is_some() {
1775 if shared.is_empty() {
1776 this.shared_buffers.remove(&peer_id);
1777 }
1778 return;
1779 }
1780 }
1781 debug_panic!(
1782 "peer_id {} closed buffer_id {} which was either not open or already closed",
1783 peer_id,
1784 buffer_id
1785 )
1786 })
1787 }
1788
    /// RPC handler: records that a buffer was saved on the host, updating the
    /// local replica's saved state and re-broadcasting downstream.
    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        this.update(&mut cx, move |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }

            // Forward the notification down the chain (e.g. host -> guest).
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferSaved {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                    })
                    .log_err();
            }
        })
    }
1816
    /// RPC handler: applies a host-side reload (new version, line ending, and
    /// mtime) to the local replica, then re-broadcasts downstream.
    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            // Forward the notification down the chain (e.g. host -> guest).
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }
1849
1850 pub async fn handle_blame_buffer(
1851 this: Model<Self>,
1852 envelope: TypedEnvelope<proto::BlameBuffer>,
1853 mut cx: AsyncAppContext,
1854 ) -> Result<proto::BlameBufferResponse> {
1855 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1856 let version = deserialize_version(&envelope.payload.version);
1857 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1858 buffer
1859 .update(&mut cx, |buffer, _| {
1860 buffer.wait_for_version(version.clone())
1861 })?
1862 .await?;
1863 let blame = this
1864 .update(&mut cx, |this, cx| {
1865 this.blame_buffer(&buffer, Some(version), cx)
1866 })?
1867 .await?;
1868 Ok(serialize_blame_buffer_response(blame))
1869 }
1870
1871 pub async fn handle_get_permalink_to_line(
1872 this: Model<Self>,
1873 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
1874 mut cx: AsyncAppContext,
1875 ) -> Result<proto::GetPermalinkToLineResponse> {
1876 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1877 // let version = deserialize_version(&envelope.payload.version);
1878 let selection = {
1879 let proto_selection = envelope
1880 .payload
1881 .selection
1882 .context("no selection to get permalink for defined")?;
1883 proto_selection.start as u32..proto_selection.end as u32
1884 };
1885 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1886 let permalink = this
1887 .update(&mut cx, |this, cx| {
1888 this.get_permalink_to_line(&buffer, selection, cx)
1889 })?
1890 .await?;
1891 Ok(proto::GetPermalinkToLineResponse {
1892 permalink: permalink.to_string(),
1893 })
1894 }
1895
    /// RPC handler: opens (or reuses) the unstaged change set for a buffer and
    /// returns its staged base text to the requesting peer.
    ///
    /// Also records the change set on the peer's shared-buffer entry, so that
    /// later diff-base updates can reference it.
    pub async fn handle_get_staged_text(
        this: Model<Self>,
        request: TypedEnvelope<proto::GetStagedText>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::GetStagedTextResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let change_set = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_unstaged_changes(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            // The buffer should already have been shared with this peer.
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.unstaged_changes = Some(change_set.clone());
            }
        })?;
        let staged_text = change_set.read_with(&cx, |change_set, cx| {
            change_set
                .base_text
                .as_ref()
                .map(|buffer| buffer.read(cx).text())
        })?;
        Ok(proto::GetStagedTextResponse { staged_text })
    }
1927
    /// RPC handler: replaces or clears the diff base (staged text) of a
    /// buffer's unstaged change set.
    ///
    /// Quietly ignores buffers that aren't open or have no live change set.
    pub async fn handle_update_diff_base(
        this: Model<Self>,
        request: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let Some((buffer, change_set)) = this.update(&mut cx, |this, _| {
            if let OpenBuffer::Complete {
                unstaged_changes,
                buffer,
            } = this.opened_buffers.get(&buffer_id)?
            {
                Some((buffer.upgrade()?, unstaged_changes.as_ref()?.upgrade()?))
            } else {
                None
            }
        })?
        else {
            return Ok(());
        };
        change_set.update(&mut cx, |change_set, cx| {
            // A `None` staged text means there is no staged version anymore.
            if let Some(staged_text) = request.payload.staged_text {
                let _ = change_set.set_base_text(staged_text, buffer.read(cx).text_snapshot(), cx);
            } else {
                change_set.unset_base_text(buffer.read(cx).text_snapshot(), cx)
            }
        })?;
        Ok(())
    }
1957
1958 pub fn reload_buffers(
1959 &self,
1960 buffers: HashSet<Model<Buffer>>,
1961 push_to_history: bool,
1962 cx: &mut ModelContext<Self>,
1963 ) -> Task<Result<ProjectTransaction>> {
1964 if buffers.is_empty() {
1965 return Task::ready(Ok(ProjectTransaction::default()));
1966 }
1967 match &self.state {
1968 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
1969 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
1970 }
1971 }
1972
    /// RPC handler: reloads the requested buffers from disk and returns the
    /// resulting project transaction, serialized for the requesting peer.
    async fn handle_reload_buffers(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            // `false` here is `push_to_history`.
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }
1996
    /// Shares `buffer` with `peer_id` by sending its full state followed by
    /// its serialized operation history, streamed in chunks.
    ///
    /// No-op when the buffer is already shared with that peer, or when this
    /// project has no downstream client.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            return Task::ready(Ok(()));
        }
        // Record the share synchronously, so concurrent calls for the same
        // buffer don't send it twice.
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                unstaged_changes: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            // The state message must reach the peer before any op chunks.
            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        // Stream the history in bounded chunks, flagging the
                        // final one so the peer knows the buffer is complete.
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2062
2063 pub fn forget_shared_buffers(&mut self) {
2064 self.shared_buffers.clear();
2065 }
2066
2067 pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
2068 self.shared_buffers.remove(peer_id);
2069 }
2070
2071 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2072 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2073 self.shared_buffers.insert(new_peer_id, buffers);
2074 }
2075 }
2076
2077 pub fn has_shared_buffers(&self) -> bool {
2078 !self.shared_buffers.is_empty()
2079 }
2080
    /// Creates a new local buffer containing `text`, registers it with the
    /// store, and indexes it by project path and worktree entry id when it
    /// has a backing file.
    ///
    /// Panics if this store is not local (see the `expect` below).
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            // Index by path (and entry id when present) so later lookups can
            // find this buffer without scanning all open buffers.
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }
2114
2115 pub fn deserialize_project_transaction(
2116 &mut self,
2117 message: proto::ProjectTransaction,
2118 push_to_history: bool,
2119 cx: &mut ModelContext<Self>,
2120 ) -> Task<Result<ProjectTransaction>> {
2121 if let Some(this) = self.as_remote_mut() {
2122 this.deserialize_project_transaction(message, push_to_history, cx)
2123 } else {
2124 debug_panic!("not a remote buffer store");
2125 Task::ready(Err(anyhow!("not a remote buffer store")))
2126 }
2127 }
2128
2129 pub fn wait_for_remote_buffer(
2130 &mut self,
2131 id: BufferId,
2132 cx: &mut ModelContext<BufferStore>,
2133 ) -> Task<Result<Model<Buffer>>> {
2134 if let Some(this) = self.as_remote_mut() {
2135 this.wait_for_remote_buffer(id, cx)
2136 } else {
2137 debug_panic!("not a remote buffer store");
2138 Task::ready(Err(anyhow!("not a remote buffer store")))
2139 }
2140 }
2141
2142 pub fn serialize_project_transaction_for_peer(
2143 &mut self,
2144 project_transaction: ProjectTransaction,
2145 peer_id: proto::PeerId,
2146 cx: &mut ModelContext<Self>,
2147 ) -> proto::ProjectTransaction {
2148 let mut serialized_transaction = proto::ProjectTransaction {
2149 buffer_ids: Default::default(),
2150 transactions: Default::default(),
2151 };
2152 for (buffer, transaction) in project_transaction.0 {
2153 self.create_buffer_for_peer(&buffer, peer_id, cx)
2154 .detach_and_log_err(cx);
2155 serialized_transaction
2156 .buffer_ids
2157 .push(buffer.read(cx).remote_id().into());
2158 serialized_transaction
2159 .transactions
2160 .push(language::proto::serialize_transaction(&transaction));
2161 }
2162 serialized_transaction
2163 }
2164}
2165
impl BufferChangeSet {
    /// Creates an empty change set for `buffer`: no base text and an empty
    /// diff, so no hunks are reported until a base text is set.
    pub fn new(buffer: &text::BufferSnapshot) -> Self {
        Self {
            buffer_id: buffer.remote_id(),
            base_text: None,
            diff_to_buffer: git::diff::BufferDiff::new(buffer),
            recalculate_diff_task: None,
            diff_updated_futures: Vec::new(),
            base_text_version: 0,
        }
    }

    /// Test-only convenience: builds a change set and immediately kicks off a
    /// diff against the provided base text, discarding the completion receiver.
    #[cfg(any(test, feature = "test-support"))]
    pub fn new_with_base_text(
        base_text: String,
        buffer: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let mut this = Self::new(&buffer);
        let _ = this.set_base_text(base_text, buffer, cx);
        this
    }

    /// Returns the diff hunks that intersect `range`, in buffer order.
    pub fn diff_hunks_intersecting_range<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range(range, buffer_snapshot)
    }

    /// Returns the diff hunks that intersect `range`, in reverse buffer order.
    pub fn diff_hunks_intersecting_range_rev<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range_rev(range, buffer_snapshot)
    }

    /// Test-only accessor for the current base text, if any.
    #[cfg(any(test, feature = "test-support"))]
    pub fn base_text_string(&self, cx: &AppContext) -> Option<String> {
        self.base_text.as_ref().map(|buffer| buffer.read(cx).text())
    }

    /// Sets a new base text (normalizing its line endings first) and starts
    /// recomputing the diff against `buffer_snapshot`.
    ///
    /// The returned receiver resolves when that diff recalculation finishes.
    pub fn set_base_text(
        &mut self,
        mut base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        LineEnding::normalize(&mut base_text);
        self.recalculate_diff_internal(base_text, buffer_snapshot, true, cx)
    }

    /// Clears the base text, resets the diff to empty, cancels any in-flight
    /// recalculation, and bumps the base-text version. No-op if there is no
    /// base text.
    pub fn unset_base_text(
        &mut self,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) {
        if self.base_text.is_some() {
            self.base_text = None;
            self.diff_to_buffer = BufferDiff::new(&buffer_snapshot);
            // Dropping the task cancels any recalculation in progress.
            self.recalculate_diff_task.take();
            self.base_text_version += 1;
            cx.notify();
        }
    }

    /// Recomputes the diff against the current base text, keeping the base
    /// text (and its version) unchanged.
    ///
    /// With no base text there is nothing to diff; the returned receiver's
    /// sender is dropped immediately, so awaiting it yields a cancellation.
    pub fn recalculate_diff(
        &mut self,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        if let Some(base_text) = self.base_text.clone() {
            self.recalculate_diff_internal(base_text.read(cx).text(), buffer_snapshot, false, cx)
        } else {
            oneshot::channel().1
        }
    }

    /// Spawns a task that diffs `base_text` against `buffer_snapshot` on the
    /// background executor, then applies the result on the model.
    ///
    /// Replacing `recalculate_diff_task` cancels any previously running
    /// recalculation. All receivers registered while a recalculation was
    /// pending are signalled once the next one completes.
    fn recalculate_diff_internal(
        &mut self,
        base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        base_text_changed: bool,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            // Diff computation can be expensive; run it off the main thread.
            // `base_text` is threaded through so it can be reused below.
            let (base_text, diff) = cx
                .background_executor()
                .spawn(async move {
                    let diff = BufferDiff::build(&base_text, &buffer_snapshot).await;
                    (base_text, diff)
                })
                .await;
            this.update(&mut cx, |this, cx| {
                if base_text_changed {
                    this.base_text_version += 1;
                    this.base_text = Some(cx.new_model(|cx| {
                        Buffer::local_normalized(Rope::from(base_text), LineEnding::default(), cx)
                    }));
                }
                this.diff_to_buffer = diff;
                this.recalculate_diff_task.take();
                // Wake every caller that was waiting on a recalculation.
                for tx in this.diff_updated_futures.drain(..) {
                    tx.send(()).ok();
                }
                cx.notify();
            })?;
            Ok(())
        }));
        rx
    }
}
2284
2285impl OpenBuffer {
2286 fn upgrade(&self) -> Option<Model<Buffer>> {
2287 match self {
2288 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2289 OpenBuffer::Operations(_) => None,
2290 }
2291 }
2292}
2293
2294fn is_not_found_error(error: &anyhow::Error) -> bool {
2295 error
2296 .root_cause()
2297 .downcast_ref::<io::Error>()
2298 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2299}
2300
2301fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2302 let Some(blame) = blame else {
2303 return proto::BlameBufferResponse {
2304 blame_response: None,
2305 };
2306 };
2307
2308 let entries = blame
2309 .entries
2310 .into_iter()
2311 .map(|entry| proto::BlameEntry {
2312 sha: entry.sha.as_bytes().into(),
2313 start_line: entry.range.start,
2314 end_line: entry.range.end,
2315 original_line_number: entry.original_line_number,
2316 author: entry.author.clone(),
2317 author_mail: entry.author_mail.clone(),
2318 author_time: entry.author_time,
2319 author_tz: entry.author_tz.clone(),
2320 committer: entry.committer.clone(),
2321 committer_mail: entry.committer_mail.clone(),
2322 committer_time: entry.committer_time,
2323 committer_tz: entry.committer_tz.clone(),
2324 summary: entry.summary.clone(),
2325 previous: entry.previous.clone(),
2326 filename: entry.filename.clone(),
2327 })
2328 .collect::<Vec<_>>();
2329
2330 let messages = blame
2331 .messages
2332 .into_iter()
2333 .map(|(oid, message)| proto::CommitMessage {
2334 oid: oid.as_bytes().into(),
2335 message,
2336 })
2337 .collect::<Vec<_>>();
2338
2339 let permalinks = blame
2340 .permalinks
2341 .into_iter()
2342 .map(|(oid, url)| proto::CommitPermalink {
2343 oid: oid.as_bytes().into(),
2344 permalink: url.to_string(),
2345 })
2346 .collect::<Vec<_>>();
2347
2348 proto::BlameBufferResponse {
2349 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2350 entries,
2351 messages,
2352 permalinks,
2353 remote_url: blame.remote_url,
2354 }),
2355 }
2356}
2357
2358fn deserialize_blame_buffer_response(
2359 response: proto::BlameBufferResponse,
2360) -> Option<git::blame::Blame> {
2361 let response = response.blame_response?;
2362 let entries = response
2363 .entries
2364 .into_iter()
2365 .filter_map(|entry| {
2366 Some(git::blame::BlameEntry {
2367 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2368 range: entry.start_line..entry.end_line,
2369 original_line_number: entry.original_line_number,
2370 committer: entry.committer,
2371 committer_time: entry.committer_time,
2372 committer_tz: entry.committer_tz,
2373 committer_mail: entry.committer_mail,
2374 author: entry.author,
2375 author_mail: entry.author_mail,
2376 author_time: entry.author_time,
2377 author_tz: entry.author_tz,
2378 summary: entry.summary,
2379 previous: entry.previous,
2380 filename: entry.filename,
2381 })
2382 })
2383 .collect::<Vec<_>>();
2384
2385 let messages = response
2386 .messages
2387 .into_iter()
2388 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2389 .collect::<HashMap<_, _>>();
2390
2391 let permalinks = response
2392 .permalinks
2393 .into_iter()
2394 .filter_map(|permalink| {
2395 Some((
2396 git::Oid::from_bytes(&permalink.oid).ok()?,
2397 Url::from_str(&permalink.permalink).ok()?,
2398 ))
2399 })
2400 .collect::<HashMap<_, _>>();
2401
2402 Some(Blame {
2403 entries,
2404 permalinks,
2405 messages,
2406 remote_url: response.remote_url,
2407 })
2408}