1use crate::{
2 search::SearchQuery,
3 worktree_store::{WorktreeStore, WorktreeStoreEvent},
4 ProjectItem as _, ProjectPath,
5};
6use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
7use anyhow::{anyhow, Context as _, Result};
8use client::Client;
9use collections::{hash_map, HashMap, HashSet};
10use fs::Fs;
11use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
12use git::{blame::Blame, diff::BufferDiff};
13use gpui::{
14 AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
15 Task, WeakModel,
16};
17use http_client::Url;
18use language::{
19 proto::{
20 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
21 split_operations,
22 },
23 Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
24};
25use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
26use smol::channel::Receiver;
27use std::{io, ops::Range, path::Path, str::FromStr as _, sync::Arc, time::Instant};
28use text::{BufferId, LineEnding, Rope};
29use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
30use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
31
/// A set of open buffers.
pub struct BufferStore {
    /// Whether this store owns its buffers (local) or mirrors an upstream host (remote).
    state: BufferStoreState,
    /// In-flight buffer loads keyed by path, so concurrent opens of the same
    /// path share a single task.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Model<Buffer>, Arc<anyhow::Error>>>>>,
    /// In-flight unstaged-change-set loads keyed by buffer id, shared like
    /// `loading_buffers`.
    #[allow(clippy::type_complexity)]
    loading_change_sets:
        HashMap<BufferId, Shared<Task<Result<Model<BufferChangeSet>, Arc<anyhow::Error>>>>>,
    worktree_store: Model<WorktreeStore>,
    /// Every buffer that has been opened, held weakly (see `OpenBuffer`).
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Client and project id used to forward updates to downstream collaborators.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// Buffers that have been sent to each peer, held strongly on their behalf.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
45
/// A buffer (and its unstaged change set, if loaded) that has been shared with
/// a peer. Strong handles keep the models alive on that peer's behalf.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Model<Buffer>,
    unstaged_changes: Option<Model<BufferChangeSet>>,
}
51
/// The diff between a buffer's current contents and some base text, typically
/// the staged (git index) version of the file.
#[derive(Debug)]
pub struct BufferChangeSet {
    pub buffer_id: BufferId,
    /// The base text being diffed against, if any.
    pub base_text: Option<Model<Buffer>>,
    pub diff_to_buffer: git::diff::BufferDiff,
    /// The in-progress diff recalculation, if one is running.
    pub recalculate_diff_task: Option<Task<Result<()>>>,
    /// Completion listeners for pending diff updates.
    pub diff_updated_futures: Vec<oneshot::Sender<()>>,
    /// Version counter for `base_text`.
    pub base_text_version: usize,
}
61
/// Whether the store's buffers originate locally or from a remote host.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
66
/// State for a buffer store that mirrors buffers replicated from an upstream peer.
struct RemoteBufferStore {
    /// Buffers sent by collab peers, retained to avoid races
    /// (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Model<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    /// Buffers whose initial state has arrived but whose operation chunks are
    /// still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    /// Channels resolved once a given remote buffer finishes loading (or fails).
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    worktree_store: Model<WorktreeStore>,
}
76
/// State for a buffer store whose buffers live in local worktrees.
struct LocalBufferStore {
    /// Index from project path to the id of the buffer open at that path.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    /// Index from worktree entry id to the id of the buffer open for it.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Model<WorktreeStore>,
    /// Keeps the worktree-store subscription alive for this store's lifetime.
    _subscription: Subscription,
}
83
/// The store's record of a single open buffer.
enum OpenBuffer {
    /// A fully-loaded buffer, held weakly so the store alone doesn't keep it alive.
    Complete {
        buffer: WeakModel<Buffer>,
        unstaged_changes: Option<WeakModel<BufferChangeSet>>,
    },
    /// Buffered operations for a buffer that isn't available yet.
    /// NOTE(review): the code that populates this variant is outside this
    /// chunk — confirm it holds ops received before the buffer exists.
    Operations(Vec<Operation>),
}
91
/// Events emitted by a [`BufferStore`] for observers to react to.
pub enum BufferStoreEvent {
    BufferAdded(Model<Buffer>),
    BufferDropped(BufferId),
    /// The buffer's underlying file changed to a different path.
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
100
/// A set of buffer transactions produced by one logical project operation,
/// keyed by the buffer each transaction applies to.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);
103
// `BufferStore` notifies observers via `BufferStoreEvent`.
impl EventEmitter<BufferStoreEvent> for BufferStore {}
105
106impl RemoteBufferStore {
107 fn load_staged_text(
108 &self,
109 buffer_id: BufferId,
110 cx: &AppContext,
111 ) -> Task<Result<Option<String>>> {
112 let project_id = self.project_id;
113 let client = self.upstream_client.clone();
114 cx.background_executor().spawn(async move {
115 Ok(client
116 .request(proto::GetStagedText {
117 project_id,
118 buffer_id: buffer_id.to_proto(),
119 })
120 .await?
121 .staged_text)
122 })
123 }
    /// Waits until the buffer with the given `id` has been fully replicated
    /// from the upstream peer, returning it once available.
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        // Register the listener *before* checking whether the buffer already
        // exists, so a buffer completing in between can't be missed.
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            // The buffer may already be open; resolve immediately if so.
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            // Otherwise wait for `handle_create_buffer_for_peer` to fire our
            // listener with the finished buffer (or an error).
            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }
146
147 fn save_remote_buffer(
148 &self,
149 buffer_handle: Model<Buffer>,
150 new_path: Option<proto::ProjectPath>,
151 cx: &ModelContext<BufferStore>,
152 ) -> Task<Result<()>> {
153 let buffer = buffer_handle.read(cx);
154 let buffer_id = buffer.remote_id().into();
155 let version = buffer.version();
156 let rpc = self.upstream_client.clone();
157 let project_id = self.project_id;
158 cx.spawn(move |_, mut cx| async move {
159 let response = rpc
160 .request(proto::SaveBuffer {
161 project_id,
162 buffer_id,
163 new_path,
164 version: serialize_version(&version),
165 })
166 .await?;
167 let version = deserialize_version(&response.version);
168 let mtime = response.mtime.map(|mtime| mtime.into());
169
170 buffer_handle.update(&mut cx, |buffer, cx| {
171 buffer.did_save(version.clone(), mtime, cx);
172 })?;
173
174 Ok(())
175 })
176 }
177
    /// Handles a `CreateBufferForPeer` message, which streams a buffer to this
    /// client as an initial `State` message followed by `Chunk`s of
    /// operations. Returns the buffer once its final chunk has been applied.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<BufferStore>,
    ) -> Result<Option<Model<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    // Resolve the buffer's file against our worktrees, if it has one.
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        // Park the buffer until its operation chunks finish arriving.
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Fail every task waiting on this buffer.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                // Chunks are only valid after a `State` message has arrived.
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // Abandon the partially-loaded buffer and fail its waiters.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    // Resolve every task waiting on this buffer.
                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }
