use crate::{
    search::SearchQuery,
    worktree_store::{WorktreeStore, WorktreeStoreEvent},
    Item, NoRepositoryError, ProjectPath,
};
use anyhow::{anyhow, Context as _, Result};
use collections::{hash_map, HashMap, HashSet};
use fs::Fs;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt as _};
use git::blame::Blame;
use gpui::{
    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
};
use http_client::Url;
use language::{
    proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
    Buffer, Capability, Event as BufferEvent, File as _, Language, Operation,
};
use rpc::{
    proto::{self, AnyProtoClient, EnvelopedMessage, PeerId},
    ErrorExt as _, TypedEnvelope,
};
use smol::channel::Receiver;
use std::{io, path::Path, str::FromStr as _, sync::Arc};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _};
use worktree::{
    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
    WorktreeId,
};

/// A set of open buffers.
pub struct BufferStore {
    remote_id: Option<u64>,
    #[allow(unused)]
    worktree_store: Model<WorktreeStore>,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
}

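/// A handle to an open buffer.
///
/// `Strong` keeps the buffer alive (used while the project is shared),
/// `Weak` allows the buffer to be dropped once no one else holds it, and
/// `Operations` queues edits that arrived before the buffer itself was opened.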
enum OpenBuffer {
    Strong(Model<Buffer>),
    Weak(WeakModel<Buffer>),
    Operations(Vec<Operation>),
}

pub enum BufferStoreEvent {
    BufferAdded(Model<Buffer>),
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
    MessageToReplicas(Box<proto::Envelope>),
}

impl EventEmitter<BufferStoreEvent> for BufferStore {}

impl BufferStore {
    /// Creates a buffer store, optionally retaining its buffers.
    ///
    /// If `remote_id` is `Some`, then buffers are owned by the buffer store
    /// and won't be released unless they are explicitly removed, or the
    /// remote id is cleared via `set_remote_id`. Otherwise, buffers are
    /// stored as weak handles.
    pub fn new(
        worktree_store: Model<WorktreeStore>,
        remote_id: Option<u64>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        cx.subscribe(&worktree_store, |this, _, event, cx| match event {
            WorktreeStoreEvent::WorktreeAdded(worktree) => {
                this.subscribe_to_worktree(worktree, cx);
            }
            _ => {}
        })
        .detach();

        Self {
            remote_id,
            worktree_store,
            opened_buffers: Default::default(),
            remote_buffer_listeners: Default::default(),
            loading_remote_buffers_by_id: Default::default(),
            local_buffer_ids_by_path: Default::default(),
            local_buffer_ids_by_entry_id: Default::default(),
            loading_buffers_by_path: Default::default(),
        }
    }

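    /// Opens the buffer at `project_path`, or returns the already-open buffer
    /// for that path. Concurrent calls for the same path share one load: the
    /// in-flight task is tracked in `loading_buffers_by_path`, and later
    /// callers simply wait on its watch channel.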
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                let load_buffer = match worktree.read(cx) {
                    Worktree::Local(_) => {
                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
                    }
                    Worktree::Remote(tree) => {
                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
                    }
                };

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }

    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        this.local_worktree_entries_changed(&worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    fn local_worktree_entries_changed(
        &mut self,
        worktree_handle: &Model<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut ModelContext<Self>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
        }
    }

    fn local_worktree_git_repos_changed(
        &mut self,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<Self>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Identify the loading buffers whose containing repository has changed.
        let future_buffers = self
            .loading_buffers()
            .filter_map(|(project_path, receiver)| {
                if project_path.worktree_id != worktree_handle.read(cx).id() {
                    return None;
                }
                let path = &project_path.path;
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
                let path = path.clone();
                Some(async move {
                    Self::wait_for_loading_buffer(receiver)
                        .await
                        .ok()
                        .map(|buffer| (buffer, path))
                })
            })
            .collect::<FuturesUnordered<_>>();

        // Identify the current buffers whose containing repository has changed.
        let current_buffers = self
            .buffers()
            .filter_map(|buffer| {
                let file = File::from_dyn(buffer.read(cx).file())?;
                if file.worktree != worktree_handle {
                    return None;
                }
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                Some((buffer, file.path.clone()))
            })
            .collect::<Vec<_>>();

        if future_buffers.is_empty() && current_buffers.is_empty() {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Wait for all of the buffers to load.
            let future_buffers = future_buffers.collect::<Vec<_>>().await;

            // Reload the diff base for every buffer whose containing git repository has changed.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    let mut diff_base_tasks = future_buffers
                        .into_iter()
                        .flatten()
                        .chain(current_buffers)
                        .filter_map(|(buffer, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            Some(async move {
                                let base_text =
                                    local_repo_entry.repo().load_index_text(&relative_path);
                                Some((buffer, base_text))
                            })
                        })
                        .collect::<FuturesUnordered<_>>();

                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
                    while let Some(diff_base) = diff_base_tasks.next().await {
                        if let Some(diff_base) = diff_base {
                            diff_bases.push(diff_base);
                        }
                    }
                    diff_bases
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Assign the new diff bases on all of the buffers.
                for (buffer, diff_base) in diff_bases_by_buffer {
                    let buffer_id = buffer.update(cx, |buffer, cx| {
                        buffer.set_diff_base(diff_base.clone(), cx);
                        buffer.remote_id().to_proto()
                    });
                    if let Some(project_id) = this.remote_id {
                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
                            proto::UpdateDiffBase {
                                project_id,
                                buffer_id,
                                diff_base,
                            }
                            .into_envelope(0, None, None),
                        )))
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }

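    // A model slot is reserved up front so the buffer's `BufferId` can be
    // derived from its future entity id before the file finishes loading.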
    fn open_local_buffer_internal(
        &mut self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(
                        text_buffer,
                        loaded.diff_base,
                        Some(loaded.file),
                        Capability::ReadWrite,
                    )
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        None,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            mtime: None,
                            entry_id: None,
                            is_local: true,
                            is_deleted: false,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }

    fn open_remote_buffer_internal(
        &self,
        path: &Arc<Path>,
        worktree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let worktree_id = worktree.id().to_proto();
        let project_id = worktree.project_id();
        let client = worktree.client();
        let path_string = path.to_string_lossy().to_string();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path_string,
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;
            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

    pub fn create_buffer(
        &mut self,
        remote_client: Option<(AnyProtoClient, u64)>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        if let Some((remote_client, project_id)) = remote_client {
            let create = remote_client.request(proto::OpenNewBuffer { project_id });
            cx.spawn(|this, mut cx| async move {
                let response = create.await?;
                let buffer_id = BufferId::new(response.buffer_id)?;

                this.update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await
            })
        } else {
            Task::ready(Ok(self.create_local_buffer("", None, cx)))
        }
    }

    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });
        self.add_buffer(buffer.clone(), cx).log_err();
        buffer
    }

    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
        };
        match file.worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
            }
            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
        }
    }

    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let old_file = buffer.read(cx).file().cloned();

        let task = match worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
            }
            Worktree::Remote(tree) => {
                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }

    fn save_local_buffer(
        &self,
        worktree: Model<Worktree>,
        buffer_handle: Model<Buffer>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer.file().is_some_and(|file| !file.is_created()) {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.mtime;
            this.update(&mut cx, |this, cx| {
                if let Some(project_id) = this.remote_id {
                    if has_changed_file {
                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
                            proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            }
                            .into_envelope(0, None, None),
                        )));
                    }
                    cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
                        proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        }
                        .into_envelope(0, None, None),
                    )));
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        tree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = tree.client();
        let project_id = tree.project_id();
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Blame>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => anyhow::bail!(NoRepositoryError {}),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok((repo, relative_path, content))
                });

                cx.background_executor().spawn(async move {
                    let (repo, relative_path, content) = blame_params?;
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }

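    // Registers a buffer in `opened_buffers`, draining any operations that
    // were queued before it opened and resolving any tasks waiting on it.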
    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = if self.remote_id.is_some() {
            OpenBuffer::Strong(buffer.clone())
        } else {
            OpenBuffer::Weak(buffer.downgrade())
        };

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
            for sender in senders {
                sender.send(Ok(buffer.clone())).ok();
            }
        }

        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            if file.is_local {
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path.clone(),
                    },
                    remote_id,
                );

                if let Some(entry_id) = file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }

    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
        self.opened_buffers
            .values()
            .filter_map(|buffer| buffer.upgrade())
    }

    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }

    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
        self.buffers().find_map(|buffer| {
            let file = File::from_dyn(buffer.read(cx).file())?;
            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
                Some(buffer)
            } else {
                None
            }
        })
    }

    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.opened_buffers
            .get(&buffer_id)
            .and_then(|buffer| buffer.upgrade())
    }

    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
        self.get(buffer_id)
            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
    }

    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.get(buffer_id)
            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
    }

    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut AppContext,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer = self.get(id);
        if let Some(buffer) = buffer {
            return Task::ready(Ok(buffer));
        }
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);
        cx.background_executor().spawn(async move { rx.await? })
    }

    pub fn buffer_version_info(
        &self,
        cx: &AppContext,
    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
        let buffers = self
            .buffers()
            .map(|buffer| {
                let buffer = buffer.read(cx);
                proto::BufferVersion {
                    id: buffer.remote_id().into(),
                    version: language::proto::serialize_version(&buffer.version),
                }
            })
            .collect();
        let incomplete_buffer_ids = self
            .loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>();
        (buffers, incomplete_buffer_ids)
    }

    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        self.set_remote_id(None, cx);

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        // Wake up all futures currently waiting on a buffer to get opened,
        // to give them a chance to fail now that we've disconnected.
        self.remote_buffer_listeners.clear();
    }

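    /// Sets or clears the shared-project id. When sharing starts, weak buffer
    /// handles are upgraded to strong ones; when it stops, pending waits are
    /// cancelled via `give_up_waiting` and strong handles are downgraded back
    /// to weak ones.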
    pub fn set_remote_id(&mut self, remote_id: Option<u64>, cx: &mut AppContext) {
        self.remote_id = remote_id;
        for open_buffer in self.opened_buffers.values_mut() {
            if remote_id.is_some() {
                if let OpenBuffer::Weak(buffer) = open_buffer {
                    if let Some(buffer) = buffer.upgrade() {
                        *open_buffer = OpenBuffer::Strong(buffer);
                    }
                }
            } else {
                if let Some(buffer) = open_buffer.upgrade() {
                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
                }
                if let OpenBuffer::Strong(buffer) = open_buffer {
                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
                }
            }
        }
    }

    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }

    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let open_buffers = self.find_open_search_candidates(query, cx);
        let skip_entries: HashSet<_> = open_buffers
            .iter()
            .filter_map(|buffer| buffer.read(cx).entry_id(cx))
            .collect();

        let limit = limit.saturating_sub(open_buffers.len());
        for open_buffer in open_buffers {
            tx.send_blocking(open_buffer).ok();
        }

        let match_rx = self.worktree_store.update(cx, |worktree_store, cx| {
            worktree_store.find_search_candidates(query.clone(), limit, skip_entries, fs, cx)
        });

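        // A small pool of workers drains the matched paths concurrently,
        // bounding how many buffers are opened from disk at once.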
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 8;

        for _ in 0..MAX_CONCURRENT_BUFFER_OPENS {
            let mut match_rx = match_rx.clone();
            let tx = tx.clone();
            cx.spawn(|this, mut cx| async move {
                while let Some(project_path) = match_rx.next().await {
                    let buffer = this
                        .update(&mut cx, |this, cx| this.open_buffer(project_path, cx))?
                        .await
                        .log_err();
                    if let Some(buffer) = buffer {
                        tx.send_blocking(buffer).ok();
                    }
                }
                anyhow::Ok(())
            })
            .detach();
        }
        rx
    }

    /// Returns the open buffers whose file paths match the query's filters.
    /// Does *not* check buffer contents; the caller must do that.
    fn find_open_search_candidates(
        &self,
        query: &SearchQuery,
        cx: &ModelContext<Self>,
    ) -> Vec<Model<Buffer>> {
        let include_root = self.worktree_store.read(cx).visible_worktrees(cx).count() > 1;
        self.buffers()
            .filter_map(|buffer| {
                let handle = buffer.clone();
                buffer.read_with(cx, |buffer, cx| {
                    let worktree_store = self.worktree_store.read(cx);
                    let entry_id = buffer.entry_id(cx);
                    let is_ignored = entry_id
                        .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
                        .map_or(false, |entry| entry.is_ignored);

                    if is_ignored && !query.include_ignored() {
                        return None;
                    }
                    if let Some(file) = buffer.file() {
                        let matched_path = if include_root {
                            query.file_matches(Some(&file.full_path(cx)))
                        } else {
                            query.file_matches(Some(file.path()))
                        };

                        if matched_path {
                            Some(handle)
                        } else {
                            None
                        }
                    } else {
                        Some(handle)
                    }
                })
            })
            .collect()
    }

    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                self.buffer_changed_file(buffer, cx);
            }
            _ => {}
        }
    }

    fn local_worktree_entry_changed(
        &mut self,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
        };
        let buffer = if let Some(buffer) = self.get(buffer_id) {
            buffer
        } else {
            self.opened_buffers.remove(&buffer_id);
            self.local_buffer_ids_by_path.remove(&project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            let new_file = if let Some(entry) = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
            {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else {
                File {
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    mtime: old_file.mtime,
                    worktree: worktree.clone(),
                    is_deleted: true,
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                self.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some(project_id) = self.remote_id {
                events.push(BufferStoreEvent::MessageToReplicas(Box::new(
                    proto::UpdateBufferFile {
                        project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    }
                    .into_envelope(0, None, None),
                )))
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }

    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }

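    // Replicates a buffer to a peer: the serialized buffer state is sent
    // first, then the buffer's outstanding operations in chunks, with the
    // final chunk flagged so the receiver knows the buffer is complete.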
    pub async fn create_buffer_for_peer(
        this: Model<Self>,
        peer_id: PeerId,
        buffer_id: BufferId,
        project_id: u64,
        client: AnyProtoClient,
        cx: &mut AsyncAppContext,
    ) -> Result<()> {
        let Some(buffer) = this.update(cx, |this, _| this.get(buffer_id))? else {
            return Ok(());
        };

        let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
        let operations = operations.await;
        let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx))?;

        let initial_state = proto::CreateBufferForPeer {
            project_id,
            peer_id: Some(peer_id),
            variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
        };

        if client.send(initial_state).log_err().is_some() {
            let client = client.clone();
            cx.background_executor()
                .spawn(async move {
                    let mut chunks = split_operations(operations).peekable();
                    while let Some(chunk) = chunks.next() {
                        let is_last = chunks.peek().is_none();
                        client.send(proto::CreateBufferForPeer {
                            project_id,
                            peer_id: Some(peer_id),
                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                proto::BufferChunk {
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                    is_last,
                                },
                            )),
                        })?;
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
        }
        Ok(())
    }

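    // Incoming operations are applied directly to strongly-held buffers,
    // queued as `OpenBuffer::Operations` for buffers that aren't open yet,
    // and ignored for weakly-held buffers.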
    pub async fn handle_update_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    OpenBuffer::Strong(buffer) => {
                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                    }
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Weak(_) => {}
                },
                hash_map::Entry::Vacant(e) => {
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }

    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
                });

                if let Err(error) = result {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    self.add_buffer(buffer, cx)?;
                }
            }
        }

        Ok(())
    }

    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            Ok(())
        })?
    }

    pub async fn handle_update_diff_base(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let buffer_id = envelope.payload.buffer_id;
            let buffer_id = BufferId::new(buffer_id)?;
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.set_diff_base(envelope.payload.diff_base, cx)
                });
            }
            Ok(())
        })?
    }

    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.remote_id.context("project is not shared")?,
            ))
        })??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }

    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }
        })
    }

    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }
        })
    }

    pub async fn handle_blame_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }

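    /// Polls a load-in-progress watch channel until it yields a result,
    /// cloning the buffer handle (or the error) out of the channel.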
    pub async fn wait_for_loading_buffer(
        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
        loop {
            if let Some(result) = receiver.borrow().as_ref() {
                match result {
                    Ok(buffer) => return Ok(buffer.to_owned()),
                    Err(e) => return Err(e.to_owned()),
                }
            }
            receiver.next().await;
        }
    }
}

impl OpenBuffer {
    fn upgrade(&self) -> Option<Model<Buffer>> {
        match self {
            OpenBuffer::Strong(handle) => Some(handle.clone()),
            OpenBuffer::Weak(handle) => handle.upgrade(),
            OpenBuffer::Operations(_) => None,
        }
    }
}

fn is_not_found_error(error: &anyhow::Error) -> bool {
    error
        .root_cause()
        .downcast_ref::<io::Error>()
        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
}

fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
    let entries = blame
        .entries
        .into_iter()
        .map(|entry| proto::BlameEntry {
            sha: entry.sha.as_bytes().into(),
            start_line: entry.range.start,
            end_line: entry.range.end,
            original_line_number: entry.original_line_number,
            author: entry.author.clone(),
            author_mail: entry.author_mail.clone(),
            author_time: entry.author_time,
            author_tz: entry.author_tz.clone(),
            committer: entry.committer.clone(),
            committer_mail: entry.committer_mail.clone(),
            committer_time: entry.committer_time,
            committer_tz: entry.committer_tz.clone(),
            summary: entry.summary.clone(),
            previous: entry.previous.clone(),
            filename: entry.filename.clone(),
        })
        .collect::<Vec<_>>();

    let messages = blame
        .messages
        .into_iter()
        .map(|(oid, message)| proto::CommitMessage {
            oid: oid.as_bytes().into(),
            message,
        })
        .collect::<Vec<_>>();

    let permalinks = blame
        .permalinks
        .into_iter()
        .map(|(oid, url)| proto::CommitPermalink {
            oid: oid.as_bytes().into(),
            permalink: url.to_string(),
        })
        .collect::<Vec<_>>();

    proto::BlameBufferResponse {
        entries,
        messages,
        permalinks,
        remote_url: blame.remote_url,
    }
}

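// Deserialization is lossy: entries, messages, and permalinks whose object
// ids or URLs fail to parse are silently dropped.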
fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_mail: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    let permalinks = response
        .permalinks
        .into_iter()
        .filter_map(|permalink| {
            Some((
                git::Oid::from_bytes(&permalink.oid).ok()?,
                Url::from_str(&permalink.permalink).ok()?,
            ))
        })
        .collect::<HashMap<_, _>>();

    Blame {
        entries,
        permalinks,
        messages,
        remote_url: response.remote_url,
    }
}