use crate::{
    search::SearchQuery,
    worktree_store::{WorktreeStore, WorktreeStoreEvent},
    Item, NoRepositoryError, ProjectPath,
};
use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
use anyhow::{anyhow, Context as _, Result};
use client::Client;
use collections::{hash_map, HashMap, HashSet};
use fs::Fs;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use git::blame::Blame;
use gpui::{
    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
    Task, WeakModel,
};
use http_client::Url;
use language::{
    proto::{
        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
        split_operations,
    },
    Buffer, BufferEvent, Capability, File as _, Language, Operation,
};
use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
use smol::channel::Receiver;
use std::{io, ops::Range, path::Path, str::FromStr as _, sync::Arc, time::Instant};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};

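/// The storage backend behind a [`BufferStore`]: either a [`LocalBufferStore`],
/// which loads and saves buffers through the local filesystem, or a
/// [`RemoteBufferStore`], which forwards the same requests over RPC to an
/// upstream project host.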
trait BufferStoreImpl {
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>>;

    fn save_buffer(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>>;

    fn save_buffer_as(
        &self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>>;

    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>>;

    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>>;

    fn as_remote(&self) -> Option<Model<RemoteBufferStore>>;
    fn as_local(&self) -> Option<Model<LocalBufferStore>>;
}

struct RemoteBufferStore {
    shared_with_me: HashSet<Model<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    worktree_store: Model<WorktreeStore>,
    buffer_store: WeakModel<BufferStore>,
}

struct LocalBufferStore {
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    buffer_store: WeakModel<BufferStore>,
    worktree_store: Model<WorktreeStore>,
    _subscription: Subscription,
}

/// A set of open buffers.
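///
/// A `BufferStore` tracks every open [`Buffer`] in a project and routes
/// opens, saves, and reloads through either a local backend
/// ([`LocalBufferStore`]) or a remote one ([`RemoteBufferStore`]), while
/// deduplicating concurrent loads of the same path.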
pub struct BufferStore {
    state: Box<dyn BufferStoreImpl>,
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    worktree_store: Model<WorktreeStore>,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    downstream_client: Option<(AnyProtoClient, u64)>,
    shared_buffers: HashMap<proto::PeerId, HashSet<Model<Buffer>>>,
}

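/// An entry in `opened_buffers`: either a live (weakly held) buffer, or a
/// backlog of operations received for a buffer that has not been opened yet.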
enum OpenBuffer {
    Buffer(WeakModel<Buffer>),
    Operations(Vec<Operation>),
}

pub enum BufferStoreEvent {
    BufferAdded(Model<Buffer>),
    BufferDropped(BufferId),
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}

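/// A group of buffer transactions that belong to one logical project-wide
/// operation, keyed by the buffer each transaction applies to.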
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}

impl RemoteBufferStore {
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut AppContext,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer_store = self.buffer_store.clone();
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|cx| async move {
            if let Some(buffer) = buffer_store
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }

    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

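    /// Handles one message of the buffer-replication protocol: a `State`
    /// variant creates the buffer from its serialized snapshot, and
    /// subsequent `Chunk` variants apply batched operations until `is_last`
    /// is set, at which point the completed buffer is returned and any
    /// listeners registered via `wait_for_remote_buffer` are resolved.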
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<Option<Model<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // Retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }

    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
        self.loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>()
    }

    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }
}

impl BufferStoreImpl for Model<RemoteBufferStore> {
    fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
        Some(self.clone())
    }

    fn as_local(&self) -> Option<Model<LocalBufferStore>> {
        None
    }

    fn save_buffer(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            this.save_remote_buffer(buffer.clone(), None, cx)
        })
    }

    fn save_buffer_as(
        &self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            this.save_remote_buffer(buffer, Some(path.to_proto()), cx)
        })
    }

    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        self.update(cx, |this, cx| {
            let worktree_id = worktree.read(cx).id().to_proto();
            let project_id = this.project_id;
            let client = this.upstream_client.clone();
            let path_string = path.to_string_lossy().to_string();
            cx.spawn(move |this, mut cx| async move {
                let response = client
                    .request(proto::OpenBufferByPath {
                        project_id,
                        worktree_id,
                        path: path_string,
                    })
                    .await?;
                let buffer_id = BufferId::new(response.buffer_id)?;

                let buffer = this
                    .update(&mut cx, {
                        |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;

                Ok(buffer)
            })
        })
    }

    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
        self.update(cx, |this, cx| {
            let create = this.upstream_client.request(proto::OpenNewBuffer {
                project_id: this.project_id,
            });
            cx.spawn(|this, mut cx| async move {
                let response = create.await?;
                let buffer_id = BufferId::new(response.buffer_id)?;

                this.update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await
            })
        })
    }

    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        self.update(cx, |this, cx| {
            let request = this.upstream_client.request(proto::ReloadBuffers {
                project_id: this.project_id,
                buffer_ids: buffers
                    .iter()
                    .map(|buffer| buffer.read(cx).remote_id().to_proto())
                    .collect(),
            });

            cx.spawn(|this, mut cx| async move {
                let response = request
                    .await?
                    .transaction
                    .ok_or_else(|| anyhow!("missing transaction"))?;
                this.update(&mut cx, |this, cx| {
                    this.deserialize_project_transaction(response, push_to_history, cx)
                })?
                .await
            })
        })
    }
}

impl LocalBufferStore {
    fn save_local_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        worktree: Model<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer.file().is_some_and(|file| !file.is_created()) {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.mtime;
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client(cx) {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        this.local_worktree_entries_changed(&worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    fn local_worktree_entries_changed(
        &mut self,
        worktree_handle: &Model<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut ModelContext<Self>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
        }
    }

    fn local_worktree_git_repos_changed(
        &mut self,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<Self>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());
        let Some(buffer_store) = self.buffer_store.upgrade() else {
            return;
        };

        // Identify the loading buffers whose containing repository has changed.
        let (future_buffers, current_buffers) = buffer_store.update(cx, |buffer_store, cx| {
            let future_buffers = buffer_store
                .loading_buffers()
                .filter_map(|(project_path, receiver)| {
                    if project_path.worktree_id != worktree_handle.read(cx).id() {
                        return None;
                    }
                    let path = &project_path.path;
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| path.starts_with(work_dir))?;
                    let path = path.clone();
                    Some(async move {
                        BufferStore::wait_for_loading_buffer(receiver)
                            .await
                            .ok()
                            .map(|buffer| (buffer, path))
                    })
                })
                .collect::<FuturesUnordered<_>>();

            // Identify the current buffers whose containing repository has changed.
            let current_buffers = buffer_store
                .buffers()
                .filter_map(|buffer| {
                    let file = File::from_dyn(buffer.read(cx).file())?;
                    if file.worktree != worktree_handle {
                        return None;
                    }
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                    Some((buffer, file.path.clone()))
                })
                .collect::<Vec<_>>();
            (future_buffers, current_buffers)
        });

        if future_buffers.len() + current_buffers.len() == 0 {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Wait for all of the buffers to load.
            let future_buffers = future_buffers.collect::<Vec<_>>().await;

            // Reload the diff base for every buffer whose containing git repository has changed.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    let mut diff_base_tasks = future_buffers
                        .into_iter()
                        .flatten()
                        .chain(current_buffers)
                        .filter_map(|(buffer, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            Some(async move {
                                let base_text =
                                    local_repo_entry.repo().load_index_text(&relative_path);
                                Some((buffer, base_text))
                            })
                        })
                        .collect::<FuturesUnordered<_>>();

                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
                    while let Some(diff_base) = diff_base_tasks.next().await {
                        if let Some(diff_base) = diff_base {
                            diff_bases.push(diff_base);
                        }
                    }
                    diff_bases
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Assign the new diff bases on all of the buffers.
                for (buffer, diff_base) in diff_bases_by_buffer {
                    let buffer_id = buffer.update(cx, |buffer, cx| {
                        buffer.set_diff_base(diff_base.clone(), cx);
                        buffer.remote_id().to_proto()
                    });
                    if let Some((client, project_id)) = &this.downstream_client(cx) {
                        client
                            .send(proto::UpdateDiffBase {
                                project_id: *project_id,
                                buffer_id,
                                diff_base,
                            })
                            .log_err();
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }

    fn local_worktree_entry_changed(
        &mut self,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
        };
        let buffer = self
            .buffer_store
            .update(cx, |buffer_store, _| {
                if let Some(buffer) = buffer_store.get(buffer_id) {
                    Some(buffer)
                } else {
                    buffer_store.opened_buffers.remove(&buffer_id);
                    None
                }
            })
            .ok()
            .flatten();
        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            self.local_buffer_ids_by_path.remove(&project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            let new_file = if let Some(entry) = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
            {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else {
                File {
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    mtime: old_file.mtime,
                    worktree: worktree.clone(),
                    is_deleted: true,
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                self.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some((client, project_id)) = &self.downstream_client(cx) {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;
        self.buffer_store
            .update(cx, |_buffer_store, cx| {
                for event in events {
                    cx.emit(event);
                }
            })
            .log_err()?;

        None
    }

    fn downstream_client(&self, cx: &AppContext) -> Option<(AnyProtoClient, u64)> {
        self.buffer_store
            .upgrade()?
            .read(cx)
            .downstream_client
            .clone()
    }

    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }
}

impl BufferStoreImpl for Model<LocalBufferStore> {
    fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
        None
    }

    fn as_local(&self) -> Option<Model<LocalBufferStore>> {
        Some(self.clone())
    }

    fn save_buffer(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
                return Task::ready(Err(anyhow!("buffer doesn't have a file")));
            };
            let worktree = file.worktree.clone();
            this.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
        })
    }

    fn save_buffer_as(
        &self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            let Some(worktree) = this
                .worktree_store
                .read(cx)
                .worktree_for_id(path.worktree_id, cx)
            else {
                return Task::ready(Err(anyhow!("no such worktree")));
            };
            this.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
        })
    }

    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer_store = cx.weak_model();
        self.update(cx, |_, cx| {
            let load_buffer = worktree.update(cx, |worktree, cx| {
                let load_file = worktree.load_file(path.as_ref(), cx);
                let reservation = cx.reserve_model();
                let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
                cx.spawn(move |_, mut cx| async move {
                    let loaded = load_file.await?;
                    let text_buffer = cx
                        .background_executor()
                        .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                        .await;
                    cx.insert_model(reservation, |_| {
                        Buffer::build(
                            text_buffer,
                            loaded.diff_base,
                            Some(loaded.file),
                            Capability::ReadWrite,
                        )
                    })
                })
            });

            cx.spawn(move |this, mut cx| async move {
                let buffer = match load_buffer.await {
                    Ok(buffer) => Ok(buffer),
                    Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                        let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                        let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                        Buffer::build(
                            text_buffer,
                            None,
                            Some(Arc::new(File {
                                worktree,
                                path,
                                mtime: None,
                                entry_id: None,
                                is_local: true,
                                is_deleted: false,
                                is_private: false,
                            })),
                            Capability::ReadWrite,
                        )
                    }),
                    Err(e) => Err(e),
                }?;
                this.update(&mut cx, |this, cx| {
                    buffer_store.update(cx, |buffer_store, cx| {
                        buffer_store.add_buffer(buffer.clone(), cx)
                    })??;
                    let buffer_id = buffer.read(cx).remote_id();
                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                        this.local_buffer_ids_by_path.insert(
                            ProjectPath {
                                worktree_id: file.worktree_id(cx),
                                path: file.path.clone(),
                            },
                            buffer_id,
                        );

                        if let Some(entry_id) = file.entry_id {
                            this.local_buffer_ids_by_entry_id
                                .insert(entry_id, buffer_id);
                        }
                    }

                    anyhow::Ok(())
                })??;

                Ok(buffer)
            })
        })
    }

    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
        let handle = self.clone();
        cx.spawn(|buffer_store, mut cx| async move {
            let buffer = cx.new_model(|cx| {
                Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
            })?;
            buffer_store.update(&mut cx, |buffer_store, cx| {
                buffer_store.add_buffer(buffer.clone(), cx).log_err();
                let buffer_id = buffer.read(cx).remote_id();
                handle.update(cx, |this, cx| {
                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                        this.local_buffer_ids_by_path.insert(
                            ProjectPath {
                                worktree_id: file.worktree_id(cx),
                                path: file.path.clone(),
                            },
                            buffer_id,
                        );

                        if let Some(entry_id) = file.entry_id {
                            this.local_buffer_ids_by_entry_id
                                .insert(entry_id, buffer_id);
                        }
                    }
                });
            })?;
            Ok(buffer)
        })
    }

    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.handle(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
}

impl BufferStore {
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_message_handler(Self::handle_update_diff_base);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
    }

    /// Creates a buffer store that loads and saves buffers via the local filesystem.
    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
        let this = cx.weak_model();
        Self {
            state: Box::new(cx.new_model(|cx| {
                let subscription = cx.subscribe(
                    &worktree_store,
                    |this: &mut LocalBufferStore, _, event, cx| {
                        if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                            this.subscribe_to_worktree(worktree, cx);
                        }
                    },
                );

                LocalBufferStore {
                    local_buffer_ids_by_path: Default::default(),
                    local_buffer_ids_by_entry_id: Default::default(),
                    buffer_store: this,
                    worktree_store: worktree_store.clone(),
                    _subscription: subscription,
                }
            })),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers_by_path: Default::default(),
            worktree_store,
        }
    }

    pub fn remote(
        worktree_store: Model<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let this = cx.weak_model();
        Self {
            state: Box::new(cx.new_model(|_| RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
                buffer_store: this,
            })),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers_by_path: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }

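    /// Opens the buffer at `project_path`, returning the existing model if it
    /// is already open and deduplicating concurrent loads of the same path
    /// through `loading_buffers_by_path`.
    ///
    /// A minimal sketch of awaiting the returned task (illustrative only;
    /// assumes an async context holding a `Model<BufferStore>`):
    ///
    /// ```ignore
    /// let buffer = buffer_store
    ///     .update(&mut cx, |store, cx| store.open_buffer(project_path, cx))?
    ///     .await?;
    /// ```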
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                let load_buffer = self
                    .state
                    .open_buffer(project_path.path.clone(), worktree, cx);

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _cx| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }

    pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
        self.state.create_buffer(cx)
    }

    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        self.state.save_buffer(buffer, cx)
    }

    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let old_file = buffer.read(cx).file().cloned();
        let task = self.state.save_buffer_as(buffer.clone(), path, cx);
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }

    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Blame>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => anyhow::bail!(NoRepositoryError {}),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok((repo, relative_path, content))
                });

                cx.background_executor().spawn(async move {
                    let (repo, relative_path, content) = blame_params?;
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }

    pub fn get_permalink_to_line(
        &self,
        buffer: &Model<Buffer>,
        selection: Range<u32>,
        cx: &AppContext,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let Some(repo) = worktree.local_git_repo(file.path()) else {
                    return Task::ready(Err(anyhow!("no repository for buffer found")));
                };

                let path = file.path().clone();

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .context("failed to convert buffer path to string")?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }

    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = OpenBuffer::Buffer(buffer.downgrade());

        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }

    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
        self.opened_buffers
            .values()
            .filter_map(|buffer| buffer.upgrade())
    }

    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }

    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
        self.buffers().find_map(|buffer| {
            let file = File::from_dyn(buffer.read(cx).file())?;
            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
                Some(buffer)
            } else {
                None
            }
        })
    }

    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.opened_buffers
            .get(&buffer_id)
            .and_then(|buffer| buffer.upgrade())
    }

    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
        self.get(buffer_id)
            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
    }

    pub fn get_possibly_incomplete(
        &self,
        buffer_id: BufferId,
        cx: &AppContext,
    ) -> Option<Model<Buffer>> {
        self.get(buffer_id).or_else(|| {
            self.state.as_remote().and_then(|remote| {
                remote
                    .read(cx)
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
            })
        })
    }

    pub fn buffer_version_info(
        &self,
        cx: &AppContext,
    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
        let buffers = self
            .buffers()
            .map(|buffer| {
                let buffer = buffer.read(cx);
                proto::BufferVersion {
                    id: buffer.remote_id().into(),
                    version: language::proto::serialize_version(&buffer.version),
                }
            })
            .collect();
        let incomplete_buffer_ids = self
            .state
            .as_remote()
            .map(|remote| remote.read(cx).incomplete_buffer_ids())
            .unwrap_or_default();
        (buffers, incomplete_buffer_ids)
    }

    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        if let Some(remote) = self.state.as_remote() {
            remote.update(cx, |remote, _| {
                // Wake up all futures currently waiting on a buffer to get opened,
                // to give them a chance to fail now that we've disconnected.
                remote.remote_buffer_listeners.clear()
            })
        }
    }

    pub fn shared(
        &mut self,
        remote_id: u64,
        downstream_client: AnyProtoClient,
        _cx: &mut AppContext,
    ) {
        self.downstream_client = Some((downstream_client, remote_id));
    }

    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }

    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }

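    /// Streams buffers that may contain matches for `query`: unnamed
    /// (in-memory) buffers are sent first, then candidate paths produced by
    /// the `WorktreeStore` are opened in batches of
    /// `MAX_CONCURRENT_BUFFER_OPENS` and forwarded as they load.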
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }

    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.state.as_local() {
                    local.update(cx, |local, cx| {
                        local.buffer_changed_file(buffer, cx);
                    })
                }
            }
            BufferEvent::Reloaded => {
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }

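    /// Applies remote edit operations to the target buffer. If the buffer is
    /// not open yet, the operations are queued in `OpenBuffer::Operations`
    /// and replayed once the buffer is registered via `add_buffer`.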
    pub async fn handle_update_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Buffer(buffer) => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                        }
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }

    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .insert(buffer.clone());

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                client
                    .send(proto::UpdateDiffBase {
                        project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: buffer.diff_base().map(ToString::to_string),
                    })
                    .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }

    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let Some(remote) = self.state.as_remote() else {
            return Err(anyhow!("buffer store is not a remote"));
        };

        if let Some(buffer) = remote.update(cx, |remote, cx| {
            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)
        })? {
            self.add_buffer(buffer, cx)?;
        }

        Ok(())
    }

    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }

    pub async fn handle_update_diff_base(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let buffer_id = envelope.payload.buffer_id;
            let buffer_id = BufferId::new(buffer_id)?;
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                buffer.update(cx, |buffer, cx| {
                    buffer.set_diff_base(envelope.payload.diff_base.clone(), cx)
                });
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateDiffBase {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: envelope.payload.diff_base,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }

    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }

    pub async fn handle_close_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let peer_id = envelope.sender_id;
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        this.update(&mut cx, |this, _| {
            if let Some(buffer) = this.get(buffer_id) {
                if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
                    if shared.remove(&buffer) {
                        if shared.is_empty() {
                            this.shared_buffers.remove(&peer_id);
                        }
                        return;
                    }
                }
            };
            debug_panic!(
                "peer_id {} closed buffer_id {} which was either not open or already closed",
                peer_id,
                buffer_id
            )
        })
    }

    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        this.update(&mut cx, move |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferSaved {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                    })
                    .log_err();
            }
        })
    }

    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }

    pub async fn handle_blame_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }

    pub async fn handle_get_permalink_to_line(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::GetPermalinkToLineResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        // let version = deserialize_version(&envelope.payload.version);
        let selection = {
            let proto_selection = envelope
                .payload
                .selection
                .context("no selection to get permalink for defined")?;
            proto_selection.start as u32..proto_selection.end as u32
        };
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        let permalink = this
            .update(&mut cx, |this, cx| {
                this.get_permalink_to_line(&buffer, selection, cx)
            })?
            .await?;
        Ok(proto::GetPermalinkToLineResponse {
            permalink: permalink.to_string(),
        })
    }

    pub async fn wait_for_loading_buffer(
        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
        loop {
            if let Some(result) = receiver.borrow().as_ref() {
                match result {
                    Ok(buffer) => return Ok(buffer.to_owned()),
                    Err(e) => return Err(e.to_owned()),
                }
            }
            receiver.next().await;
        }
    }

    pub fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        if buffers.is_empty() {
            return Task::ready(Ok(ProjectTransaction::default()));
        }

        self.state.reload_buffers(buffers, push_to_history, cx)
    }

    async fn handle_reload_buffers(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }

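    /// Replicates `buffer` to `peer_id`: first sends the buffer's serialized
    /// state as a `CreateBufferForPeer::State` message, then streams the
    /// remaining operations as `CreateBufferForPeer::Chunk` messages, marking
    /// the final chunk with `is_last`. The receiving side is
    /// `RemoteBufferStore::handle_create_buffer_for_peer`.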
1928 pub fn create_buffer_for_peer(
1929 &mut self,
1930 buffer: &Model<Buffer>,
1931 peer_id: proto::PeerId,
1932 cx: &mut ModelContext<Self>,
1933 ) -> Task<Result<()>> {
1934 let buffer_id = buffer.read(cx).remote_id();
1935 if !self
1936 .shared_buffers
1937 .entry(peer_id)
1938 .or_default()
1939 .insert(buffer.clone())
1940 {
1941 return Task::ready(Ok(()));
1942 }
1943
1944 let Some((client, project_id)) = self.downstream_client.clone() else {
1945 return Task::ready(Ok(()));
1946 };
1947
1948 cx.spawn(|this, mut cx| async move {
1949 let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
1950 return anyhow::Ok(());
1951 };
1952
1953 let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
1954 let operations = operations.await;
1955 let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;
1956
1957 let initial_state = proto::CreateBufferForPeer {
1958 project_id,
1959 peer_id: Some(peer_id),
1960 variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
1961 };
1962
1963 if client.send(initial_state).log_err().is_some() {
1965 cx.background_executor()
1966 .spawn(async move {
1967 let mut chunks = split_operations(operations).peekable();
1968 while let Some(chunk) = chunks.next() {
1969 let is_last = chunks.peek().is_none();
1970 client.send(proto::CreateBufferForPeer {
1971 project_id,
1972 peer_id: Some(peer_id),
1973 variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
1974 proto::BufferChunk {
1975 buffer_id: buffer_id.into(),
1976 operations: chunk,
1977 is_last,
1978 },
1979 )),
1980 })?;
1981 }
1982 anyhow::Ok(())
1983 })
1984 .await
1985 .log_err();
1986 }
1987 Ok(())
1988 })
1989 }
1990
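    /// Stops tracking every buffer shared with remote peers.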
1991 pub fn forget_shared_buffers(&mut self) {
1992 self.shared_buffers.clear();
1993 }
1994
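    /// Stops tracking the buffers shared with a single peer.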
1995 pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
1996 self.shared_buffers.remove(peer_id);
1997 }
1998
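    /// Re-keys the set of buffers shared with a peer whose id has changed.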
1999 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2000 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2001 self.shared_buffers.insert(new_peer_id, buffers);
2002 }
2003 }
2004
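    /// Returns the buffers currently shared with each peer.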
2005 pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<Model<Buffer>>> {
2006 &self.shared_buffers
2007 }
2008
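    /// Creates a buffer with the given text and language, registers it with
    /// the store, and, if the buffer has a backing file, records its path and
    /// entry id in the local maps. Panics if the store is not local.
    ///
    /// A usage sketch (illustrative only; `rust_lang` stands in for some
    /// `Arc<Language>` the caller already holds):
    ///
    /// ```ignore
    /// let buffer = buffer_store.create_local_buffer("fn main() {}", Some(rust_lang), cx);
    /// ```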
2009 pub fn create_local_buffer(
2010 &mut self,
2011 text: &str,
2012 language: Option<Arc<Language>>,
2013 cx: &mut ModelContext<Self>,
2014 ) -> Model<Buffer> {
2015 let buffer = cx.new_model(|cx| {
2016 Buffer::local(text, cx)
2017 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
2018 });
2019
2020 self.add_buffer(buffer.clone(), cx).log_err();
2021 let buffer_id = buffer.read(cx).remote_id();
2022
2023 let local = self
2024 .state
2025 .as_local()
2026 .expect("local-only method called in a non-local context");
2027 local.update(cx, |this, cx| {
2028 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2029 this.local_buffer_ids_by_path.insert(
2030 ProjectPath {
2031 worktree_id: file.worktree_id(cx),
2032 path: file.path.clone(),
2033 },
2034 buffer_id,
2035 );
2036
2037 if let Some(entry_id) = file.entry_id {
2038 this.local_buffer_ids_by_entry_id
2039 .insert(entry_id, buffer_id);
2040 }
2041 }
2042 });
2043 buffer
2044 }
2045
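    /// Applies a serialized `ProjectTransaction` received from the host.
    /// Only valid on a remote buffer store; otherwise returns an error.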
2046 pub fn deserialize_project_transaction(
2047 &mut self,
2048 message: proto::ProjectTransaction,
2049 push_to_history: bool,
2050 cx: &mut ModelContext<Self>,
2051 ) -> Task<Result<ProjectTransaction>> {
2052 if let Some(remote) = self.state.as_remote() {
2053 remote.update(cx, |remote, cx| {
2054 remote.deserialize_project_transaction(message, push_to_history, cx)
2055 })
2056 } else {
2057 debug_panic!("not a remote buffer store");
2058 Task::ready(Err(anyhow!("not a remote buffer store")))
2059 }
2060 }
2061
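    /// Returns a task that resolves once the buffer with the given id has
    /// been replicated from the host. Only valid on a remote buffer store.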
2062 pub fn wait_for_remote_buffer(
2063 &self,
2064 id: BufferId,
2065 cx: &mut AppContext,
2066 ) -> Task<Result<Model<Buffer>>> {
2067 if let Some(remote) = self.state.as_remote() {
2068 remote.update(cx, |remote, cx| remote.wait_for_remote_buffer(id, cx))
2069 } else {
2070 debug_panic!("not a remote buffer store");
2071 Task::ready(Err(anyhow!("not a remote buffer store")))
2072 }
2073 }
2074
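    /// Serializes a `ProjectTransaction` so it can be sent to a peer,
    /// sharing each affected buffer with that peer first.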
2075 pub fn serialize_project_transaction_for_peer(
2076 &mut self,
2077 project_transaction: ProjectTransaction,
2078 peer_id: proto::PeerId,
2079 cx: &mut ModelContext<Self>,
2080 ) -> proto::ProjectTransaction {
2081 let mut serialized_transaction = proto::ProjectTransaction {
2082 buffer_ids: Default::default(),
2083 transactions: Default::default(),
2084 };
2085 for (buffer, transaction) in project_transaction.0 {
2086 self.create_buffer_for_peer(&buffer, peer_id, cx)
2087 .detach_and_log_err(cx);
2088 serialized_transaction
2089 .buffer_ids
2090 .push(buffer.read(cx).remote_id().into());
2091 serialized_transaction
2092 .transactions
2093 .push(language::proto::serialize_transaction(&transaction));
2094 }
2095 serialized_transaction
2096 }
2097}
2098
2099impl OpenBuffer {
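    /// Returns the live buffer handle, or `None` if the buffer has been
    /// dropped or only queued operations are held for it.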
2100 fn upgrade(&self) -> Option<Model<Buffer>> {
2101 match self {
2102 OpenBuffer::Buffer(handle) => handle.upgrade(),
2103 OpenBuffer::Operations(_) => None,
2104 }
2105 }
2106}
2107
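/// Reports whether an error's root cause is an `io::Error` with kind
/// `NotFound`, e.g. a buffer whose file no longer exists on disk.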
2108fn is_not_found_error(error: &anyhow::Error) -> bool {
2109 error
2110 .root_cause()
2111 .downcast_ref::<io::Error>()
2112 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2113}
2114
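/// Converts a `git::blame::Blame` into its protobuf representation, copying
/// blame entries, commit messages, and commit permalinks.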
2115fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
2116 let entries = blame
2117 .entries
2118 .into_iter()
2119 .map(|entry| proto::BlameEntry {
2120 sha: entry.sha.as_bytes().into(),
2121 start_line: entry.range.start,
2122 end_line: entry.range.end,
2123 original_line_number: entry.original_line_number,
2124 author: entry.author.clone(),
2125 author_mail: entry.author_mail.clone(),
2126 author_time: entry.author_time,
2127 author_tz: entry.author_tz.clone(),
2128 committer: entry.committer.clone(),
2129 committer_mail: entry.committer_mail.clone(),
2130 committer_time: entry.committer_time,
2131 committer_tz: entry.committer_tz.clone(),
2132 summary: entry.summary.clone(),
2133 previous: entry.previous.clone(),
2134 filename: entry.filename.clone(),
2135 })
2136 .collect::<Vec<_>>();
2137
2138 let messages = blame
2139 .messages
2140 .into_iter()
2141 .map(|(oid, message)| proto::CommitMessage {
2142 oid: oid.as_bytes().into(),
2143 message,
2144 })
2145 .collect::<Vec<_>>();
2146
2147 let permalinks = blame
2148 .permalinks
2149 .into_iter()
2150 .map(|(oid, url)| proto::CommitPermalink {
2151 oid: oid.as_bytes().into(),
2152 permalink: url.to_string(),
2153 })
2154 .collect::<Vec<_>>();
2155
2156 proto::BlameBufferResponse {
2157 entries,
2158 messages,
2159 permalinks,
2160 remote_url: blame.remote_url,
2161 }
2162}
2163
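/// Reconstructs a `git::blame::Blame` from its protobuf representation.
/// Entries and map keys with malformed object ids are silently skipped.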
2164fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
2165 let entries = response
2166 .entries
2167 .into_iter()
2168 .filter_map(|entry| {
2169 Some(git::blame::BlameEntry {
2170 sha: git::Oid::from_bytes(&entry.sha).ok()?,
2171 range: entry.start_line..entry.end_line,
2172 original_line_number: entry.original_line_number,
2173 committer: entry.committer,
2174 committer_time: entry.committer_time,
2175 committer_tz: entry.committer_tz,
2176 committer_mail: entry.committer_mail,
2177 author: entry.author,
2178 author_mail: entry.author_mail,
2179 author_time: entry.author_time,
2180 author_tz: entry.author_tz,
2181 summary: entry.summary,
2182 previous: entry.previous,
2183 filename: entry.filename,
2184 })
2185 })
2186 .collect::<Vec<_>>();
2187
2188 let messages = response
2189 .messages
2190 .into_iter()
2191 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2192 .collect::<HashMap<_, _>>();
2193
2194 let permalinks = response
2195 .permalinks
2196 .into_iter()
2197 .filter_map(|permalink| {
2198 Some((
2199 git::Oid::from_bytes(&permalink.oid).ok()?,
2200 Url::from_str(&permalink.permalink).ok()?,
2201 ))
2202 })
2203 .collect::<HashMap<_, _>>();
2204
2205 Blame {
2206 entries,
2207 permalinks,
2208 messages,
2209 remote_url: response.remote_url,
2210 }
2211}