272
273 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
274 self.loading_remote_buffers_by_id
275 .keys()
276 .copied()
277 .collect::<Vec<_>>()
278 }
279
    /// Reconstructs a [`ProjectTransaction`] from its wire format, waiting for
    /// each referenced buffer (and the edits its transaction mentions) to be
    /// replicated locally before returning.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            // First pass: resolve each (buffer id, transaction) pair, waiting
            // for the buffer itself to arrive from the remote peer.
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            // Second pass: wait until each buffer has received the edits its
            // transaction refers to, then optionally record it in undo history.
            for (buffer, transaction) in &project_transaction.0 {
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }
317
318 fn open_buffer(
319 &self,
320 path: Arc<Path>,
321 worktree: Model<Worktree>,
322 cx: &mut ModelContext<BufferStore>,
323 ) -> Task<Result<Model<Buffer>>> {
324 let worktree_id = worktree.read(cx).id().to_proto();
325 let project_id = self.project_id;
326 let client = self.upstream_client.clone();
327 let path_string = path.clone().to_string_lossy().to_string();
328 cx.spawn(move |this, mut cx| async move {
329 let response = client
330 .request(proto::OpenBufferByPath {
331 project_id,
332 worktree_id,
333 path: path_string,
334 })
335 .await?;
336 let buffer_id = BufferId::new(response.buffer_id)?;
337
338 let buffer = this
339 .update(&mut cx, {
340 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
341 })?
342 .await?;
343
344 Ok(buffer)
345 })
346 }
347
348 fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
349 let create = self.upstream_client.request(proto::OpenNewBuffer {
350 project_id: self.project_id,
351 });
352 cx.spawn(|this, mut cx| async move {
353 let response = create.await?;
354 let buffer_id = BufferId::new(response.buffer_id)?;
355
356 this.update(&mut cx, |this, cx| {
357 this.wait_for_remote_buffer(buffer_id, cx)
358 })?
359 .await
360 })
361 }
362
363 fn reload_buffers(
364 &self,
365 buffers: HashSet<Model<Buffer>>,
366 push_to_history: bool,
367 cx: &mut ModelContext<BufferStore>,
368 ) -> Task<Result<ProjectTransaction>> {
369 let request = self.upstream_client.request(proto::ReloadBuffers {
370 project_id: self.project_id,
371 buffer_ids: buffers
372 .iter()
373 .map(|buffer| buffer.read(cx).remote_id().to_proto())
374 .collect(),
375 });
376
377 cx.spawn(|this, mut cx| async move {
378 let response = request
379 .await?
380 .transaction
381 .ok_or_else(|| anyhow!("missing transaction"))?;
382 this.update(&mut cx, |this, cx| {
383 this.deserialize_project_transaction(response, push_to_history, cx)
384 })?
385 .await
386 })
387 }
388}
389
390impl LocalBufferStore {
391 fn load_staged_text(
392 &self,
393 buffer: &Model<Buffer>,
394 cx: &AppContext,
395 ) -> Task<Result<Option<String>>> {
396 let Some(file) = buffer.read(cx).file() else {
397 return Task::ready(Err(anyhow!("buffer has no file")));
398 };
399 let worktree_id = file.worktree_id(cx);
400 let path = file.path().clone();
401 let Some(worktree) = self
402 .worktree_store
403 .read(cx)
404 .worktree_for_id(worktree_id, cx)
405 else {
406 return Task::ready(Err(anyhow!("no such worktree")));
407 };
408
409 worktree.read(cx).load_staged_file(path.as_ref(), cx)
410 }
411
    /// Writes `buffer`'s contents to `path` within `worktree`, then notifies
    /// downstream collaborators and marks the buffer as saved.
    ///
    /// `has_changed_file` indicates the buffer's file metadata changed (e.g.
    /// a save-as); it's forced on for buffers that never existed on disk.
    fn save_local_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        worktree: Model<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Snapshot everything we need before the async save begins.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            // First save of a new buffer always produces new file metadata.
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Tell downstream collaborators about the save (and the new file
            // metadata, if it changed) before updating our own buffer.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
469
470 fn subscribe_to_worktree(
471 &mut self,
472 worktree: &Model<Worktree>,
473 cx: &mut ModelContext<BufferStore>,
474 ) {
475 cx.subscribe(worktree, |this, worktree, event, cx| {
476 if worktree.read(cx).is_local() {
477 match event {
478 worktree::Event::UpdatedEntries(changes) => {
479 Self::local_worktree_entries_changed(this, &worktree, changes, cx);
480 }
481 worktree::Event::UpdatedGitRepositories(updated_repos) => {
482 Self::local_worktree_git_repos_changed(
483 this,
484 worktree.clone(),
485 updated_repos,
486 cx,
487 )
488 }
489 _ => {}
490 }
491 }
492 })
493 .detach();
494 }
495
496 fn local_worktree_entries_changed(
497 this: &mut BufferStore,
498 worktree_handle: &Model<Worktree>,
499 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
500 cx: &mut ModelContext<BufferStore>,
501 ) {
502 let snapshot = worktree_handle.read(cx).snapshot();
503 for (path, entry_id, _) in changes {
504 Self::local_worktree_entry_changed(
505 this,
506 *entry_id,
507 path,
508 worktree_handle,
509 &snapshot,
510 cx,
511 );
512 }
513 }
514
    /// Responds to git repository changes in `worktree_handle` by reloading
    /// the staged text of every affected open buffer, updating its change set,
    /// and forwarding the new base text to downstream collaborators.
    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Collect the change sets of live buffers that belong to this worktree
        // and live under one of the changed repositories' working directories.
        let buffer_change_sets = this
            .opened_buffers
            .values()
            .filter_map(|buffer| {
                if let OpenBuffer::Complete {
                    buffer,
                    unstaged_changes,
                } = buffer
                {
                    let buffer = buffer.upgrade()?.read(cx);
                    let file = File::from_dyn(buffer.file())?;
                    if file.worktree != worktree_handle {
                        return None;
                    }
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                    let unstaged_changes = unstaged_changes.as_ref()?.upgrade()?;
                    let snapshot = buffer.text_snapshot();
                    Some((unstaged_changes, snapshot, file.path.clone()))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        if buffer_change_sets.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            // Read each buffer's staged text from the git index off the main thread.
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    buffer_change_sets
                        .into_iter()
                        .filter_map(|(change_set, buffer_snapshot, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            let base_text = local_repo_entry.repo().load_index_text(&relative_path);
                            Some((change_set, buffer_snapshot, base_text))
                        })
                        .collect::<Vec<_>>()
                })
                .await;

            this.update(&mut cx, |this, cx| {
                for (change_set, buffer_snapshot, staged_text) in diff_bases_by_buffer {
                    // A `None` base text means the file is no longer in the index.
                    change_set.update(cx, |change_set, cx| {
                        if let Some(staged_text) = staged_text.clone() {
                            let _ =
                                change_set.set_base_text(staged_text, buffer_snapshot.clone(), cx);
                        } else {
                            change_set.unset_base_text(buffer_snapshot.clone(), cx);
                        }
                    });

                    // Forward the new base text to downstream collaborators.
                    if let Some((client, project_id)) = &this.downstream_client.clone() {
                        client
                            .send(proto::UpdateDiffBase {
                                project_id: *project_id,
                                buffer_id: buffer_snapshot.remote_id().to_proto(),
                                staged_text,
                            })
                            .log_err();
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }
596
    /// Reconciles a single changed worktree entry with the buffer (if any)
    /// open for it: refreshes the buffer's file metadata, re-keys the
    /// path/entry-id indices, emits events, and notifies downstream peers.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Look up the open buffer by entry id first, falling back to path.
        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer model has been dropped; discard its stale record.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer: remove it from both indices and stop here.
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Find the entry in the new snapshot, by id or (if the id is gone) by path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // The entry vanished from the snapshot: the file was deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // The file moved: re-key the path index and record an event.
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Forward the new file metadata to downstream collaborators.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        // Emit the collected events after the buffer update completes.
        for event in events {
            cx.emit(event);
        }

        None
    }
724
725 fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
726 let file = File::from_dyn(buffer.read(cx).file())?;
727
728 let remote_id = buffer.read(cx).remote_id();
729 if let Some(entry_id) = file.entry_id {
730 match self.local_buffer_ids_by_entry_id.get(&entry_id) {
731 Some(_) => {
732 return None;
733 }
734 None => {
735 self.local_buffer_ids_by_entry_id
736 .insert(entry_id, remote_id);
737 }
738 }
739 };
740 self.local_buffer_ids_by_path.insert(
741 ProjectPath {
742 worktree_id: file.worktree_id(cx),
743 path: file.path.clone(),
744 },
745 remote_id,
746 );
747
748 Some(())
749 }
750
751 fn save_buffer(
752 &self,
753 buffer: Model<Buffer>,
754 cx: &mut ModelContext<BufferStore>,
755 ) -> Task<Result<()>> {
756 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
757 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
758 };
759 let worktree = file.worktree.clone();
760 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
761 }
762
763 fn save_buffer_as(
764 &self,
765 buffer: Model<Buffer>,
766 path: ProjectPath,
767 cx: &mut ModelContext<BufferStore>,
768 ) -> Task<Result<()>> {
769 let Some(worktree) = self
770 .worktree_store
771 .read(cx)
772 .worktree_for_id(path.worktree_id, cx)
773 else {
774 return Task::ready(Err(anyhow!("no such worktree")));
775 };
776 self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
777 }
778
    /// Opens the buffer at `path` within `worktree`, loading it from disk. A
    /// missing file yields an empty buffer whose disk state is `New`.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the model slot up front so the buffer id (derived from
            // the entity id) exists before the file finishes loading.
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Construct the text buffer on a background thread.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // The file doesn't exist yet: create an empty buffer that
                // will be written to disk on its first save.
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                // Index the buffer by path and entry id so worktree events can
                // be routed back to it.
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
847
848 fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
849 cx.spawn(|buffer_store, mut cx| async move {
850 let buffer = cx.new_model(|cx| {
851 Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
852 })?;
853 buffer_store.update(&mut cx, |buffer_store, cx| {
854 buffer_store.add_buffer(buffer.clone(), cx).log_err();
855 })?;
856 Ok(buffer)
857 })
858 }
859
860 fn reload_buffers(
861 &self,
862 buffers: HashSet<Model<Buffer>>,
863 push_to_history: bool,
864 cx: &mut ModelContext<BufferStore>,
865 ) -> Task<Result<ProjectTransaction>> {
866 cx.spawn(move |_, mut cx| async move {
867 let mut project_transaction = ProjectTransaction::default();
868 for buffer in buffers {
869 let transaction = buffer
870 .update(&mut cx, |buffer, cx| buffer.reload(cx))?
871 .await?;
872 buffer.update(&mut cx, |buffer, cx| {
873 if let Some(transaction) = transaction {
874 if !push_to_history {
875 buffer.forget_transaction(transaction.id);
876 }
877 project_transaction.0.insert(cx.handle(), transaction);
878 }
879 })?;
880 }
881
882 Ok(project_transaction)
883 })
884 }
885}
886
887impl BufferStore {
    /// Registers this type's RPC message and request handlers on `client`.
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
        client.add_model_request_handler(Self::handle_get_staged_text);
        client.add_model_message_handler(Self::handle_update_diff_base);
    }
899
900 /// Creates a buffer store, optionally retaining its buffers.
901 pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
902 Self {
903 state: BufferStoreState::Local(LocalBufferStore {
904 local_buffer_ids_by_path: Default::default(),
905 local_buffer_ids_by_entry_id: Default::default(),
906 worktree_store: worktree_store.clone(),
907 _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
908 if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
909 let this = this.as_local_mut().unwrap();
910 this.subscribe_to_worktree(worktree, cx);
911 }
912 }),
913 }),
914 downstream_client: None,
915 opened_buffers: Default::default(),
916 shared_buffers: Default::default(),
917 loading_buffers: Default::default(),
918 loading_change_sets: Default::default(),
919 worktree_store,
920 }
921 }
922
923 pub fn remote(
924 worktree_store: Model<WorktreeStore>,
925 upstream_client: AnyProtoClient,
926 remote_id: u64,
927 _cx: &mut ModelContext<Self>,
928 ) -> Self {
929 Self {
930 state: BufferStoreState::Remote(RemoteBufferStore {
931 shared_with_me: Default::default(),
932 loading_remote_buffers_by_id: Default::default(),
933 remote_buffer_listeners: Default::default(),
934 project_id: remote_id,
935 upstream_client,
936 worktree_store: worktree_store.clone(),
937 }),
938 downstream_client: None,
939 opened_buffers: Default::default(),
940 loading_buffers: Default::default(),
941 loading_change_sets: Default::default(),
942 shared_buffers: Default::default(),
943 worktree_store,
944 }
945 }
946
947 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
948 match &mut self.state {
949 BufferStoreState::Local(state) => Some(state),
950 _ => None,
951 }
952 }
953
954 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
955 match &mut self.state {
956 BufferStoreState::Remote(state) => Some(state),
957 _ => None,
958 }
959 }
960
961 fn as_remote(&self) -> Option<&RemoteBufferStore> {
962 match &self.state {
963 BufferStoreState::Remote(state) => Some(state),
964 _ => None,
965 }
966 }
967
    /// Opens the buffer at `project_path`, returning the existing buffer if
    /// it's already open and deduplicating concurrent loads of the same path.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        // Fast path: the buffer is already open.
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // A load for this path is already in flight; share its task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);
                            })
                            .ok();
                            load_result.map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // The shared task yields `Arc<anyhow::Error>`; flatten that back into
        // a plain `anyhow::Error` for this caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1013
    /// Opens (or returns the already-open) unstaged-change set for `buffer`,
    /// deduplicating concurrent loads for the same buffer id.
    pub fn open_unstaged_changes(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<BufferChangeSet>>> {
        let buffer_id = buffer.read(cx).remote_id();
        // Fast path: the change set already exists.
        if let Some(change_set) = self.get_unstaged_changes(buffer_id) {
            return Task::ready(Ok(change_set));
        }

        let task = match self.loading_change_sets.entry(buffer_id) {
            // A load is already in flight; share its task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let load = match &self.state {
                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
                    BufferStoreState::Remote(this) => this.load_staged_text(buffer_id, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, cx| async move {
                            Self::open_unstaged_changes_internal(this, load.await, buffer, cx)
                                .await
                                .map_err(Arc::new)
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Flatten the shared task's `Arc<anyhow::Error>` into a plain error.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
1048
    /// Test-only: installs a pre-built change set for `buffer_id`, so that
    /// subsequent `open_unstaged_changes` calls resolve to it immediately.
    #[cfg(any(test, feature = "test-support"))]
    pub fn set_change_set(&mut self, buffer_id: BufferId, change_set: Model<BufferChangeSet>) {
        self.loading_change_sets
            .insert(buffer_id, Task::ready(Ok(change_set)).shared());
    }
1054
    /// Builds a [`BufferChangeSet`] for `buffer` from the loaded staged
    /// `text`, clearing the pending-load entry whether loading succeeded or
    /// failed, and recording the change set on the opened buffer.
    pub async fn open_unstaged_changes_internal(
        this: WeakModel<Self>,
        text: Result<Option<String>>,
        buffer: Model<Buffer>,
        mut cx: AsyncAppContext,
    ) -> Result<Model<BufferChangeSet>> {
        let text = match text {
            Err(e) => {
                // Loading failed: drop the pending entry so a later call can retry.
                this.update(&mut cx, |this, cx| {
                    let buffer_id = buffer.read(cx).remote_id();
                    this.loading_change_sets.remove(&buffer_id);
                })?;
                return Err(e);
            }
            Ok(text) => text,
        };

        let change_set = buffer.update(&mut cx, |buffer, cx| {
            cx.new_model(|_| BufferChangeSet::new(buffer))
        })?;

        // `None` staged text means the file isn't in the index; in that case
        // the change set is left without a base text.
        if let Some(text) = text {
            change_set
                .update(&mut cx, |change_set, cx| {
                    let snapshot = buffer.read(cx).text_snapshot();
                    change_set.set_base_text(text, snapshot, cx)
                })?
                .await
                .ok();
        }

        this.update(&mut cx, |this, cx| {
            let buffer_id = buffer.read(cx).remote_id();
            this.loading_change_sets.remove(&buffer_id);
            // Remember the change set on the open buffer so git-status updates
            // can find and refresh it later.
            if let Some(OpenBuffer::Complete {
                unstaged_changes, ..
            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
            {
                *unstaged_changes = Some(change_set.downgrade());
            }
        })?;

        Ok(change_set)
    }
1099
1100 pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
1101 match &self.state {
1102 BufferStoreState::Local(this) => this.create_buffer(cx),
1103 BufferStoreState::Remote(this) => this.create_buffer(cx),
1104 }
1105 }
1106
1107 pub fn save_buffer(
1108 &mut self,
1109 buffer: Model<Buffer>,
1110 cx: &mut ModelContext<Self>,
1111 ) -> Task<Result<()>> {
1112 match &mut self.state {
1113 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1114 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1115 }
1116 }
1117
1118 pub fn save_buffer_as(
1119 &mut self,
1120 buffer: Model<Buffer>,
1121 path: ProjectPath,
1122 cx: &mut ModelContext<Self>,
1123 ) -> Task<Result<()>> {
1124 let old_file = buffer.read(cx).file().cloned();
1125 let task = match &self.state {
1126 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1127 BufferStoreState::Remote(this) => {
1128 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1129 }
1130 };
1131 cx.spawn(|this, mut cx| async move {
1132 task.await?;
1133 this.update(&mut cx, |_, cx| {
1134 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1135 })
1136 })
1137 }
1138
    /// Computes git blame information for `buffer`, optionally for a specific
    /// historical `version` of its text.
    ///
    /// For a local worktree this runs the blame against the containing
    /// repository on the background executor; for a remote worktree it sends
    /// a `BlameBuffer` request upstream. Resolves to `Ok(None)` when the file
    /// is not inside a git repository.
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything the blame needs up front, so the actual
                // work can move to the background executor.
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    // Blame the requested version when given, otherwise the
                    // buffer's current contents.
                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1200
    /// Builds a web permalink to the given line `selection` of `buffer` on
    /// its git hosting provider (e.g. GitHub).
    ///
    /// Locally this resolves the "origin" remote and HEAD SHA of the file's
    /// repository; remotely it forwards a `GetPermalinkToLine` request
    /// upstream and parses the returned URL.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Model<Buffer>,
        selection: Range<u32>,
        cx: &AppContext,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let Some(repo) = worktree.local_git_repo(file.path()) else {
                    return Task::ready(Err(anyhow!("no repository for buffer found")));
                };

                let path = file.path().clone();

                cx.spawn(|cx| async move {
                    // Permalinks are only generated against the "origin"
                    // remote.
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    // Pin the link to the current HEAD commit so it stays
                    // stable as the branch moves.
                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .context("failed to convert buffer path to string")?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1272
    /// Registers an open buffer with the store: installs a release hook that
    /// emits `BufferDropped`, merges any queued remote operations, and
    /// subscribes to the buffer's events.
    ///
    /// Errors if a local buffer with the same id is already registered and
    /// still alive (a remote duplicate is tolerated and ignored).
    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        // Replica 0 is the host; anything else came from a remote peer.
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer.downgrade(),
            unstaged_changes: None,
        };

        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                // Operations may have arrived before the buffer itself;
                // apply them now that the buffer exists.
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }
1316
1317 pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
1318 self.opened_buffers
1319 .values()
1320 .filter_map(|buffer| buffer.upgrade())
1321 }
1322
1323 pub fn loading_buffers(
1324 &self,
1325 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Model<Buffer>>>)> {
1326 self.loading_buffers.iter().map(|(path, task)| {
1327 let task = task.clone();
1328 (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1329 })
1330 }
1331
1332 pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
1333 self.buffers().find_map(|buffer| {
1334 let file = File::from_dyn(buffer.read(cx).file())?;
1335 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1336 Some(buffer)
1337 } else {
1338 None
1339 }
1340 })
1341 }
1342
1343 pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1344 self.opened_buffers.get(&buffer_id)?.upgrade()
1345 }
1346
1347 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
1348 self.get(buffer_id)
1349 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1350 }
1351
1352 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1353 self.get(buffer_id).or_else(|| {
1354 self.as_remote()
1355 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1356 })
1357 }
1358
1359 pub fn get_unstaged_changes(&self, buffer_id: BufferId) -> Option<Model<BufferChangeSet>> {
1360 if let OpenBuffer::Complete {
1361 unstaged_changes, ..
1362 } = self.opened_buffers.get(&buffer_id)?
1363 {
1364 unstaged_changes.as_ref()?.upgrade()
1365 } else {
1366 None
1367 }
1368 }
1369
1370 pub fn buffer_version_info(
1371 &self,
1372 cx: &AppContext,
1373 ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1374 let buffers = self
1375 .buffers()
1376 .map(|buffer| {
1377 let buffer = buffer.read(cx);
1378 proto::BufferVersion {
1379 id: buffer.remote_id().into(),
1380 version: language::proto::serialize_version(&buffer.version),
1381 }
1382 })
1383 .collect();
1384 let incomplete_buffer_ids = self
1385 .as_remote()
1386 .map(|remote| remote.incomplete_buffer_ids())
1387 .unwrap_or_default();
1388 (buffers, incomplete_buffer_ids)
1389 }
1390
1391 pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
1392 for open_buffer in self.opened_buffers.values_mut() {
1393 if let Some(buffer) = open_buffer.upgrade() {
1394 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1395 }
1396 }
1397
1398 for buffer in self.buffers() {
1399 buffer.update(cx, |buffer, cx| {
1400 buffer.set_capability(Capability::ReadOnly, cx)
1401 });
1402 }
1403
1404 if let Some(remote) = self.as_remote_mut() {
1405 // Wake up all futures currently waiting on a buffer to get opened,
1406 // to give them a chance to fail now that we've disconnected.
1407 remote.remote_buffer_listeners.clear()
1408 }
1409 }
1410
1411 pub fn shared(
1412 &mut self,
1413 remote_id: u64,
1414 downstream_client: AnyProtoClient,
1415 _cx: &mut AppContext,
1416 ) {
1417 self.downstream_client = Some((downstream_client, remote_id));
1418 }
1419
1420 pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
1421 self.downstream_client.take();
1422 self.forget_shared_buffers();
1423 }
1424
1425 pub fn discard_incomplete(&mut self) {
1426 self.opened_buffers
1427 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1428 }
1429
    /// Streams buffers that may match `query`: already-open unnamed buffers
    /// first, then buffers opened from the worktree's candidate paths.
    ///
    /// `limit` caps the number of worktree candidates; each unnamed open
    /// buffer consumes one slot from it. The receiver closes when the search
    /// is exhausted or the caller drops it.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // Unnamed buffers are yielded directly and count against
                // the caller's limit.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Bound how many buffer-open tasks are awaited per batch.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A closed receiver means the consumer stopped
                        // listening; end the search early.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1483
    /// Recomputes the unstaged diff for each of the given buffers, returning
    /// a future that resolves once all recalculations finish.
    ///
    /// Buffers whose change set has been dropped have their stale weak
    /// handle cleared as a side effect.
    pub fn recalculate_buffer_diffs(
        &mut self,
        buffers: Vec<Model<Buffer>>,
        cx: &mut ModelContext<Self>,
    ) -> impl Future<Output = ()> {
        let mut futures = Vec::new();
        for buffer in buffers {
            let buffer = buffer.read(cx).text_snapshot();
            if let Some(OpenBuffer::Complete {
                unstaged_changes, ..
            }) = self.opened_buffers.get_mut(&buffer.remote_id())
            {
                if let Some(unstaged_changes) = unstaged_changes
                    .as_ref()
                    .and_then(|changes| changes.upgrade())
                {
                    unstaged_changes.update(cx, |unstaged_changes, cx| {
                        futures.push(unstaged_changes.recalculate_diff(buffer.clone(), cx));
                    });
                } else {
                    // The change set is gone; drop the dangling weak handle.
                    unstaged_changes.take();
                }
            }
        }
        async move {
            futures::future::join_all(futures).await;
        }
    }
