use crate::{
    search::SearchQuery,
    worktree_store::{WorktreeStore, WorktreeStoreEvent},
    Item, NoRepositoryError, ProjectPath,
};
use anyhow::{anyhow, Context as _, Result};
use collections::{hash_map, HashMap, HashSet};
use fs::Fs;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use git::blame::Blame;
use gpui::{
    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
};
use http_client::Url;
use language::{
    proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
    Buffer, Capability, Event as BufferEvent, File as _, Language, Operation,
};
use rpc::{
    proto::{self, AnyProtoClient, EnvelopedMessage, PeerId},
    ErrorExt as _, TypedEnvelope,
};
use smol::channel::Receiver;
use std::{io, path::Path, str::FromStr as _, sync::Arc};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _};
use worktree::{
    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
    WorktreeId,
};

/// A set of open buffers.
pub struct BufferStore {
    remote_id: Option<u64>,
    #[allow(unused)]
    worktree_store: Model<WorktreeStore>,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
}

enum OpenBuffer {
    Strong(Model<Buffer>),
    Weak(WeakModel<Buffer>),
    Operations(Vec<Operation>),
}

pub enum BufferStoreEvent {
    BufferAdded(Model<Buffer>),
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
    MessageToReplicas(Box<proto::Envelope>),
}

impl EventEmitter<BufferStoreEvent> for BufferStore {}

impl BufferStore {
    /// Creates a buffer store, optionally retaining its buffers.
    ///
    /// If `remote_id` is `Some`, then buffers are owned by the buffer store
    /// and won't be released unless they are explicitly removed, or the remote
    /// id is cleared via `set_remote_id`. Otherwise, buffers are stored as
    /// weak handles.
    pub fn new(
        worktree_store: Model<WorktreeStore>,
        remote_id: Option<u64>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        cx.subscribe(&worktree_store, |this, _, event, cx| match event {
            WorktreeStoreEvent::WorktreeAdded(worktree) => {
                this.subscribe_to_worktree(worktree, cx);
            }
            _ => {}
        })
        .detach();

        Self {
            remote_id,
            worktree_store,
            opened_buffers: Default::default(),
            remote_buffer_listeners: Default::default(),
            loading_remote_buffers_by_id: Default::default(),
            local_buffer_ids_by_path: Default::default(),
            local_buffer_ids_by_entry_id: Default::default(),
            loading_buffers_by_path: Default::default(),
        }
    }

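    /// Opens the buffer for the given project path, reusing an already-open
    /// buffer when there is one and deduplicating concurrent loads of the
    /// same path so that they all resolve to the same buffer.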
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                let load_buffer = match worktree.read(cx) {
                    Worktree::Local(_) => {
                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
                    }
                    Worktree::Remote(tree) => {
                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
                    }
                };

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }

    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        this.local_worktree_entries_changed(&worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    fn local_worktree_entries_changed(
        &mut self,
        worktree_handle: &Model<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut ModelContext<Self>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
        }
    }

    fn local_worktree_git_repos_changed(
        &mut self,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<Self>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Identify the loading buffers whose containing repository has changed.
        let future_buffers = self
            .loading_buffers()
            .filter_map(|(project_path, receiver)| {
                if project_path.worktree_id != worktree_handle.read(cx).id() {
                    return None;
                }
                let path = &project_path.path;
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
                let path = path.clone();
                Some(async move {
                    Self::wait_for_loading_buffer(receiver)
                        .await
                        .ok()
                        .map(|buffer| (buffer, path))
                })
            })
            .collect::<FuturesUnordered<_>>();

        // Identify the current buffers whose containing repository has changed.
        let current_buffers = self
            .buffers()
            .filter_map(|buffer| {
                let file = File::from_dyn(buffer.read(cx).file())?;
                if file.worktree != worktree_handle {
                    return None;
                }
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                Some((buffer, file.path.clone()))
            })
            .collect::<Vec<_>>();

        if future_buffers.len() + current_buffers.len() == 0 {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Wait for all of the buffers to load.
            let future_buffers = future_buffers.collect::<Vec<_>>().await;

            // Reload the diff base for every buffer whose containing git repository has changed.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    let mut diff_base_tasks = future_buffers
                        .into_iter()
                        .flatten()
                        .chain(current_buffers)
                        .filter_map(|(buffer, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            Some(async move {
                                let base_text =
                                    local_repo_entry.repo().load_index_text(&relative_path);
                                Some((buffer, base_text))
                            })
                        })
                        .collect::<FuturesUnordered<_>>();

                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
                    while let Some(diff_base) = diff_base_tasks.next().await {
                        if let Some(diff_base) = diff_base {
                            diff_bases.push(diff_base);
                        }
                    }
                    diff_bases
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Assign the new diff bases on all of the buffers.
                for (buffer, diff_base) in diff_bases_by_buffer {
                    let buffer_id = buffer.update(cx, |buffer, cx| {
                        buffer.set_diff_base(diff_base.clone(), cx);
                        buffer.remote_id().to_proto()
                    });
                    if let Some(project_id) = this.remote_id {
                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
                            proto::UpdateDiffBase {
                                project_id,
                                buffer_id,
                                diff_base,
                            }
                            .into_envelope(0, None, None),
                        )))
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }

    fn open_local_buffer_internal(
        &mut self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(
                        text_buffer,
                        loaded.diff_base,
                        Some(loaded.file),
                        Capability::ReadWrite,
                    )
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        None,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            mtime: None,
                            entry_id: None,
                            is_local: true,
                            is_deleted: false,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }

    fn open_remote_buffer_internal(
        &self,
        path: &Arc<Path>,
        worktree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let worktree_id = worktree.id().to_proto();
        let project_id = worktree.project_id();
        let client = worktree.client();
        let path_string = path.to_string_lossy().to_string();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path_string,
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;
            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

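    /// Creates a new, empty buffer.
    ///
    /// When a remote client and project id are given, the buffer is created on
    /// the host via an `OpenNewBuffer` request; otherwise it is created
    /// locally.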
    pub fn create_buffer(
        &mut self,
        remote_client: Option<(AnyProtoClient, u64)>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        if let Some((remote_client, project_id)) = remote_client {
            let create = remote_client.request(proto::OpenNewBuffer { project_id });
            cx.spawn(|this, mut cx| async move {
                let response = create.await?;
                let buffer_id = BufferId::new(response.buffer_id)?;

                this.update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await
            })
        } else {
            Task::ready(Ok(self.create_local_buffer("", None, cx)))
        }
    }

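    /// Creates a local buffer containing the given text, falling back to
    /// plain text when no language is provided.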
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });
        self.add_buffer(buffer.clone(), cx).log_err();
        buffer
    }

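    /// Saves the buffer to its existing file, dispatching to the local or
    /// remote code path based on the worktree the file belongs to.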
    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
        };
        match file.worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
            }
            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
        }
    }

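    /// Saves the buffer to a new path, then emits a `BufferChangedFilePath`
    /// event carrying the buffer's previous file.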
    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let old_file = buffer.read(cx).file().cloned();

        let task = match worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
            }
            Worktree::Remote(tree) => {
                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }

    fn save_local_buffer(
        &self,
        worktree: Model<Worktree>,
        buffer_handle: Model<Buffer>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer.file().is_some_and(|file| !file.is_created()) {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.mtime;
            this.update(&mut cx, |this, cx| {
                if let Some(project_id) = this.remote_id {
                    if has_changed_file {
                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
                            proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            }
                            .into_envelope(0, None, None),
                        )));
                    }
                    cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
                        proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        }
                        .into_envelope(0, None, None),
                    )));
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        tree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = tree.client();
        let project_id = tree.project_id();
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

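    /// Computes the git blame for the buffer, at the given version if one is
    /// provided. For local worktrees, the containing repository is asked to
    /// blame the buffer's contents on a background thread; for remote
    /// worktrees, a `BlameBuffer` request is forwarded to the host.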
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Blame>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => anyhow::bail!(NoRepositoryError {}),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok((repo, relative_path, content))
                });

                cx.background_executor().spawn(async move {
                    let (repo, relative_path, content) = blame_params?;
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }

    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = if self.remote_id.is_some() {
            OpenBuffer::Strong(buffer.clone())
        } else {
            OpenBuffer::Weak(buffer.downgrade())
        };

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
            for sender in senders {
                sender.send(Ok(buffer.clone())).ok();
            }
        }

        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            if file.is_local {
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path.clone(),
                    },
                    remote_id,
                );

                if let Some(entry_id) = file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }

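    /// Returns all open buffers whose handles can still be upgraded.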
    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
        self.opened_buffers
            .values()
            .filter_map(|buffer| buffer.upgrade())
    }

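    /// Returns the paths of buffers that are currently being loaded, along
    /// with watch receivers that resolve once each load completes.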
    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }

    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
        self.buffers().find_map(|buffer| {
            let file = File::from_dyn(buffer.read(cx).file())?;
            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
                Some(buffer)
            } else {
                None
            }
        })
    }

    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.opened_buffers
            .get(&buffer_id)
            .and_then(|buffer| buffer.upgrade())
    }

    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
        self.get(buffer_id)
            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
    }

    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.get(buffer_id)
            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
    }

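    /// Resolves once the buffer with the given id has been added to the
    /// store, registering a listener if the buffer hasn't arrived yet.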
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut AppContext,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer = self.get(id);
        if let Some(buffer) = buffer {
            return Task::ready(Ok(buffer));
        }
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);
        cx.background_executor().spawn(async move { rx.await? })
    }

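    /// Reports the serialized versions of all open buffers, along with the
    /// ids of remote buffers that are still waiting for their initial state.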
    pub fn buffer_version_info(
        &self,
        cx: &AppContext,
    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
        let buffers = self
            .buffers()
            .map(|buffer| {
                let buffer = buffer.read(cx);
                proto::BufferVersion {
                    id: buffer.remote_id().into(),
                    version: language::proto::serialize_version(&buffer.version),
                }
            })
            .collect();
        let incomplete_buffer_ids = self
            .loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>();
        (buffers, incomplete_buffer_ids)
    }

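    /// Handles disconnection from the host: clears the remote id, marks all
    /// buffers read-only, and drops any pending remote-buffer listeners.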
    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        self.set_remote_id(None, cx);

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        // Wake up all futures currently waiting on a buffer to get opened,
        // to give them a chance to fail now that we've disconnected.
        self.remote_buffer_listeners.clear();
    }

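    /// Sets or clears the remote project id, upgrading buffer handles to
    /// strong ones when the project becomes shared and downgrading them to
    /// weak ones when it stops being shared.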
    pub fn set_remote_id(&mut self, remote_id: Option<u64>, cx: &mut AppContext) {
        self.remote_id = remote_id;
        for open_buffer in self.opened_buffers.values_mut() {
            if remote_id.is_some() {
                if let OpenBuffer::Weak(buffer) = open_buffer {
                    if let Some(buffer) = buffer.upgrade() {
                        *open_buffer = OpenBuffer::Strong(buffer);
                    }
                }
            } else {
                if let Some(buffer) = open_buffer.upgrade() {
                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
                }
                if let OpenBuffer::Strong(buffer) = open_buffer {
                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
                }
            }
        }
    }

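    /// Drops entries that only hold buffered operations and never received
    /// the corresponding buffer state.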
    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }

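    /// Streams buffers that may contain matches for the given search query:
    /// unnamed open buffers are sent first, then candidate paths reported by
    /// the worktree store are opened in batches, up to `limit` candidates.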
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }

    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                self.buffer_changed_file(buffer, cx);
            }
            _ => {}
        }
    }

    fn local_worktree_entry_changed(
        &mut self,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
        };
        let buffer = if let Some(buffer) = self.get(buffer_id) {
            buffer
        } else {
            self.opened_buffers.remove(&buffer_id);
            self.local_buffer_ids_by_path.remove(&project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            let new_file = if let Some(entry) = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
            {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else {
                File {
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    mtime: old_file.mtime,
                    worktree: worktree.clone(),
                    is_deleted: true,
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                self.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some(project_id) = self.remote_id {
                events.push(BufferStoreEvent::MessageToReplicas(Box::new(
                    proto::UpdateBufferFile {
                        project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    }
                    .into_envelope(0, None, None),
                )))
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }

    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }

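    /// Replicates the buffer's state to the given peer, sending the initial
    /// state first and then streaming the buffer's operations in chunks.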
    pub async fn create_buffer_for_peer(
        this: Model<Self>,
        peer_id: PeerId,
        buffer_id: BufferId,
        project_id: u64,
        client: AnyProtoClient,
        cx: &mut AsyncAppContext,
    ) -> Result<()> {
        let Some(buffer) = this.update(cx, |this, _| this.get(buffer_id))? else {
            return Ok(());
        };

        let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
        let operations = operations.await;
        let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx))?;

        let initial_state = proto::CreateBufferForPeer {
            project_id,
            peer_id: Some(peer_id),
            variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
        };

        if client.send(initial_state).log_err().is_some() {
            let client = client.clone();
            cx.background_executor()
                .spawn(async move {
                    let mut chunks = split_operations(operations).peekable();
                    while let Some(chunk) = chunks.next() {
                        let is_last = chunks.peek().is_none();
                        client.send(proto::CreateBufferForPeer {
                            project_id,
                            peer_id: Some(peer_id),
                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                proto::BufferChunk {
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                    is_last,
                                },
                            )),
                        })?;
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
        }
        Ok(())
    }

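    /// Handles an `UpdateBuffer` message by deserializing the operations and
    /// applying them to the buffer, buffering them when the buffer's initial
    /// state hasn't arrived yet.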
    pub async fn handle_update_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    OpenBuffer::Strong(buffer) => {
                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                    }
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Weak(_) => {}
                },
                hash_map::Entry::Vacant(e) => {
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }

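    /// Handles a `CreateBufferForPeer` message: a `State` variant registers a
    /// loading remote buffer, and `Chunk` variants apply streamed operations,
    /// promoting the buffer into the store once the last chunk arrives.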
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
                });

                if let Err(error) = result {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    self.add_buffer(buffer, cx)?;
                }
            }
        }

        Ok(())
    }

    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            Ok(())
        })?
    }

    pub async fn handle_update_diff_base(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let buffer_id = envelope.payload.buffer_id;
            let buffer_id = BufferId::new(buffer_id)?;
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.set_diff_base(envelope.payload.diff_base, cx)
                });
            }
            Ok(())
        })?
    }

    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.remote_id.context("project is not shared")?,
            ))
        })??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }

    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }
        })
    }

    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }
        })
    }

    pub async fn handle_blame_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }

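    /// Waits on a load watch channel until it yields a result, then returns
    /// the loaded buffer or the load error.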
    pub async fn wait_for_loading_buffer(
        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
        loop {
            if let Some(result) = receiver.borrow().as_ref() {
                match result {
                    Ok(buffer) => return Ok(buffer.to_owned()),
                    Err(e) => return Err(e.to_owned()),
                }
            }
            receiver.next().await;
        }
    }
}

impl OpenBuffer {
    fn upgrade(&self) -> Option<Model<Buffer>> {
        match self {
            OpenBuffer::Strong(handle) => Some(handle.clone()),
            OpenBuffer::Weak(handle) => handle.upgrade(),
            OpenBuffer::Operations(_) => None,
        }
    }
}

fn is_not_found_error(error: &anyhow::Error) -> bool {
    error
        .root_cause()
        .downcast_ref::<io::Error>()
        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
}

fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
    let entries = blame
        .entries
        .into_iter()
        .map(|entry| proto::BlameEntry {
            sha: entry.sha.as_bytes().into(),
            start_line: entry.range.start,
            end_line: entry.range.end,
            original_line_number: entry.original_line_number,
            author: entry.author.clone(),
            author_mail: entry.author_mail.clone(),
            author_time: entry.author_time,
            author_tz: entry.author_tz.clone(),
            committer: entry.committer.clone(),
            committer_mail: entry.committer_mail.clone(),
            committer_time: entry.committer_time,
            committer_tz: entry.committer_tz.clone(),
            summary: entry.summary.clone(),
            previous: entry.previous.clone(),
            filename: entry.filename.clone(),
        })
        .collect::<Vec<_>>();

    let messages = blame
        .messages
        .into_iter()
        .map(|(oid, message)| proto::CommitMessage {
            oid: oid.as_bytes().into(),
            message,
        })
        .collect::<Vec<_>>();

    let permalinks = blame
        .permalinks
        .into_iter()
        .map(|(oid, url)| proto::CommitPermalink {
            oid: oid.as_bytes().into(),
            permalink: url.to_string(),
        })
        .collect::<Vec<_>>();

    proto::BlameBufferResponse {
        entries,
        messages,
        permalinks,
        remote_url: blame.remote_url,
    }
}

fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_mail: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    let permalinks = response
        .permalinks
        .into_iter()
        .filter_map(|permalink| {
            Some((
                git::Oid::from_bytes(&permalink.oid).ok()?,
                Url::from_str(&permalink.permalink).ok()?,
            ))
        })
        .collect::<HashMap<_, _>>();

    Blame {
        entries,
        permalinks,
        messages,
        remote_url: response.remote_url,
    }
}