use crate::{
    search::SearchQuery,
    worktree_store::{WorktreeStore, WorktreeStoreEvent},
    ProjectItem as _, ProjectPath,
};
use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
use anyhow::{anyhow, Context as _, Result};
use client::Client;
use collections::{hash_map, HashMap, HashSet};
use fs::Fs;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use git::blame::Blame;
use gpui::{
    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
    Task, WeakModel,
};
use http_client::Url;
use language::{
    proto::{
        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
        split_operations,
    },
    Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
};
use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
use smol::channel::Receiver;
use std::{io, ops::Range, path::Path, str::FromStr as _, sync::Arc, time::Instant};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};

/// A set of open buffers.
pub struct BufferStore {
    state: BufferStoreState,
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    worktree_store: Model<WorktreeStore>,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    downstream_client: Option<(AnyProtoClient, u64)>,
    shared_buffers: HashMap<proto::PeerId, HashSet<Model<Buffer>>>,
}

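/// The kind-specific state of a [`BufferStore`]: a local store owns
/// file-backed buffers directly, while a remote store mirrors buffers from an
/// upstream project.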
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}

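/// State for a buffer store that mirrors buffers owned by an upstream
/// project, communicating with it over RPC.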
struct RemoteBufferStore {
    shared_with_me: HashSet<Model<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    worktree_store: Model<WorktreeStore>,
}

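/// State for a buffer store whose buffers are backed by files in local
/// worktrees, indexed by project path and worktree entry id.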
struct LocalBufferStore {
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Model<WorktreeStore>,
    _subscription: Subscription,
}

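/// An entry in the buffer table: either a live (weakly held) buffer, or
/// operations received for a buffer whose initial state hasn't arrived yet.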
enum OpenBuffer {
    Buffer(WeakModel<Buffer>),
    Operations(Vec<Operation>),
}

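/// Events emitted by the [`BufferStore`].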
pub enum BufferStoreEvent {
    BufferAdded(Model<Buffer>),
    BufferDropped(BufferId),
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}

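/// A set of buffer transactions produced by a project-wide operation, keyed
/// by the buffer each transaction applies to.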
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}

impl RemoteBufferStore {
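    /// Returns a task that resolves once the buffer with the given id arrives
    /// from the upstream peer, or immediately if the buffer is already open.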
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }

    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<BufferStore>,
    ) -> Result<Option<Model<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // Retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        Ok(None)
    }

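    /// Returns the ids of all buffers whose initial state has been received
    /// but whose operation chunks are still arriving.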
    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
        self.loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>()
    }

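    /// Reconstructs a [`ProjectTransaction`] from its protobuf representation,
    /// waiting for each referenced buffer and its edits to arrive first.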
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }

    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let worktree_id = worktree.read(cx).id().to_proto();
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        let path_string = path.to_string_lossy().to_string();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path_string,
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            let buffer = this
                .update(&mut cx, |this, cx| this.wait_for_remote_buffer(buffer_id, cx))?
                .await?;

            Ok(buffer)
        })
    }

    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
        let create = self.upstream_client.request(proto::OpenNewBuffer {
            project_id: self.project_id,
        });
        cx.spawn(|this, mut cx| async move {
            let response = create.await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        let request = self.upstream_client.request(proto::ReloadBuffers {
            project_id: self.project_id,
            buffer_ids: buffers
                .iter()
                .map(|buffer| buffer.read(cx).remote_id().to_proto())
                .collect(),
        });

        cx.spawn(|this, mut cx| async move {
            let response = request
                .await?
                .transaction
                .ok_or_else(|| anyhow!("missing transaction"))?;
            this.update(&mut cx, |this, cx| {
                this.deserialize_project_transaction(response, push_to_history, cx)
            })?
            .await
        })
    }
}

impl LocalBufferStore {
    fn save_local_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        worktree: Model<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    fn subscribe_to_worktree(
        &mut self,
        worktree: &Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        Self::local_worktree_git_repos_changed(
                            this,
                            worktree.clone(),
                            updated_repos,
                            cx,
                        )
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    fn local_worktree_entries_changed(
        this: &mut BufferStore,
        worktree_handle: &Model<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut ModelContext<BufferStore>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            Self::local_worktree_entry_changed(
                this,
                *entry_id,
                path,
                worktree_handle,
                &snapshot,
                cx,
            );
        }
    }

    fn local_worktree_git_repos_changed(
        this: &mut BufferStore,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<BufferStore>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Identify the loading buffers whose containing repository has changed.
        let future_buffers = this
            .loading_buffers()
            .filter_map(|(project_path, receiver)| {
                if project_path.worktree_id != worktree_handle.read(cx).id() {
                    return None;
                }
                let path = &project_path.path;
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
                let path = path.clone();
                Some(async move {
                    BufferStore::wait_for_loading_buffer(receiver)
                        .await
                        .ok()
                        .map(|buffer| (buffer, path))
                })
            })
            .collect::<FuturesUnordered<_>>();

        // Identify the current buffers whose containing repository has changed.
        let current_buffers = this
            .buffers()
            .filter_map(|buffer| {
                let file = File::from_dyn(buffer.read(cx).file())?;
                if file.worktree != worktree_handle {
                    return None;
                }
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                Some((buffer, file.path.clone()))
            })
            .collect::<Vec<_>>();

        if future_buffers.len() + current_buffers.len() == 0 {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Wait for all of the buffers to load.
            let future_buffers = future_buffers.collect::<Vec<_>>().await;

            // Reload the diff base for every buffer whose containing git repository has changed.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    let mut diff_base_tasks = future_buffers
                        .into_iter()
                        .flatten()
                        .chain(current_buffers)
                        .filter_map(|(buffer, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            Some(async move {
                                let base_text =
                                    local_repo_entry.repo().load_index_text(&relative_path);
                                Some((buffer, base_text))
                            })
                        })
                        .collect::<FuturesUnordered<_>>();

                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
                    while let Some(diff_base) = diff_base_tasks.next().await {
                        if let Some(diff_base) = diff_base {
                            diff_bases.push(diff_base);
                        }
                    }
                    diff_bases
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Assign the new diff bases on all of the buffers.
                for (buffer, diff_base) in diff_bases_by_buffer {
                    let buffer_id = buffer.update(cx, |buffer, cx| {
                        buffer.set_diff_base(diff_base.clone(), cx);
                        buffer.remote_id().to_proto()
                    });
                    if let Some((client, project_id)) = &this.downstream_client {
                        client
                            .send(proto::UpdateDiffBase {
                                project_id: *project_id,
                                buffer_id,
                                diff_base,
                            })
                            .log_err();
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }

    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }

    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }

    fn save_buffer(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
        };
        let worktree = file.worktree.clone();
        self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
    }

    fn save_buffer_as(
        &self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };
        self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
    }

    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(
                        text_buffer,
                        loaded.diff_base,
                        Some(loaded.file),
                        Capability::ReadWrite,
                    )
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        None,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }

    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
        cx.spawn(|buffer_store, mut cx| async move {
            let buffer = cx.new_model(|cx| {
                Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
            })?;
            buffer_store.update(&mut cx, |buffer_store, cx| {
                buffer_store.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }

    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.handle(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
}

impl BufferStore {
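    /// Registers this model's RPC message and request handlers on the given
    /// client.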
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_message_handler(Self::handle_update_diff_base);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
    }

    /// Creates a buffer store for a local project, subscribing to worktrees
    /// as they are added.
    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers_by_path: Default::default(),
            worktree_store,
        }
    }

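    /// Creates a buffer store for a remote project, proxying buffer
    /// operations to the given upstream client.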
    pub fn remote(
        worktree_store: Model<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut ModelContext<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers_by_path: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }

    fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
        match &mut self.state {
            BufferStoreState::Local(state) => Some(state),
            _ => None,
        }
    }

    fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
        match &mut self.state {
            BufferStoreState::Remote(state) => Some(state),
            _ => None,
        }
    }

    fn as_remote(&self) -> Option<&RemoteBufferStore> {
        match &self.state {
            BufferStoreState::Remote(state) => Some(state),
            _ => None,
        }
    }

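    /// Opens the buffer for the given project path, reusing an already-open
    /// buffer or an in-flight load for the same path when possible.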
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let path = project_path.path.clone();
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _cx| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }

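    /// Creates a new, empty buffer. Locally this builds a plain-text buffer;
    /// remotely it asks the upstream host to open a new buffer.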
    pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
        match &self.state {
            BufferStoreState::Local(this) => this.create_buffer(cx),
            BufferStoreState::Remote(this) => this.create_buffer(cx),
        }
    }

    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        match &mut self.state {
            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
        }
    }

    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let old_file = buffer.read(cx).file().cloned();
        let task = match &self.state {
            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
            BufferStoreState::Remote(this) => {
                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }

    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }

    pub fn get_permalink_to_line(
        &self,
        buffer: &Model<Buffer>,
        selection: Range<u32>,
        cx: &AppContext,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let Some(repo) = worktree.local_git_repo(file.path()) else {
                    return Task::ready(Err(anyhow!("no repository for buffer found")));
                };

                let path = file.path().clone();

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .context("failed to convert buffer path to string")?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }

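    /// Registers a buffer with the store, subscribing to its events, applying
    /// any operations that arrived before the buffer itself, and emitting
    /// [`BufferStoreEvent::BufferAdded`].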
    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = OpenBuffer::Buffer(buffer.downgrade());

        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }

    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
        self.opened_buffers
            .values()
            .filter_map(|buffer| buffer.upgrade())
    }

    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }

    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
        self.buffers().find_map(|buffer| {
            let file = File::from_dyn(buffer.read(cx).file())?;
            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
                Some(buffer)
            } else {
                None
            }
        })
    }

    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.opened_buffers
            .get(&buffer_id)
            .and_then(|buffer| buffer.upgrade())
    }

    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
        self.get(buffer_id)
            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
    }

    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.get(buffer_id).or_else(|| {
            self.as_remote()
                .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
        })
    }

    pub fn buffer_version_info(
        &self,
        cx: &AppContext,
    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
        let buffers = self
            .buffers()
            .map(|buffer| {
                let buffer = buffer.read(cx);
                proto::BufferVersion {
                    id: buffer.remote_id().into(),
                    version: language::proto::serialize_version(&buffer.version),
                }
            })
            .collect();
        let incomplete_buffer_ids = self
            .as_remote()
            .map(|remote| remote.incomplete_buffer_ids())
            .unwrap_or_default();
        (buffers, incomplete_buffer_ids)
    }

    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        if let Some(remote) = self.as_remote_mut() {
            // Wake up all futures currently waiting on a buffer to get opened,
            // to give them a chance to fail now that we've disconnected.
            remote.remote_buffer_listeners.clear()
        }
    }

    pub fn shared(
        &mut self,
        remote_id: u64,
        downstream_client: AnyProtoClient,
        _cx: &mut AppContext,
    ) {
        self.downstream_client = Some((downstream_client, remote_id));
    }

    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }

    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }

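    /// Returns a channel of buffers that may contain matches for the given
    /// query. Unnamed buffers are sent first; buffers for matching project
    /// paths are then opened and sent as they are found.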
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }

    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }

    pub async fn handle_update_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Buffer(buffer) => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                        }
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }

    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .insert(buffer.clone());

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                client
                    .send(proto::UpdateDiffBase {
                        project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: buffer.diff_base().map(ToString::to_string),
                    })
                    .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }

    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let Some(remote) = self.as_remote_mut() else {
            return Err(anyhow!("buffer store is not a remote"));
        };

        if let Some(buffer) =
            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
        {
            self.add_buffer(buffer, cx)?;
        }

        Ok(())
    }

    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }

    pub async fn handle_update_diff_base(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let buffer_id = envelope.payload.buffer_id;
            let buffer_id = BufferId::new(buffer_id)?;
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.set_diff_base(envelope.payload.diff_base.clone(), cx)
                });
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateDiffBase {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: envelope.payload.diff_base,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }

    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }

    pub async fn handle_close_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let peer_id = envelope.sender_id;
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        this.update(&mut cx, |this, _| {
            if let Some(buffer) = this.get(buffer_id) {
                if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
                    if shared.remove(&buffer) {
                        if shared.is_empty() {
                            this.shared_buffers.remove(&peer_id);
                        }
                        return;
                    }
                }
            };
            debug_panic!(
                "peer_id {} closed buffer_id {} which was either not open or already closed",
                peer_id,
                buffer_id
            )
        })
    }

    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        this.update(&mut cx, move |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferSaved {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                    })
                    .log_err();
            }
        })
    }

    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }

    pub async fn handle_blame_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }

    pub async fn handle_get_permalink_to_line(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::GetPermalinkToLineResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let selection = {
            let proto_selection = envelope
                .payload
                .selection
                .context("no selection defined to get permalink for")?;
            proto_selection.start as u32..proto_selection.end as u32
        };
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        let permalink = this
            .update(&mut cx, |this, cx| {
                this.get_permalink_to_line(&buffer, selection, cx)
            })?
            .await?;
        Ok(proto::GetPermalinkToLineResponse {
            permalink: permalink.to_string(),
        })
    }

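    /// Waits for an in-flight buffer load to resolve, returning the buffer or
    /// the error that caused the load to fail.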
    pub async fn wait_for_loading_buffer(
        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
        loop {
            if let Some(result) = receiver.borrow().as_ref() {
                match result {
                    Ok(buffer) => return Ok(buffer.to_owned()),
                    Err(e) => return Err(e.to_owned()),
                }
            }
            receiver.next().await;
        }
    }

    pub fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        if buffers.is_empty() {
            return Task::ready(Ok(ProjectTransaction::default()));
        }
        match &self.state {
            BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
            BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
        }
    }

    async fn handle_reload_buffers(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }

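    /// Sends the given buffer's state to the given peer as an initial state
    /// message followed by chunked operations. Each buffer is only sent to a
    /// given peer once.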
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        if !self
            .shared_buffers
            .entry(peer_id)
            .or_default()
            .insert(buffer.clone())
        {
            return Task::ready(Ok(()));
        }

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }

    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }

    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }

    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }

    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<Model<Buffer>>> {
        &self.shared_buffers
    }

    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }

    pub fn deserialize_project_transaction(
        &mut self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        if let Some(this) = self.as_remote_mut() {
            this.deserialize_project_transaction(message, push_to_history, cx)
        } else {
            debug_panic!("not a remote buffer store");
            Task::ready(Err(anyhow!("not a remote buffer store")))
        }
    }

    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        if let Some(this) = self.as_remote_mut() {
            this.wait_for_remote_buffer(id, cx)
        } else {
            debug_panic!("not a remote buffer store");
            Task::ready(Err(anyhow!("not a remote buffer store")))
        }
    }

    pub fn serialize_project_transaction_for_peer(
        &mut self,
        project_transaction: ProjectTransaction,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> proto::ProjectTransaction {
        let mut serialized_transaction = proto::ProjectTransaction {
            buffer_ids: Default::default(),
            transactions: Default::default(),
        };
        for (buffer, transaction) in project_transaction.0 {
            self.create_buffer_for_peer(&buffer, peer_id, cx)
                .detach_and_log_err(cx);
            serialized_transaction
                .buffer_ids
                .push(buffer.read(cx).remote_id().into());
            serialized_transaction
                .transactions
                .push(language::proto::serialize_transaction(&transaction));
        }
        serialized_transaction
    }
}

impl OpenBuffer {
    fn upgrade(&self) -> Option<Model<Buffer>> {
        match self {
            OpenBuffer::Buffer(handle) => handle.upgrade(),
            OpenBuffer::Operations(_) => None,
        }
    }
}

fn is_not_found_error(error: &anyhow::Error) -> bool {
    error
        .root_cause()
        .downcast_ref::<io::Error>()
        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
}

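/// Converts an optional blame result into its protobuf response
/// representation; a `None` blame becomes an empty response.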
fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
    let Some(blame) = blame else {
        return proto::BlameBufferResponse {
            blame_response: None,
        };
    };

    let entries = blame
        .entries
        .into_iter()
        .map(|entry| proto::BlameEntry {
            sha: entry.sha.as_bytes().into(),
            start_line: entry.range.start,
            end_line: entry.range.end,
            original_line_number: entry.original_line_number,
            author: entry.author.clone(),
            author_mail: entry.author_mail.clone(),
            author_time: entry.author_time,
            author_tz: entry.author_tz.clone(),
            committer: entry.committer.clone(),
            committer_mail: entry.committer_mail.clone(),
            committer_time: entry.committer_time,
            committer_tz: entry.committer_tz.clone(),
            summary: entry.summary.clone(),
            previous: entry.previous.clone(),
            filename: entry.filename.clone(),
        })
        .collect::<Vec<_>>();

    let messages = blame
        .messages
        .into_iter()
        .map(|(oid, message)| proto::CommitMessage {
            oid: oid.as_bytes().into(),
            message,
        })
        .collect::<Vec<_>>();

    let permalinks = blame
        .permalinks
        .into_iter()
        .map(|(oid, url)| proto::CommitPermalink {
            oid: oid.as_bytes().into(),
            permalink: url.to_string(),
        })
        .collect::<Vec<_>>();

    proto::BlameBufferResponse {
        blame_response: Some(proto::blame_buffer_response::BlameResponse {
            entries,
            messages,
            permalinks,
            remote_url: blame.remote_url,
        }),
    }
}

fn deserialize_blame_buffer_response(
    response: proto::BlameBufferResponse,
) -> Option<git::blame::Blame> {
    let response = response.blame_response?;
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_mail: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    let permalinks = response
        .permalinks
        .into_iter()
        .filter_map(|permalink| {
            Some((
                git::Oid::from_bytes(&permalink.oid).ok()?,
                Url::from_str(&permalink.permalink).ok()?,
            ))
        })
        .collect::<HashMap<_, _>>();

    Some(Blame {
        entries,
        permalinks,
        messages,
        remote_url: response.remote_url,
    })
}