1512
    /// Reacts to events emitted by an individual buffer.
    ///
    /// File-handle changes are forwarded to the local backend; reloads are
    /// relayed to the downstream client (if any). Other events are ignored.
    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                // Only relevant when this store is being shared downstream.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }
1543
1544 pub async fn handle_update_buffer(
1545 this: Model<Self>,
1546 envelope: TypedEnvelope<proto::UpdateBuffer>,
1547 mut cx: AsyncAppContext,
1548 ) -> Result<proto::Ack> {
1549 let payload = envelope.payload.clone();
1550 let buffer_id = BufferId::new(payload.buffer_id)?;
1551 let ops = payload
1552 .operations
1553 .into_iter()
1554 .map(language::proto::deserialize_operation)
1555 .collect::<Result<Vec<_>, _>>()?;
1556 this.update(&mut cx, |this, cx| {
1557 match this.opened_buffers.entry(buffer_id) {
1558 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1559 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1560 OpenBuffer::Complete { buffer, .. } => {
1561 if let Some(buffer) = buffer.upgrade() {
1562 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1563 }
1564 }
1565 },
1566 hash_map::Entry::Vacant(e) => {
1567 e.insert(OpenBuffer::Operations(ops));
1568 }
1569 }
1570 Ok(proto::Ack {})
1571 })?
1572 }
1573
    /// Handles a guest's `SynchronizeBuffers` request after a reconnect:
    /// re-registers the guest's shared buffers, reports each buffer's current
    /// version, and re-sends file metadata, reload state, and any operations
    /// the guest is missing (based on the version it reported).
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate for this guest; only buffers it reports
        // below are considered shared afterwards.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        unstaged_changes: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Only the operations the guest hasn't seen are serialized.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // todo!(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations in chunks off the main
                // thread; failures are logged, not propagated.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
