use crate::{
    search::SearchQuery,
    worktree_store::{WorktreeStore, WorktreeStoreEvent},
    Item, ProjectPath,
};
use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
use anyhow::{anyhow, Context as _, Result};
use client::Client;
use collections::{hash_map, HashMap, HashSet};
use fs::Fs;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use git::blame::Blame;
use gpui::{
    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
    Task, WeakModel,
};
use http_client::Url;
use language::{
    proto::{
        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
        split_operations,
    },
    Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
};
use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
use smol::channel::Receiver;
use std::{io, ops::Range, path::Path, str::FromStr as _, sync::Arc, time::Instant};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};

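/// The half of a [`BufferStore`] that differs between local and remote
/// projects: the local implementation reads and writes files in local
/// worktrees, while the remote implementation forwards each operation over
/// RPC to the project's host.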
trait BufferStoreImpl {
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>>;

    fn save_buffer(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>>;

    fn save_buffer_as(
        &self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>>;

    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>>;

    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>>;

    fn as_remote(&self) -> Option<Model<RemoteBufferStore>>;
    fn as_local(&self) -> Option<Model<LocalBufferStore>>;
}

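/// State for buffers that live on a remote project host and are mirrored
/// here over RPC.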
struct RemoteBufferStore {
    shared_with_me: HashSet<Model<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    worktree_store: Model<WorktreeStore>,
    buffer_store: WeakModel<BufferStore>,
}

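/// State for buffers backed by files in local worktrees, with indices from
/// project paths and worktree entries back to buffer ids.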
struct LocalBufferStore {
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    buffer_store: WeakModel<BufferStore>,
    worktree_store: Model<WorktreeStore>,
    _subscription: Subscription,
}

/// A set of open buffers.
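///
/// A hedged usage sketch (not from this file): `worktree_store`, `project_path`,
/// and `cx` are assumed to come from the surrounding project setup.
///
/// ```ignore
/// let buffer_store = cx.new_model(|cx| BufferStore::local(worktree_store.clone(), cx));
/// let open_task = buffer_store.update(cx, |store, cx| store.open_buffer(project_path, cx));
/// ```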
pub struct BufferStore {
    state: Box<dyn BufferStoreImpl>,
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    worktree_store: Model<WorktreeStore>,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    downstream_client: Option<(AnyProtoClient, u64)>,
    shared_buffers: HashMap<proto::PeerId, HashSet<Model<Buffer>>>,
}

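/// An entry in [`BufferStore::opened_buffers`]: either a weak handle to a live
/// buffer, or operations received for a buffer whose initial state hasn't
/// arrived yet.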
enum OpenBuffer {
    Buffer(WeakModel<Buffer>),
    Operations(Vec<Operation>),
}

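/// Events emitted by a [`BufferStore`] as buffers are added, dropped, or
/// assigned new files.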
pub enum BufferStoreEvent {
    BufferAdded(Model<Buffer>),
    BufferDropped(BufferId),
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}

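/// An edit operation that spanned multiple buffers, mapping each affected
/// buffer to the transaction that was applied to it.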
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}

impl RemoteBufferStore {
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut AppContext,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer_store = self.buffer_store.clone();
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|cx| async move {
            if let Some(buffer) = buffer_store
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }

    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<Option<Model<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        Ok(None)
    }

    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
        self.loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>()
    }

    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }
}

impl BufferStoreImpl for Model<RemoteBufferStore> {
    fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
        Some(self.clone())
    }

    fn as_local(&self) -> Option<Model<LocalBufferStore>> {
        None
    }

    fn save_buffer(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            this.save_remote_buffer(buffer.clone(), None, cx)
        })
    }

    fn save_buffer_as(
        &self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            this.save_remote_buffer(buffer, Some(path.to_proto()), cx)
        })
    }

    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        self.update(cx, |this, cx| {
            let worktree_id = worktree.read(cx).id().to_proto();
            let project_id = this.project_id;
            let client = this.upstream_client.clone();
            let path_string = path.to_string_lossy().to_string();
            cx.spawn(move |this, mut cx| async move {
                let response = client
                    .request(proto::OpenBufferByPath {
                        project_id,
                        worktree_id,
                        path: path_string,
                    })
                    .await?;
                let buffer_id = BufferId::new(response.buffer_id)?;

                let buffer = this
                    .update(&mut cx, |this, cx| this.wait_for_remote_buffer(buffer_id, cx))?
                    .await?;

                Ok(buffer)
            })
        })
    }

    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
        self.update(cx, |this, cx| {
            let create = this.upstream_client.request(proto::OpenNewBuffer {
                project_id: this.project_id,
            });
            cx.spawn(|this, mut cx| async move {
                let response = create.await?;
                let buffer_id = BufferId::new(response.buffer_id)?;

                this.update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await
            })
        })
    }

    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        self.update(cx, |this, cx| {
            let request = this.upstream_client.request(proto::ReloadBuffers {
                project_id: this.project_id,
                buffer_ids: buffers
                    .iter()
                    .map(|buffer| buffer.read(cx).remote_id().to_proto())
                    .collect(),
            });

            cx.spawn(|this, mut cx| async move {
                let response = request
                    .await?
                    .transaction
                    .ok_or_else(|| anyhow!("missing transaction"))?;
                this.update(&mut cx, |this, cx| {
                    this.deserialize_project_transaction(response, push_to_history, cx)
                })?
                .await
            })
        })
    }
}

impl LocalBufferStore {
    fn save_local_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        worktree: Model<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client(cx) {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        this.local_worktree_entries_changed(&worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    fn local_worktree_entries_changed(
        &mut self,
        worktree_handle: &Model<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut ModelContext<Self>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
        }
    }

    fn local_worktree_git_repos_changed(
        &mut self,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<Self>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());
        let Some(buffer_store) = self.buffer_store.upgrade() else {
            return;
        };

        // Identify the loading buffers whose containing repository has changed.
        let (future_buffers, current_buffers) = buffer_store.update(cx, |buffer_store, cx| {
            let future_buffers = buffer_store
                .loading_buffers()
                .filter_map(|(project_path, receiver)| {
                    if project_path.worktree_id != worktree_handle.read(cx).id() {
                        return None;
                    }
                    let path = &project_path.path;
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| path.starts_with(work_dir))?;
                    let path = path.clone();
                    Some(async move {
                        BufferStore::wait_for_loading_buffer(receiver)
                            .await
                            .ok()
                            .map(|buffer| (buffer, path))
                    })
                })
                .collect::<FuturesUnordered<_>>();

            // Identify the current buffers whose containing repository has changed.
            let current_buffers = buffer_store
                .buffers()
                .filter_map(|buffer| {
                    let file = File::from_dyn(buffer.read(cx).file())?;
                    if file.worktree != worktree_handle {
                        return None;
                    }
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                    Some((buffer, file.path.clone()))
                })
                .collect::<Vec<_>>();
            (future_buffers, current_buffers)
        });

        if future_buffers.len() + current_buffers.len() == 0 {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Wait for all of the buffers to load.
            let future_buffers = future_buffers.collect::<Vec<_>>().await;

            // Reload the diff base for every buffer whose containing git repository has changed.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    let mut diff_base_tasks = future_buffers
                        .into_iter()
                        .flatten()
                        .chain(current_buffers)
                        .filter_map(|(buffer, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            Some(async move {
                                let base_text =
                                    local_repo_entry.repo().load_index_text(&relative_path);
                                Some((buffer, base_text))
                            })
                        })
                        .collect::<FuturesUnordered<_>>();

                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
                    while let Some(diff_base) = diff_base_tasks.next().await {
                        if let Some(diff_base) = diff_base {
                            diff_bases.push(diff_base);
                        }
                    }
                    diff_bases
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Assign the new diff bases on all of the buffers.
                for (buffer, diff_base) in diff_bases_by_buffer {
                    let buffer_id = buffer.update(cx, |buffer, cx| {
                        buffer.set_diff_base(diff_base.clone(), cx);
                        buffer.remote_id().to_proto()
                    });
                    if let Some((client, project_id)) = &this.downstream_client(cx) {
                        client
                            .send(proto::UpdateDiffBase {
                                project_id: *project_id,
                                buffer_id,
                                diff_base,
                            })
                            .log_err();
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }

    fn local_worktree_entry_changed(
        &mut self,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
        };
        let buffer = self
            .buffer_store
            .update(cx, |buffer_store, _| {
                if let Some(buffer) = buffer_store.get(buffer_id) {
                    Some(buffer)
                } else {
                    buffer_store.opened_buffers.remove(&buffer_id);
                    None
                }
            })
            .ok()
            .flatten();
        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            self.local_buffer_ids_by_path.remove(&project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                self.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some((client, project_id)) = &self.downstream_client(cx) {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;
        self.buffer_store
            .update(cx, |_buffer_store, cx| {
                for event in events {
                    cx.emit(event);
                }
            })
            .log_err()?;

        None
    }

    fn downstream_client(&self, cx: &AppContext) -> Option<(AnyProtoClient, u64)> {
        self.buffer_store
            .upgrade()?
            .read(cx)
            .downstream_client
            .clone()
    }

    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }
}

impl BufferStoreImpl for Model<LocalBufferStore> {
    fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
        None
    }

    fn as_local(&self) -> Option<Model<LocalBufferStore>> {
        Some(self.clone())
    }

    fn save_buffer(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
                return Task::ready(Err(anyhow!("buffer doesn't have a file")));
            };
            let worktree = file.worktree.clone();
            this.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
        })
    }

    fn save_buffer_as(
        &self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            let Some(worktree) = this
                .worktree_store
                .read(cx)
                .worktree_for_id(path.worktree_id, cx)
            else {
                return Task::ready(Err(anyhow!("no such worktree")));
            };
            this.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
        })
    }

    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer_store = cx.weak_model();
        self.update(cx, |_, cx| {
            let load_buffer = worktree.update(cx, |worktree, cx| {
                let load_file = worktree.load_file(path.as_ref(), cx);
                let reservation = cx.reserve_model();
                let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
                cx.spawn(move |_, mut cx| async move {
                    let loaded = load_file.await?;
                    let text_buffer = cx
                        .background_executor()
                        .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                        .await;
                    cx.insert_model(reservation, |_| {
                        Buffer::build(
                            text_buffer,
                            loaded.diff_base,
                            Some(loaded.file),
                            Capability::ReadWrite,
                        )
                    })
                })
            });

            cx.spawn(move |this, mut cx| async move {
                let buffer = match load_buffer.await {
                    Ok(buffer) => Ok(buffer),
                    Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                        let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                        let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                        Buffer::build(
                            text_buffer,
                            None,
                            Some(Arc::new(File {
                                worktree,
                                path,
                                disk_state: DiskState::New,
                                entry_id: None,
                                is_local: true,
                                is_private: false,
                            })),
                            Capability::ReadWrite,
                        )
                    }),
                    Err(e) => Err(e),
                }?;
                this.update(&mut cx, |this, cx| {
                    buffer_store.update(cx, |buffer_store, cx| {
                        buffer_store.add_buffer(buffer.clone(), cx)
                    })??;
                    let buffer_id = buffer.read(cx).remote_id();
                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                        this.local_buffer_ids_by_path.insert(
                            ProjectPath {
                                worktree_id: file.worktree_id(cx),
                                path: file.path.clone(),
                            },
                            buffer_id,
                        );

                        if let Some(entry_id) = file.entry_id {
                            this.local_buffer_ids_by_entry_id
                                .insert(entry_id, buffer_id);
                        }
                    }

                    anyhow::Ok(())
                })??;

                Ok(buffer)
            })
        })
    }

    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
        let handle = self.clone();
        cx.spawn(|buffer_store, mut cx| async move {
            let buffer = cx.new_model(|cx| {
                Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
            })?;
            buffer_store.update(&mut cx, |buffer_store, cx| {
                buffer_store.add_buffer(buffer.clone(), cx).log_err();
                let buffer_id = buffer.read(cx).remote_id();
                handle.update(cx, |this, cx| {
                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                        this.local_buffer_ids_by_path.insert(
                            ProjectPath {
                                worktree_id: file.worktree_id(cx),
                                path: file.path.clone(),
                            },
                            buffer_id,
                        );

                        if let Some(entry_id) = file.entry_id {
                            this.local_buffer_ids_by_entry_id
                                .insert(entry_id, buffer_id);
                        }
                    }
                });
            })?;
            Ok(buffer)
        })
    }

    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.handle(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
}

impl BufferStore {
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_message_handler(Self::handle_update_diff_base);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
    }

    /// Creates a buffer store for a local project, operating directly on the file system.
    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
        let this = cx.weak_model();
        Self {
            state: Box::new(cx.new_model(|cx| {
                let subscription = cx.subscribe(
                    &worktree_store,
                    |this: &mut LocalBufferStore, _, event, cx| {
                        if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                            this.subscribe_to_worktree(worktree, cx);
                        }
                    },
                );

                LocalBufferStore {
                    local_buffer_ids_by_path: Default::default(),
                    local_buffer_ids_by_entry_id: Default::default(),
                    buffer_store: this,
                    worktree_store: worktree_store.clone(),
                    _subscription: subscription,
                }
            })),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers_by_path: Default::default(),
            worktree_store,
        }
    }

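    /// Creates a buffer store that mirrors the buffers of a remote project,
    /// identified by `remote_id`, over the given upstream client.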
    pub fn remote(
        worktree_store: Model<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let this = cx.weak_model();
        Self {
            state: Box::new(cx.new_model(|_| RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
                buffer_store: this,
            })),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers_by_path: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }

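    /// Opens the buffer at the given project path, returning the existing
    /// buffer if it is already open and deduplicating concurrent loads of the
    /// same path.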
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                let load_buffer = self
                    .state
                    .open_buffer(project_path.path.clone(), worktree, cx);

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _cx| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }

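    /// Creates a new, empty buffer: locally for local projects, or on the
    /// project host for remote ones.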
    pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
        self.state.create_buffer(cx)
    }

    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        self.state.save_buffer(buffer, cx)
    }

    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let old_file = buffer.read(cx).file().cloned();
        let task = self.state.save_buffer_as(buffer.clone(), path, cx);
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }

    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }

    pub fn get_permalink_to_line(
        &self,
        buffer: &Model<Buffer>,
        selection: Range<u32>,
        cx: &AppContext,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let Some(repo) = worktree.local_git_repo(file.path()) else {
                    return Task::ready(Err(anyhow!("no repository for buffer found")));
                };

                let path = file.path().clone();

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .context("failed to convert buffer path to string")?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }

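    /// Registers a buffer with the store, folding in any operations that
    /// arrived for it before the buffer itself did.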
    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = OpenBuffer::Buffer(buffer.downgrade());

        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }

    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
        self.opened_buffers
            .values()
            .filter_map(|buffer| buffer.upgrade())
    }

    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }

    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
        self.buffers().find_map(|buffer| {
            let file = File::from_dyn(buffer.read(cx).file())?;
            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
                Some(buffer)
            } else {
                None
            }
        })
    }

    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.opened_buffers
            .get(&buffer_id)
            .and_then(|buffer| buffer.upgrade())
    }

    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
        self.get(buffer_id)
            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
    }

    pub fn get_possibly_incomplete(
        &self,
        buffer_id: BufferId,
        cx: &AppContext,
    ) -> Option<Model<Buffer>> {
        self.get(buffer_id).or_else(|| {
            self.state.as_remote().and_then(|remote| {
                remote
                    .read(cx)
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
            })
        })
    }

    pub fn buffer_version_info(
        &self,
        cx: &AppContext,
    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
        let buffers = self
            .buffers()
            .map(|buffer| {
                let buffer = buffer.read(cx);
                proto::BufferVersion {
                    id: buffer.remote_id().into(),
                    version: language::proto::serialize_version(&buffer.version),
                }
            })
            .collect();
        let incomplete_buffer_ids = self
            .state
            .as_remote()
            .map(|remote| remote.read(cx).incomplete_buffer_ids())
            .unwrap_or_default();
        (buffers, incomplete_buffer_ids)
    }

    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        if let Some(remote) = self.state.as_remote() {
            remote.update(cx, |remote, _| {
                // Wake up all futures currently waiting on a buffer to get opened,
                // to give them a chance to fail now that we've disconnected.
                remote.remote_buffer_listeners.clear()
            })
        }
    }

    pub fn shared(
        &mut self,
        remote_id: u64,
        downstream_client: AnyProtoClient,
        _cx: &mut AppContext,
    ) {
        self.downstream_client = Some((downstream_client, remote_id));
    }

    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }

    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }

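    /// Returns a channel of buffers that may contain matches for the given
    /// query: already-open unnamed buffers are sent first, then candidate
    /// paths reported by the worktree scan are opened in batches.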
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }

    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.state.as_local() {
                    local.update(cx, |local, cx| {
                        local.buffer_changed_file(buffer, cx);
                    })
                }
            }
            BufferEvent::Reloaded => {
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }

    pub async fn handle_update_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Buffer(buffer) => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                        }
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }

    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .insert(buffer.clone());

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                client
                    .send(proto::UpdateDiffBase {
                        project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: buffer.diff_base().map(ToString::to_string),
                    })
                    .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }

    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let Some(remote) = self.state.as_remote() else {
            return Err(anyhow!("buffer store is not a remote"));
        };

        if let Some(buffer) = remote.update(cx, |remote, cx| {
            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)
        })? {
            self.add_buffer(buffer, cx)?;
        }

        Ok(())
    }

    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }

    pub async fn handle_update_diff_base(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let buffer_id = envelope.payload.buffer_id;
            let buffer_id = BufferId::new(buffer_id)?;
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                buffer.update(cx, |buffer, cx| {
                    buffer.set_diff_base(envelope.payload.diff_base.clone(), cx)
                });
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateDiffBase {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: envelope.payload.diff_base,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }

    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }

    pub async fn handle_close_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let peer_id = envelope.sender_id;
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        this.update(&mut cx, |this, _| {
            if let Some(buffer) = this.get(buffer_id) {
                if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
                    if shared.remove(&buffer) {
                        if shared.is_empty() {
                            this.shared_buffers.remove(&peer_id);
                        }
                        return;
                    }
                }
            };
            debug_panic!(
                "peer_id {} closed buffer_id {} which was either not open or already closed",
                peer_id,
                buffer_id
            )
        })
    }

    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        this.update(&mut cx, move |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferSaved {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                    })
                    .log_err();
            }
        })
    }

    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }

    pub async fn handle_blame_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }

    pub async fn handle_get_permalink_to_line(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::GetPermalinkToLineResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        // let version = deserialize_version(&envelope.payload.version);
        let selection = {
            let proto_selection = envelope
                .payload
                .selection
                .context("no selection to get permalink for defined")?;
            proto_selection.start as u32..proto_selection.end as u32
        };
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        let permalink = this
            .update(&mut cx, |this, cx| {
                this.get_permalink_to_line(&buffer, selection, cx)
            })?
            .await?;
        Ok(proto::GetPermalinkToLineResponse {
            permalink: permalink.to_string(),
        })
    }

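    /// Waits until the given watch channel yields the result of an in-flight
    /// buffer load.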
    pub async fn wait_for_loading_buffer(
        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
        loop {
            if let Some(result) = receiver.borrow().as_ref() {
                match result {
                    Ok(buffer) => return Ok(buffer.to_owned()),
                    Err(e) => return Err(e.to_owned()),
                }
            }
            receiver.next().await;
        }
    }

    pub fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        if buffers.is_empty() {
            return Task::ready(Ok(ProjectTransaction::default()));
        }

        self.state.reload_buffers(buffers, push_to_history, cx)
    }

    async fn handle_reload_buffers(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }

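    /// Replicates a buffer to the given peer by sending its full state,
    /// followed by its operation history in chunks. A no-op if the buffer is
    /// already shared with that peer or the project isn't shared downstream.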
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        if !self
            .shared_buffers
            .entry(peer_id)
            .or_default()
            .insert(buffer.clone())
        {
            return Task::ready(Ok(()));
        }

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            if client.send(initial_state).log_err().is_some() {
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }

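    /// Clears the record of which buffers have been shared with which peers;
    /// subsequent calls to `create_buffer_for_peer` will re-send buffer state.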
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }

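    /// Clears the shared-buffer record for a single peer.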
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }

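    /// Moves the shared-buffer record from `old_peer_id` to `new_peer_id`,
    /// preserving knowledge of what has already been sent to that peer.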
    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }

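    /// Returns the set of buffers that have been shared with each peer.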
    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<Model<Buffer>>> {
        &self.shared_buffers
    }

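    /// Creates a new local buffer containing `text`, registers it with the
    /// store, and, if the buffer is backed by a file, indexes it by project
    /// path and worktree entry id. Panics if called on a non-local buffer
    /// store.
    ///
    /// A usage sketch, assuming `store: Model<BufferStore>` and
    /// `cx: &mut AppContext`:
    ///
    /// ```ignore
    /// let buffer = store.update(cx, |store, cx| {
    ///     store.create_local_buffer("hello", None, cx)
    /// });
    /// ```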
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let local = self
            .state
            .as_local()
            .expect("local-only method called in a non-local context");
        local.update(cx, |this, cx| {
            if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                this.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path.clone(),
                    },
                    buffer_id,
                );

                if let Some(entry_id) = file.entry_id {
                    this.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }
        });
        buffer
    }

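    /// Applies a project transaction received from the host, waiting for the
    /// referenced remote buffers to arrive. Only valid on a remote buffer
    /// store; calling this on a local store debug-panics and returns an error.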
    pub fn deserialize_project_transaction(
        &mut self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        if let Some(remote) = self.state.as_remote() {
            remote.update(cx, |remote, cx| {
                remote.deserialize_project_transaction(message, push_to_history, cx)
            })
        } else {
            debug_panic!("not a remote buffer store");
            Task::ready(Err(anyhow!("not a remote buffer store")))
        }
    }

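    /// Returns a task that resolves once the remote buffer with the given id
    /// has been fully received from the host. Only valid on a remote buffer
    /// store.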
    pub fn wait_for_remote_buffer(
        &self,
        id: BufferId,
        cx: &mut AppContext,
    ) -> Task<Result<Model<Buffer>>> {
        if let Some(remote) = self.state.as_remote() {
            remote.update(cx, |remote, cx| remote.wait_for_remote_buffer(id, cx))
        } else {
            debug_panic!("not a remote buffer store");
            Task::ready(Err(anyhow!("not a remote buffer store")))
        }
    }

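    /// Serializes a project transaction for transmission to `peer_id`,
    /// ensuring first that every buffer the transaction touches has been
    /// shared with that peer.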
    pub fn serialize_project_transaction_for_peer(
        &mut self,
        project_transaction: ProjectTransaction,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> proto::ProjectTransaction {
        let mut serialized_transaction = proto::ProjectTransaction {
            buffer_ids: Default::default(),
            transactions: Default::default(),
        };
        for (buffer, transaction) in project_transaction.0 {
            self.create_buffer_for_peer(&buffer, peer_id, cx)
                .detach_and_log_err(cx);
            serialized_transaction
                .buffer_ids
                .push(buffer.read(cx).remote_id().into());
            serialized_transaction
                .transactions
                .push(language::proto::serialize_transaction(&transaction));
        }
        serialized_transaction
    }
}

impl OpenBuffer {
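    /// Returns the buffer if it is still alive; entries that only hold
    /// buffered operations for a not-yet-opened buffer yield `None`.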
    fn upgrade(&self) -> Option<Model<Buffer>> {
        match self {
            OpenBuffer::Buffer(handle) => handle.upgrade(),
            OpenBuffer::Operations(_) => None,
        }
    }
}

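/// Reports whether an error's root cause is an `io::Error` with kind
/// `NotFound`.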
fn is_not_found_error(error: &anyhow::Error) -> bool {
    error
        .root_cause()
        .downcast_ref::<io::Error>()
        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
}

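/// Converts an optional git blame into its protobuf representation; `None`
/// maps to a response whose `blame_response` field is empty.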
fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
    let Some(blame) = blame else {
        return proto::BlameBufferResponse {
            blame_response: None,
        };
    };

    let entries = blame
        .entries
        .into_iter()
        .map(|entry| proto::BlameEntry {
            sha: entry.sha.as_bytes().into(),
            start_line: entry.range.start,
            end_line: entry.range.end,
            original_line_number: entry.original_line_number,
            author: entry.author,
            author_mail: entry.author_mail,
            author_time: entry.author_time,
            author_tz: entry.author_tz,
            committer: entry.committer,
            committer_mail: entry.committer_mail,
            committer_time: entry.committer_time,
            committer_tz: entry.committer_tz,
            summary: entry.summary,
            previous: entry.previous,
            filename: entry.filename,
        })
        .collect::<Vec<_>>();

    let messages = blame
        .messages
        .into_iter()
        .map(|(oid, message)| proto::CommitMessage {
            oid: oid.as_bytes().into(),
            message,
        })
        .collect::<Vec<_>>();

    let permalinks = blame
        .permalinks
        .into_iter()
        .map(|(oid, url)| proto::CommitPermalink {
            oid: oid.as_bytes().into(),
            permalink: url.to_string(),
        })
        .collect::<Vec<_>>();

    proto::BlameBufferResponse {
        blame_response: Some(proto::blame_buffer_response::BlameResponse {
            entries,
            messages,
            permalinks,
            remote_url: blame.remote_url,
        }),
    }
}

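/// Inverse of `serialize_blame_buffer_response`. Entries, messages, or
/// permalinks whose SHAs or URLs fail to parse are dropped rather than
/// failing the whole response.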
fn deserialize_blame_buffer_response(
    response: proto::BlameBufferResponse,
) -> Option<git::blame::Blame> {
    let response = response.blame_response?;
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_mail: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    let permalinks = response
        .permalinks
        .into_iter()
        .filter_map(|permalink| {
            Some((
                git::Oid::from_bytes(&permalink.oid).ok()?,
                Url::from_str(&permalink.permalink).ok()?,
            ))
        })
        .collect::<HashMap<_, _>>();

    Some(Blame {
        entries,
        permalinks,
        messages,
        remote_url: response.remote_url,
    })
}