1662
1663 pub fn handle_create_buffer_for_peer(
1664 &mut self,
1665 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1666 replica_id: u16,
1667 capability: Capability,
1668 cx: &mut ModelContext<Self>,
1669 ) -> Result<()> {
1670 let Some(remote) = self.as_remote_mut() else {
1671 return Err(anyhow!("buffer store is not a remote"));
1672 };
1673
1674 if let Some(buffer) =
1675 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1676 {
1677 self.add_buffer(buffer, cx)?;
1678 }
1679
1680 Ok(())
1681 }
1682
    /// Handles an upstream `UpdateBufferFile` message: swaps in the new file
    /// on the buffer, emits `BufferChangedFilePath` if the path changed, and
    /// relays the message to any downstream client.
    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            // Clone the payload because the original `envelope.payload.file`
            // is forwarded downstream below.
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // `old_file` is `Some` only when the path actually changed.
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
1730
    /// Handles a guest's `SaveBuffer` request: waits until this host has
    /// caught up to the guest's reported version, saves (optionally under a
    /// new path), and responds with the saved version and mtime.
    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until all of the guest's edits have been applied here.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
1771
1772 pub async fn handle_close_buffer(
1773 this: Model<Self>,
1774 envelope: TypedEnvelope<proto::CloseBuffer>,
1775 mut cx: AsyncAppContext,
1776 ) -> Result<()> {
1777 let peer_id = envelope.sender_id;
1778 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1779 this.update(&mut cx, |this, _| {
1780 if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
1781 if shared.remove(&buffer_id).is_some() {
1782 if shared.is_empty() {
1783 this.shared_buffers.remove(&peer_id);
1784 }
1785 return;
1786 }
1787 }
1788 debug_panic!(
1789 "peer_id {} closed buffer_id {} which was either not open or already closed",
1790 peer_id,
1791 buffer_id
1792 )
1793 })
1794 }
1795
1796 pub async fn handle_buffer_saved(
1797 this: Model<Self>,
1798 envelope: TypedEnvelope<proto::BufferSaved>,
1799 mut cx: AsyncAppContext,
1800 ) -> Result<()> {
1801 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1802 let version = deserialize_version(&envelope.payload.version);
1803 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1804 this.update(&mut cx, move |this, cx| {
1805 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1806 buffer.update(cx, |buffer, cx| {
1807 buffer.did_save(version, mtime, cx);
1808 });
1809 }
1810
1811 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1812 downstream_client
1813 .send(proto::BufferSaved {
1814 project_id: *project_id,
1815 buffer_id: buffer_id.into(),
1816 mtime: envelope.payload.mtime,
1817 version: envelope.payload.version,
1818 })
1819 .log_err();
1820 }
1821 })
1822 }
1823
    /// Handles an upstream notification that a buffer was reloaded from
    /// disk: records the reload (version, line ending, mtime) on the local
    /// buffer and relays the message downstream.
    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }
1856
1857 pub async fn handle_blame_buffer(
1858 this: Model<Self>,
1859 envelope: TypedEnvelope<proto::BlameBuffer>,
1860 mut cx: AsyncAppContext,
1861 ) -> Result<proto::BlameBufferResponse> {
1862 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1863 let version = deserialize_version(&envelope.payload.version);
1864 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1865 buffer
1866 .update(&mut cx, |buffer, _| {
1867 buffer.wait_for_version(version.clone())
1868 })?
1869 .await?;
1870 let blame = this
1871 .update(&mut cx, |this, cx| {
1872 this.blame_buffer(&buffer, Some(version), cx)
1873 })?
1874 .await?;
1875 Ok(serialize_blame_buffer_response(blame))
1876 }
1877
1878 pub async fn handle_get_permalink_to_line(
1879 this: Model<Self>,
1880 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
1881 mut cx: AsyncAppContext,
1882 ) -> Result<proto::GetPermalinkToLineResponse> {
1883 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1884 // let version = deserialize_version(&envelope.payload.version);
1885 let selection = {
1886 let proto_selection = envelope
1887 .payload
1888 .selection
1889 .context("no selection to get permalink for defined")?;
1890 proto_selection.start as u32..proto_selection.end as u32
1891 };
1892 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1893 let permalink = this
1894 .update(&mut cx, |this, cx| {
1895 this.get_permalink_to_line(&buffer, selection, cx)
1896 })?
1897 .await?;
1898 Ok(proto::GetPermalinkToLineResponse {
1899 permalink: permalink.to_string(),
1900 })
1901 }
1902
    /// Handles a guest's `GetStagedText` request: opens (or reuses) the
    /// buffer's unstaged change set, pins it on the guest's shared-buffer
    /// entry so it stays alive for that peer, and returns the base text.
    pub async fn handle_get_staged_text(
        this: Model<Self>,
        request: TypedEnvelope<proto::GetStagedText>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::GetStagedTextResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let change_set = this
            .update(&mut cx, |this, cx| {
                let buffer = this.get(buffer_id)?;
                Some(this.open_unstaged_changes(buffer, cx))
            })?
            .ok_or_else(|| anyhow!("no such buffer"))?
            .await?;
        this.update(&mut cx, |this, _| {
            let shared_buffers = this
                .shared_buffers
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            // The peer should already have this buffer shared before asking
            // for its staged text.
            debug_assert!(shared_buffers.contains_key(&buffer_id));
            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
                shared.unstaged_changes = Some(change_set.clone());
            }
        })?;
        // `None` means the file has no staged text (e.g. untracked).
        let staged_text = change_set.read_with(&cx, |change_set, cx| {
            change_set
                .base_text
                .as_ref()
                .map(|buffer| buffer.read(cx).text())
        })?;
        Ok(proto::GetStagedTextResponse { staged_text })
    }
1934
    /// Handles an upstream `UpdateDiffBase` message: replaces or clears the
    /// base text of the buffer's change set.
    ///
    /// Silently succeeds if the buffer or its change set no longer exists —
    /// there is nothing to update in that case.
    pub async fn handle_update_diff_base(
        this: Model<Self>,
        request: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let Some((buffer, change_set)) = this.update(&mut cx, |this, _| {
            if let OpenBuffer::Complete {
                unstaged_changes,
                buffer,
            } = this.opened_buffers.get(&buffer_id)?
            {
                Some((buffer.upgrade()?, unstaged_changes.as_ref()?.upgrade()?))
            } else {
                None
            }
        })?
        else {
            return Ok(());
        };
        change_set.update(&mut cx, |change_set, cx| {
            if let Some(staged_text) = request.payload.staged_text {
                // The receiver for diff-recalculation completion is
                // intentionally dropped; no one is waiting on it here.
                let _ = change_set.set_base_text(staged_text, buffer.read(cx).text_snapshot(), cx);
            } else {
                change_set.unset_base_text(buffer.read(cx).text_snapshot(), cx)
            }
        })?;
        Ok(())
    }
1964
1965 pub fn reload_buffers(
1966 &self,
1967 buffers: HashSet<Model<Buffer>>,
1968 push_to_history: bool,
1969 cx: &mut ModelContext<Self>,
1970 ) -> Task<Result<ProjectTransaction>> {
1971 if buffers.is_empty() {
1972 return Task::ready(Ok(ProjectTransaction::default()));
1973 }
1974 match &self.state {
1975 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
1976 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
1977 }
1978 }
1979
    /// Handles a guest's `ReloadBuffers` request: reloads the named buffers
    /// and responds with the resulting transaction, serialized for the
    /// requesting peer.
    async fn handle_reload_buffers(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            // push_to_history is false: the guest, not the host, owns undo
            // history for this operation.
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }
2003
    /// Shares `buffer` with `peer_id` by streaming it over the downstream
    /// client: first the buffer's state, then its operation history in
    /// chunks, with `is_last` marking the final chunk.
    ///
    /// A no-op if the buffer is already shared with that peer or if this
    /// store has no downstream client.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            return Task::ready(Ok(()));
        }
        // Record the share before any async work so concurrent calls don't
        // send the buffer twice.
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                unstaged_changes: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped by the time this task runs.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream operations if the initial state made it through.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2069
2070 pub fn forget_shared_buffers(&mut self) {
2071 self.shared_buffers.clear();
2072 }
2073
2074 pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
2075 self.shared_buffers.remove(peer_id);
2076 }
2077
2078 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2079 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2080 self.shared_buffers.insert(new_peer_id, buffers);
2081 }
2082 }
2083
2084 pub fn has_shared_buffers(&self) -> bool {
2085 !self.shared_buffers.is_empty()
2086 }
2087
    /// Creates and registers a local buffer with the given initial `text`
    /// and optional `language` (falling back to plain text), indexing it by
    /// path and entry id in the local state.
    ///
    /// Panics if called on a non-local buffer store.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        // Index the buffer so later opens of the same path/entry reuse it.
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }
2121
2122 pub fn deserialize_project_transaction(
2123 &mut self,
2124 message: proto::ProjectTransaction,
2125 push_to_history: bool,
2126 cx: &mut ModelContext<Self>,
2127 ) -> Task<Result<ProjectTransaction>> {
2128 if let Some(this) = self.as_remote_mut() {
2129 this.deserialize_project_transaction(message, push_to_history, cx)
2130 } else {
2131 debug_panic!("not a remote buffer store");
2132 Task::ready(Err(anyhow!("not a remote buffer store")))
2133 }
2134 }
2135
2136 pub fn wait_for_remote_buffer(
2137 &mut self,
2138 id: BufferId,
2139 cx: &mut ModelContext<BufferStore>,
2140 ) -> Task<Result<Model<Buffer>>> {
2141 if let Some(this) = self.as_remote_mut() {
2142 this.wait_for_remote_buffer(id, cx)
2143 } else {
2144 debug_panic!("not a remote buffer store");
2145 Task::ready(Err(anyhow!("not a remote buffer store")))
2146 }
2147 }
2148
2149 pub fn serialize_project_transaction_for_peer(
2150 &mut self,
2151 project_transaction: ProjectTransaction,
2152 peer_id: proto::PeerId,
2153 cx: &mut ModelContext<Self>,
2154 ) -> proto::ProjectTransaction {
2155 let mut serialized_transaction = proto::ProjectTransaction {
2156 buffer_ids: Default::default(),
2157 transactions: Default::default(),
2158 };
2159 for (buffer, transaction) in project_transaction.0 {
2160 self.create_buffer_for_peer(&buffer, peer_id, cx)
2161 .detach_and_log_err(cx);
2162 serialized_transaction
2163 .buffer_ids
2164 .push(buffer.read(cx).remote_id().into());
2165 serialized_transaction
2166 .transactions
2167 .push(language::proto::serialize_transaction(&transaction));
2168 }
2169 serialized_transaction
2170 }
2171}
2172
impl BufferChangeSet {
    /// Creates a change set for `buffer` with no base text and a freshly
    /// constructed diff. Call `set_base_text` to install a base and compute
    /// real hunks.
    pub fn new(buffer: &text::BufferSnapshot) -> Self {
        Self {
            buffer_id: buffer.remote_id(),
            base_text: None,
            diff_to_buffer: git::diff::BufferDiff::new(buffer),
            recalculate_diff_task: None,
            diff_updated_futures: Vec::new(),
            // Bumped each time the base text is replaced or cleared.
            base_text_version: 0,
        }
    }

    /// Test helper: builds a change set and immediately installs `base_text`,
    /// discarding the "diff updated" receiver.
    #[cfg(any(test, feature = "test-support"))]
    pub fn new_with_base_text(
        base_text: String,
        buffer: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let mut this = Self::new(&buffer);
        let _ = this.set_base_text(base_text, buffer, cx);
        this
    }

    /// Iterates over diff hunks that intersect `range`, in forward order.
    pub fn diff_hunks_intersecting_range<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range(range, buffer_snapshot)
    }

    /// Iterates over diff hunks that intersect `range`, in reverse order.
    pub fn diff_hunks_intersecting_range_rev<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range_rev(range, buffer_snapshot)
    }

    /// Test helper: returns the current base text as a plain `String`, or
    /// `None` when no base text has been set.
    #[cfg(any(test, feature = "test-support"))]
    pub fn base_text_string(&self, cx: &AppContext) -> Option<String> {
        self.base_text.as_ref().map(|buffer| buffer.read(cx).text())
    }

    /// Replaces the base text (normalizing its line endings first) and kicks
    /// off an asynchronous diff recalculation. The returned receiver resolves
    /// once the new diff has been applied.
    pub fn set_base_text(
        &mut self,
        mut base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        LineEnding::normalize(&mut base_text);
        self.recalculate_diff_internal(base_text, buffer_snapshot, true, cx)
    }

    /// Clears the base text, resets the diff, and cancels any in-flight
    /// recalculation. No-op when there is no base text.
    pub fn unset_base_text(
        &mut self,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) {
        if self.base_text.is_some() {
            self.base_text = None;
            self.diff_to_buffer = BufferDiff::new(&buffer_snapshot);
            // Dropping the task cancels any pending recalculation.
            self.recalculate_diff_task.take();
            self.base_text_version += 1;
            cx.notify();
        }
    }

    /// Recomputes the diff against the existing base text for a new buffer
    /// snapshot. When there is no base text, the returned receiver's sender
    /// has already been dropped, so awaiting it fails immediately.
    pub fn recalculate_diff(
        &mut self,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        if let Some(base_text) = self.base_text.clone() {
            self.recalculate_diff_internal(base_text.read(cx).text(), buffer_snapshot, false, cx)
        } else {
            oneshot::channel().1
        }
    }

    /// Shared implementation for `set_base_text` / `recalculate_diff`:
    /// builds the diff on the background executor, then applies it on the
    /// model thread. `base_text_changed` additionally replaces the stored
    /// base-text buffer and bumps `base_text_version`.
    ///
    /// Starting a new recalculation replaces (and thereby cancels) any
    /// previous in-flight task; all queued completion senders are resolved
    /// together when a recalculation finishes.
    fn recalculate_diff_internal(
        &mut self,
        base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        base_text_changed: bool,
        cx: &mut ModelContext<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            // Diff construction is CPU-bound, so run it off the main thread.
            let (base_text, diff) = cx
                .background_executor()
                .spawn(async move {
                    let diff = BufferDiff::build(&base_text, &buffer_snapshot).await;
                    (base_text, diff)
                })
                .await;
            this.update(&mut cx, |this, cx| {
                if base_text_changed {
                    this.base_text_version += 1;
                    this.base_text = Some(cx.new_model(|cx| {
                        Buffer::local_normalized(Rope::from(base_text), LineEnding::default(), cx)
                    }));
                }
                this.diff_to_buffer = diff;
                this.recalculate_diff_task.take();
                // Wake every caller waiting on any queued recalculation.
                for tx in this.diff_updated_futures.drain(..) {
                    tx.send(()).ok();
                }
                cx.notify();
            })?;
            Ok(())
        }));
        rx
    }
}
2291
2292impl OpenBuffer {
2293 fn upgrade(&self) -> Option<Model<Buffer>> {
2294 match self {
2295 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2296 OpenBuffer::Operations(_) => None,
2297 }
2298 }
2299}
2300
2301fn is_not_found_error(error: &anyhow::Error) -> bool {
2302 error
2303 .root_cause()
2304 .downcast_ref::<io::Error>()
2305 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2306}
2307
2308fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2309 let Some(blame) = blame else {
2310 return proto::BlameBufferResponse {
2311 blame_response: None,
2312 };
2313 };
2314
2315 let entries = blame
2316 .entries
2317 .into_iter()
2318 .map(|entry| proto::BlameEntry {
2319 sha: entry.sha.as_bytes().into(),
2320 start_line: entry.range.start,
2321 end_line: entry.range.end,
2322 original_line_number: entry.original_line_number,
2323 author: entry.author.clone(),
2324 author_mail: entry.author_mail.clone(),
2325 author_time: entry.author_time,
2326 author_tz: entry.author_tz.clone(),
2327 committer: entry.committer.clone(),
2328 committer_mail: entry.committer_mail.clone(),
2329 committer_time: entry.committer_time,
2330 committer_tz: entry.committer_tz.clone(),
2331 summary: entry.summary.clone(),
2332 previous: entry.previous.clone(),
2333 filename: entry.filename.clone(),
2334 })
2335 .collect::<Vec<_>>();
2336
2337 let messages = blame
2338 .messages
2339 .into_iter()
2340 .map(|(oid, message)| proto::CommitMessage {
2341 oid: oid.as_bytes().into(),
2342 message,
2343 })
2344 .collect::<Vec<_>>();
2345
2346 let permalinks = blame
2347 .permalinks
2348 .into_iter()
2349 .map(|(oid, url)| proto::CommitPermalink {
2350 oid: oid.as_bytes().into(),
2351 permalink: url.to_string(),
2352 })
2353 .collect::<Vec<_>>();
2354
2355 proto::BlameBufferResponse {
2356 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2357 entries,
2358 messages,
2359 permalinks,
2360 remote_url: blame.remote_url,
2361 }),
2362 }
2363}
2364
2365fn deserialize_blame_buffer_response(
2366 response: proto::BlameBufferResponse,
2367) -> Option<git::blame::Blame> {
2368 let response = response.blame_response?;
2369 let entries = response
2370 .entries
2371 .into_iter()
2372 .filter_map(|entry| {
2373 Some(git::blame::BlameEntry {
2374 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2375 range: entry.start_line..entry.end_line,
2376 original_line_number: entry.original_line_number,
2377 committer: entry.committer,
2378 committer_time: entry.committer_time,
2379 committer_tz: entry.committer_tz,
2380 committer_mail: entry.committer_mail,
2381 author: entry.author,
2382 author_mail: entry.author_mail,
2383 author_time: entry.author_time,
2384 author_tz: entry.author_tz,
2385 summary: entry.summary,
2386 previous: entry.previous,
2387 filename: entry.filename,
2388 })
2389 })
2390 .collect::<Vec<_>>();
2391
2392 let messages = response
2393 .messages
2394 .into_iter()
2395 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2396 .collect::<HashMap<_, _>>();
2397
2398 let permalinks = response
2399 .permalinks
2400 .into_iter()
2401 .filter_map(|permalink| {
2402 Some((
2403 git::Oid::from_bytes(&permalink.oid).ok()?,
2404 Url::from_str(&permalink.permalink).ok()?,
2405 ))
2406 })
2407 .collect::<HashMap<_, _>>();
2408
2409 Some(Blame {
2410 entries,
2411 permalinks,
2412 messages,
2413 remote_url: response.remote_url,
2414 })
2415}