buffer_store.rs

   1use crate::{
   2    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   3    NoRepositoryError, ProjectPath,
   4};
   5use anyhow::{anyhow, Context as _, Result};
   6use collections::{hash_map, HashMap};
   7use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt as _};
   8use git::blame::Blame;
   9use gpui::{
  10    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
  11};
  12use http_client::Url;
  13use language::{
  14    proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
  15    Buffer, Capability, Event as BufferEvent, File as _, Language, Operation,
  16};
  17use rpc::{
  18    proto::{self, AnyProtoClient, EnvelopedMessage, PeerId},
  19    ErrorExt as _, TypedEnvelope,
  20};
  21use std::{io, path::Path, str::FromStr as _, sync::Arc};
  22use text::BufferId;
  23use util::{debug_panic, maybe, ResultExt as _};
  24use worktree::{
  25    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
  26    WorktreeId,
  27};
  28
  29/// A set of open buffers.
  30pub struct BufferStore {
  31    remote_id: Option<u64>,
  32    #[allow(unused)]
  33    worktree_store: Model<WorktreeStore>,
  34    opened_buffers: HashMap<BufferId, OpenBuffer>,
  35    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
  36    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
  37    #[allow(clippy::type_complexity)]
  38    loading_buffers_by_path: HashMap<
  39        ProjectPath,
  40        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
  41    >,
  42    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
  43    remote_buffer_listeners:
  44        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
  45}
  46
  47enum OpenBuffer {
  48    Strong(Model<Buffer>),
  49    Weak(WeakModel<Buffer>),
  50    Operations(Vec<Operation>),
  51}
  52
  53pub enum BufferStoreEvent {
  54    BufferAdded(Model<Buffer>),
  55    BufferChangedFilePath {
  56        buffer: Model<Buffer>,
  57        old_file: Option<Arc<dyn language::File>>,
  58    },
  59    MessageToReplicas(Box<proto::Envelope>),
  60}
  61
  62impl EventEmitter<BufferStoreEvent> for BufferStore {}
  63
  64impl BufferStore {
  65    /// Creates a buffer store, optionally retaining its buffers.
  66    ///
  67    /// If `retain_buffers` is `true`, then buffers are owned by the buffer store
  68    /// and won't be released unless they are explicitly removed, or `retain_buffers`
  69    /// is set to `false` via `set_retain_buffers`. Otherwise, buffers are stored as
  70    /// weak handles.
  71    pub fn new(
  72        worktree_store: Model<WorktreeStore>,
  73        remote_id: Option<u64>,
  74        cx: &mut ModelContext<Self>,
  75    ) -> Self {
  76        cx.subscribe(&worktree_store, |this, _, event, cx| match event {
  77            WorktreeStoreEvent::WorktreeAdded(worktree) => {
  78                this.subscribe_to_worktree(worktree, cx);
  79            }
  80            _ => {}
  81        })
  82        .detach();
  83
  84        Self {
  85            remote_id,
  86            worktree_store,
  87            opened_buffers: Default::default(),
  88            remote_buffer_listeners: Default::default(),
  89            loading_remote_buffers_by_id: Default::default(),
  90            local_buffer_ids_by_path: Default::default(),
  91            local_buffer_ids_by_entry_id: Default::default(),
  92            loading_buffers_by_path: Default::default(),
  93        }
  94    }
  95
  96    pub fn open_buffer(
  97        &mut self,
  98        project_path: ProjectPath,
  99        cx: &mut ModelContext<Self>,
 100    ) -> Task<Result<Model<Buffer>>> {
 101        let existing_buffer = self.get_by_path(&project_path, cx);
 102        if let Some(existing_buffer) = existing_buffer {
 103            return Task::ready(Ok(existing_buffer));
 104        }
 105
 106        let Some(worktree) = self
 107            .worktree_store
 108            .read(cx)
 109            .worktree_for_id(project_path.worktree_id, cx)
 110        else {
 111            return Task::ready(Err(anyhow!("no such worktree")));
 112        };
 113
 114        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
 115            // If the given path is already being loaded, then wait for that existing
 116            // task to complete and return the same buffer.
 117            hash_map::Entry::Occupied(e) => e.get().clone(),
 118
 119            // Otherwise, record the fact that this path is now being loaded.
 120            hash_map::Entry::Vacant(entry) => {
 121                let (mut tx, rx) = postage::watch::channel();
 122                entry.insert(rx.clone());
 123
 124                let project_path = project_path.clone();
 125                let load_buffer = match worktree.read(cx) {
 126                    Worktree::Local(_) => {
 127                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
 128                    }
 129                    Worktree::Remote(tree) => {
 130                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
 131                    }
 132                };
 133
 134                cx.spawn(move |this, mut cx| async move {
 135                    let load_result = load_buffer.await;
 136                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
 137                        // Record the fact that the buffer is no longer loading.
 138                        this.loading_buffers_by_path.remove(&project_path);
 139                        let buffer = load_result.map_err(Arc::new)?;
 140                        Ok(buffer)
 141                    })?);
 142                    anyhow::Ok(())
 143                })
 144                .detach();
 145                rx
 146            }
 147        };
 148
 149        cx.background_executor().spawn(async move {
 150            Self::wait_for_loading_buffer(loading_watch)
 151                .await
 152                .map_err(|e| e.cloned())
 153        })
 154    }
 155
 156    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 157        cx.subscribe(worktree, |this, worktree, event, cx| {
 158            if worktree.read(cx).is_local() {
 159                match event {
 160                    worktree::Event::UpdatedEntries(changes) => {
 161                        this.local_worktree_entries_changed(&worktree, changes, cx);
 162                    }
 163                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
 164                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
 165                    }
 166                    _ => {}
 167                }
 168            }
 169        })
 170        .detach();
 171    }
 172
 173    fn local_worktree_entries_changed(
 174        &mut self,
 175        worktree_handle: &Model<Worktree>,
 176        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 177        cx: &mut ModelContext<Self>,
 178    ) {
 179        let snapshot = worktree_handle.read(cx).snapshot();
 180        for (path, entry_id, _) in changes {
 181            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
 182        }
 183    }
 184
 185    fn local_worktree_git_repos_changed(
 186        &mut self,
 187        worktree_handle: Model<Worktree>,
 188        changed_repos: &UpdatedGitRepositoriesSet,
 189        cx: &mut ModelContext<Self>,
 190    ) {
 191        debug_assert!(worktree_handle.read(cx).is_local());
 192
 193        // Identify the loading buffers whose containing repository that has changed.
 194        let future_buffers = self
 195            .loading_buffers()
 196            .filter_map(|(project_path, receiver)| {
 197                if project_path.worktree_id != worktree_handle.read(cx).id() {
 198                    return None;
 199                }
 200                let path = &project_path.path;
 201                changed_repos
 202                    .iter()
 203                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
 204                let path = path.clone();
 205                Some(async move {
 206                    Self::wait_for_loading_buffer(receiver)
 207                        .await
 208                        .ok()
 209                        .map(|buffer| (buffer, path))
 210                })
 211            })
 212            .collect::<FuturesUnordered<_>>();
 213
 214        // Identify the current buffers whose containing repository has changed.
 215        let current_buffers = self
 216            .buffers()
 217            .filter_map(|buffer| {
 218                let file = File::from_dyn(buffer.read(cx).file())?;
 219                if file.worktree != worktree_handle {
 220                    return None;
 221                }
 222                changed_repos
 223                    .iter()
 224                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
 225                Some((buffer, file.path.clone()))
 226            })
 227            .collect::<Vec<_>>();
 228
 229        if future_buffers.len() + current_buffers.len() == 0 {
 230            return;
 231        }
 232
 233        cx.spawn(move |this, mut cx| async move {
 234            // Wait for all of the buffers to load.
 235            let future_buffers = future_buffers.collect::<Vec<_>>().await;
 236
 237            // Reload the diff base for every buffer whose containing git repository has changed.
 238            let snapshot =
 239                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
 240            let diff_bases_by_buffer = cx
 241                .background_executor()
 242                .spawn(async move {
 243                    let mut diff_base_tasks = future_buffers
 244                        .into_iter()
 245                        .flatten()
 246                        .chain(current_buffers)
 247                        .filter_map(|(buffer, path)| {
 248                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
 249                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
 250                            Some(async move {
 251                                let base_text =
 252                                    local_repo_entry.repo().load_index_text(&relative_path);
 253                                Some((buffer, base_text))
 254                            })
 255                        })
 256                        .collect::<FuturesUnordered<_>>();
 257
 258                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
 259                    while let Some(diff_base) = diff_base_tasks.next().await {
 260                        if let Some(diff_base) = diff_base {
 261                            diff_bases.push(diff_base);
 262                        }
 263                    }
 264                    diff_bases
 265                })
 266                .await;
 267
 268            this.update(&mut cx, |this, cx| {
 269                // Assign the new diff bases on all of the buffers.
 270                for (buffer, diff_base) in diff_bases_by_buffer {
 271                    let buffer_id = buffer.update(cx, |buffer, cx| {
 272                        buffer.set_diff_base(diff_base.clone(), cx);
 273                        buffer.remote_id().to_proto()
 274                    });
 275                    if let Some(project_id) = this.remote_id {
 276                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
 277                            proto::UpdateDiffBase {
 278                                project_id,
 279                                buffer_id,
 280                                diff_base,
 281                            }
 282                            .into_envelope(0, None, None),
 283                        )))
 284                    }
 285                }
 286            })
 287        })
 288        .detach_and_log_err(cx);
 289    }
 290
 291    fn open_local_buffer_internal(
 292        &mut self,
 293        path: Arc<Path>,
 294        worktree: Model<Worktree>,
 295        cx: &mut ModelContext<Self>,
 296    ) -> Task<Result<Model<Buffer>>> {
 297        let load_buffer = worktree.update(cx, |worktree, cx| {
 298            let load_file = worktree.load_file(path.as_ref(), cx);
 299            let reservation = cx.reserve_model();
 300            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
 301            cx.spawn(move |_, mut cx| async move {
 302                let loaded = load_file.await?;
 303                let text_buffer = cx
 304                    .background_executor()
 305                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
 306                    .await;
 307                cx.insert_model(reservation, |_| {
 308                    Buffer::build(
 309                        text_buffer,
 310                        loaded.diff_base,
 311                        Some(loaded.file),
 312                        Capability::ReadWrite,
 313                    )
 314                })
 315            })
 316        });
 317
 318        cx.spawn(move |this, mut cx| async move {
 319            let buffer = match load_buffer.await {
 320                Ok(buffer) => Ok(buffer),
 321                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
 322                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
 323                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
 324                    Buffer::build(
 325                        text_buffer,
 326                        None,
 327                        Some(Arc::new(File {
 328                            worktree,
 329                            path,
 330                            mtime: None,
 331                            entry_id: None,
 332                            is_local: true,
 333                            is_deleted: false,
 334                            is_private: false,
 335                        })),
 336                        Capability::ReadWrite,
 337                    )
 338                }),
 339                Err(e) => Err(e),
 340            }?;
 341            this.update(&mut cx, |this, cx| {
 342                this.add_buffer(buffer.clone(), cx).log_err();
 343            })?;
 344            Ok(buffer)
 345        })
 346    }
 347
 348    fn open_remote_buffer_internal(
 349        &self,
 350        path: &Arc<Path>,
 351        worktree: &RemoteWorktree,
 352        cx: &ModelContext<Self>,
 353    ) -> Task<Result<Model<Buffer>>> {
 354        let worktree_id = worktree.id().to_proto();
 355        let project_id = worktree.project_id();
 356        let client = worktree.client();
 357        let path_string = path.clone().to_string_lossy().to_string();
 358        cx.spawn(move |this, mut cx| async move {
 359            let response = client
 360                .request(proto::OpenBufferByPath {
 361                    project_id,
 362                    worktree_id,
 363                    path: path_string,
 364                })
 365                .await?;
 366            let buffer_id = BufferId::new(response.buffer_id)?;
 367            this.update(&mut cx, |this, cx| {
 368                this.wait_for_remote_buffer(buffer_id, cx)
 369            })?
 370            .await
 371        })
 372    }
 373
 374    pub fn create_buffer(
 375        &mut self,
 376        remote_client: Option<(AnyProtoClient, u64)>,
 377        cx: &mut ModelContext<Self>,
 378    ) -> Task<Result<Model<Buffer>>> {
 379        if let Some((remote_client, project_id)) = remote_client {
 380            let create = remote_client.request(proto::OpenNewBuffer { project_id });
 381            cx.spawn(|this, mut cx| async move {
 382                let response = create.await?;
 383                let buffer_id = BufferId::new(response.buffer_id)?;
 384
 385                this.update(&mut cx, |this, cx| {
 386                    this.wait_for_remote_buffer(buffer_id, cx)
 387                })?
 388                .await
 389            })
 390        } else {
 391            Task::ready(Ok(self.create_local_buffer("", None, cx)))
 392        }
 393    }
 394
 395    pub fn create_local_buffer(
 396        &mut self,
 397        text: &str,
 398        language: Option<Arc<Language>>,
 399        cx: &mut ModelContext<Self>,
 400    ) -> Model<Buffer> {
 401        let buffer = cx.new_model(|cx| {
 402            Buffer::local(text, cx)
 403                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
 404        });
 405        self.add_buffer(buffer.clone(), cx).log_err();
 406        buffer
 407    }
 408
 409    pub fn save_buffer(
 410        &mut self,
 411        buffer: Model<Buffer>,
 412        cx: &mut ModelContext<Self>,
 413    ) -> Task<Result<()>> {
 414        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 415            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 416        };
 417        match file.worktree.read(cx) {
 418            Worktree::Local(_) => {
 419                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
 420            }
 421            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
 422        }
 423    }
 424
 425    pub fn save_buffer_as(
 426        &mut self,
 427        buffer: Model<Buffer>,
 428        path: ProjectPath,
 429        cx: &mut ModelContext<Self>,
 430    ) -> Task<Result<()>> {
 431        let Some(worktree) = self
 432            .worktree_store
 433            .read(cx)
 434            .worktree_for_id(path.worktree_id, cx)
 435        else {
 436            return Task::ready(Err(anyhow!("no such worktree")));
 437        };
 438
 439        let old_file = buffer.read(cx).file().cloned();
 440
 441        let task = match worktree.read(cx) {
 442            Worktree::Local(_) => {
 443                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
 444            }
 445            Worktree::Remote(tree) => {
 446                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
 447            }
 448        };
 449        cx.spawn(|this, mut cx| async move {
 450            task.await?;
 451            this.update(&mut cx, |_, cx| {
 452                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
 453            })
 454        })
 455    }
 456
 457    fn save_local_buffer(
 458        &self,
 459        worktree: Model<Worktree>,
 460        buffer_handle: Model<Buffer>,
 461        path: Arc<Path>,
 462        mut has_changed_file: bool,
 463        cx: &mut ModelContext<Self>,
 464    ) -> Task<Result<()>> {
 465        let buffer = buffer_handle.read(cx);
 466        let text = buffer.as_rope().clone();
 467        let line_ending = buffer.line_ending();
 468        let version = buffer.version();
 469        let buffer_id = buffer.remote_id();
 470        if buffer.file().is_some_and(|file| !file.is_created()) {
 471            has_changed_file = true;
 472        }
 473
 474        let save = worktree.update(cx, |worktree, cx| {
 475            worktree.write_file(path.as_ref(), text, line_ending, cx)
 476        });
 477
 478        cx.spawn(move |this, mut cx| async move {
 479            let new_file = save.await?;
 480            let mtime = new_file.mtime;
 481            this.update(&mut cx, |this, cx| {
 482                if let Some(project_id) = this.remote_id {
 483                    if has_changed_file {
 484                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
 485                            proto::UpdateBufferFile {
 486                                project_id,
 487                                buffer_id: buffer_id.to_proto(),
 488                                file: Some(language::File::to_proto(&*new_file, cx)),
 489                            }
 490                            .into_envelope(0, None, None),
 491                        )));
 492                    }
 493                    cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
 494                        proto::BufferSaved {
 495                            project_id,
 496                            buffer_id: buffer_id.to_proto(),
 497                            version: serialize_version(&version),
 498                            mtime: mtime.map(|time| time.into()),
 499                        }
 500                        .into_envelope(0, None, None),
 501                    )));
 502                }
 503            })?;
 504            buffer_handle.update(&mut cx, |buffer, cx| {
 505                if has_changed_file {
 506                    buffer.file_updated(new_file, cx);
 507                }
 508                buffer.did_save(version.clone(), mtime, cx);
 509            })
 510        })
 511    }
 512
 513    fn save_remote_buffer(
 514        &self,
 515        buffer_handle: Model<Buffer>,
 516        new_path: Option<proto::ProjectPath>,
 517        tree: &RemoteWorktree,
 518        cx: &ModelContext<Self>,
 519    ) -> Task<Result<()>> {
 520        let buffer = buffer_handle.read(cx);
 521        let buffer_id = buffer.remote_id().into();
 522        let version = buffer.version();
 523        let rpc = tree.client();
 524        let project_id = tree.project_id();
 525        cx.spawn(move |_, mut cx| async move {
 526            let response = rpc
 527                .request(proto::SaveBuffer {
 528                    project_id,
 529                    buffer_id,
 530                    new_path,
 531                    version: serialize_version(&version),
 532                })
 533                .await?;
 534            let version = deserialize_version(&response.version);
 535            let mtime = response.mtime.map(|mtime| mtime.into());
 536
 537            buffer_handle.update(&mut cx, |buffer, cx| {
 538                buffer.did_save(version.clone(), mtime, cx);
 539            })?;
 540
 541            Ok(())
 542        })
 543    }
 544
 545    pub fn blame_buffer(
 546        &self,
 547        buffer: &Model<Buffer>,
 548        version: Option<clock::Global>,
 549        cx: &AppContext,
 550    ) -> Task<Result<Blame>> {
 551        let buffer = buffer.read(cx);
 552        let Some(file) = File::from_dyn(buffer.file()) else {
 553            return Task::ready(Err(anyhow!("buffer has no file")));
 554        };
 555
 556        match file.worktree.clone().read(cx) {
 557            Worktree::Local(worktree) => {
 558                let worktree = worktree.snapshot();
 559                let blame_params = maybe!({
 560                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
 561                        Some(repo_for_path) => repo_for_path,
 562                        None => anyhow::bail!(NoRepositoryError {}),
 563                    };
 564
 565                    let relative_path = repo_entry
 566                        .relativize(&worktree, &file.path)
 567                        .context("failed to relativize buffer path")?;
 568
 569                    let repo = local_repo_entry.repo().clone();
 570
 571                    let content = match version {
 572                        Some(version) => buffer.rope_for_version(&version).clone(),
 573                        None => buffer.as_rope().clone(),
 574                    };
 575
 576                    anyhow::Ok((repo, relative_path, content))
 577                });
 578
 579                cx.background_executor().spawn(async move {
 580                    let (repo, relative_path, content) = blame_params?;
 581                    repo.blame(&relative_path, content)
 582                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
 583                })
 584            }
 585            Worktree::Remote(worktree) => {
 586                let buffer_id = buffer.remote_id();
 587                let version = buffer.version();
 588                let project_id = worktree.project_id();
 589                let client = worktree.client();
 590                cx.spawn(|_| async move {
 591                    let response = client
 592                        .request(proto::BlameBuffer {
 593                            project_id,
 594                            buffer_id: buffer_id.into(),
 595                            version: serialize_version(&version),
 596                        })
 597                        .await?;
 598                    Ok(deserialize_blame_buffer_response(response))
 599                })
 600            }
 601        }
 602    }
 603
 604    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
 605        let remote_id = buffer.read(cx).remote_id();
 606        let is_remote = buffer.read(cx).replica_id() != 0;
 607        let open_buffer = if self.remote_id.is_some() {
 608            OpenBuffer::Strong(buffer.clone())
 609        } else {
 610            OpenBuffer::Weak(buffer.downgrade())
 611        };
 612
 613        match self.opened_buffers.entry(remote_id) {
 614            hash_map::Entry::Vacant(entry) => {
 615                entry.insert(open_buffer);
 616            }
 617            hash_map::Entry::Occupied(mut entry) => {
 618                if let OpenBuffer::Operations(operations) = entry.get_mut() {
 619                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
 620                } else if entry.get().upgrade().is_some() {
 621                    if is_remote {
 622                        return Ok(());
 623                    } else {
 624                        debug_panic!("buffer {} was already registered", remote_id);
 625                        Err(anyhow!("buffer {} was already registered", remote_id))?;
 626                    }
 627                }
 628                entry.insert(open_buffer);
 629            }
 630        }
 631
 632        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
 633            for sender in senders {
 634                sender.send(Ok(buffer.clone())).ok();
 635            }
 636        }
 637
 638        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 639            if file.is_local {
 640                self.local_buffer_ids_by_path.insert(
 641                    ProjectPath {
 642                        worktree_id: file.worktree_id(cx),
 643                        path: file.path.clone(),
 644                    },
 645                    remote_id,
 646                );
 647
 648                if let Some(entry_id) = file.entry_id {
 649                    self.local_buffer_ids_by_entry_id
 650                        .insert(entry_id, remote_id);
 651                }
 652            }
 653        }
 654
 655        cx.subscribe(&buffer, Self::on_buffer_event).detach();
 656        cx.emit(BufferStoreEvent::BufferAdded(buffer));
 657        Ok(())
 658    }
 659
 660    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
 661        self.opened_buffers
 662            .values()
 663            .filter_map(|buffer| buffer.upgrade())
 664    }
 665
 666    pub fn loading_buffers(
 667        &self,
 668    ) -> impl Iterator<
 669        Item = (
 670            &ProjectPath,
 671            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
 672        ),
 673    > {
 674        self.loading_buffers_by_path
 675            .iter()
 676            .map(|(path, rx)| (path, rx.clone()))
 677    }
 678
 679    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
 680        self.buffers().find_map(|buffer| {
 681            let file = File::from_dyn(buffer.read(cx).file())?;
 682            if file.worktree_id(cx) == path.worktree_id && &file.path == &path.path {
 683                Some(buffer)
 684            } else {
 685                None
 686            }
 687        })
 688    }
 689
 690    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
 691        self.opened_buffers
 692            .get(&buffer_id)
 693            .and_then(|buffer| buffer.upgrade())
 694    }
 695
 696    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
 697        self.get(buffer_id)
 698            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
 699    }
 700
 701    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
 702        self.get(buffer_id)
 703            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
 704    }
 705
 706    pub fn wait_for_remote_buffer(
 707        &mut self,
 708        id: BufferId,
 709        cx: &mut AppContext,
 710    ) -> Task<Result<Model<Buffer>>> {
 711        let buffer = self.get(id);
 712        if let Some(buffer) = buffer {
 713            return Task::ready(Ok(buffer));
 714        }
 715        let (tx, rx) = oneshot::channel();
 716        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 717        cx.background_executor().spawn(async move { rx.await? })
 718    }
 719
 720    pub fn buffer_version_info(
 721        &self,
 722        cx: &AppContext,
 723    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
 724        let buffers = self
 725            .buffers()
 726            .map(|buffer| {
 727                let buffer = buffer.read(cx);
 728                proto::BufferVersion {
 729                    id: buffer.remote_id().into(),
 730                    version: language::proto::serialize_version(&buffer.version),
 731                }
 732            })
 733            .collect();
 734        let incomplete_buffer_ids = self
 735            .loading_remote_buffers_by_id
 736            .keys()
 737            .copied()
 738            .collect::<Vec<_>>();
 739        (buffers, incomplete_buffer_ids)
 740    }
 741
 742    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
 743        self.set_remote_id(None, cx);
 744
 745        for buffer in self.buffers() {
 746            buffer.update(cx, |buffer, cx| {
 747                buffer.set_capability(Capability::ReadOnly, cx)
 748            });
 749        }
 750
 751        // Wake up all futures currently waiting on a buffer to get opened,
 752        // to give them a chance to fail now that we've disconnected.
 753        self.remote_buffer_listeners.clear();
 754    }
 755
 756    pub fn set_remote_id(&mut self, remote_id: Option<u64>, cx: &mut AppContext) {
 757        self.remote_id = remote_id;
 758        for open_buffer in self.opened_buffers.values_mut() {
 759            if remote_id.is_some() {
 760                if let OpenBuffer::Weak(buffer) = open_buffer {
 761                    if let Some(buffer) = buffer.upgrade() {
 762                        *open_buffer = OpenBuffer::Strong(buffer);
 763                    }
 764                }
 765            } else {
 766                if let Some(buffer) = open_buffer.upgrade() {
 767                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
 768                }
 769                if let OpenBuffer::Strong(buffer) = open_buffer {
 770                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
 771                }
 772            }
 773        }
 774    }
 775
 776    pub fn discard_incomplete(&mut self) {
 777        self.opened_buffers
 778            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
 779    }
 780
 781    fn on_buffer_event(
 782        &mut self,
 783        buffer: Model<Buffer>,
 784        event: &BufferEvent,
 785        cx: &mut ModelContext<Self>,
 786    ) {
 787        match event {
 788            BufferEvent::FileHandleChanged => {
 789                self.buffer_changed_file(buffer, cx);
 790            }
 791            _ => {}
 792        }
 793    }
 794
 795    fn local_worktree_entry_changed(
 796        &mut self,
 797        entry_id: ProjectEntryId,
 798        path: &Arc<Path>,
 799        worktree: &Model<worktree::Worktree>,
 800        snapshot: &worktree::Snapshot,
 801        cx: &mut ModelContext<Self>,
 802    ) -> Option<()> {
 803        let project_path = ProjectPath {
 804            worktree_id: snapshot.id(),
 805            path: path.clone(),
 806        };
 807        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 808            Some(&buffer_id) => buffer_id,
 809            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
 810        };
 811        let buffer = if let Some(buffer) = self.get(buffer_id) {
 812            buffer
 813        } else {
 814            self.opened_buffers.remove(&buffer_id);
 815            self.local_buffer_ids_by_path.remove(&project_path);
 816            self.local_buffer_ids_by_entry_id.remove(&entry_id);
 817            return None;
 818        };
 819
 820        let events = buffer.update(cx, |buffer, cx| {
 821            let file = buffer.file()?;
 822            let old_file = File::from_dyn(Some(file))?;
 823            if old_file.worktree != *worktree {
 824                return None;
 825            }
 826
 827            let new_file = if let Some(entry) = old_file
 828                .entry_id
 829                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 830            {
 831                File {
 832                    is_local: true,
 833                    entry_id: Some(entry.id),
 834                    mtime: entry.mtime,
 835                    path: entry.path.clone(),
 836                    worktree: worktree.clone(),
 837                    is_deleted: false,
 838                    is_private: entry.is_private,
 839                }
 840            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
 841                File {
 842                    is_local: true,
 843                    entry_id: Some(entry.id),
 844                    mtime: entry.mtime,
 845                    path: entry.path.clone(),
 846                    worktree: worktree.clone(),
 847                    is_deleted: false,
 848                    is_private: entry.is_private,
 849                }
 850            } else {
 851                File {
 852                    is_local: true,
 853                    entry_id: old_file.entry_id,
 854                    path: old_file.path.clone(),
 855                    mtime: old_file.mtime,
 856                    worktree: worktree.clone(),
 857                    is_deleted: true,
 858                    is_private: old_file.is_private,
 859                }
 860            };
 861
 862            if new_file == *old_file {
 863                return None;
 864            }
 865
 866            let mut events = Vec::new();
 867            if new_file.path != old_file.path {
 868                self.local_buffer_ids_by_path.remove(&ProjectPath {
 869                    path: old_file.path.clone(),
 870                    worktree_id: old_file.worktree_id(cx),
 871                });
 872                self.local_buffer_ids_by_path.insert(
 873                    ProjectPath {
 874                        worktree_id: new_file.worktree_id(cx),
 875                        path: new_file.path.clone(),
 876                    },
 877                    buffer_id,
 878                );
 879                events.push(BufferStoreEvent::BufferChangedFilePath {
 880                    buffer: cx.handle(),
 881                    old_file: buffer.file().cloned(),
 882                });
 883            }
 884
 885            if new_file.entry_id != old_file.entry_id {
 886                if let Some(entry_id) = old_file.entry_id {
 887                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
 888                }
 889                if let Some(entry_id) = new_file.entry_id {
 890                    self.local_buffer_ids_by_entry_id
 891                        .insert(entry_id, buffer_id);
 892                }
 893            }
 894
 895            if let Some(project_id) = self.remote_id {
 896                events.push(BufferStoreEvent::MessageToReplicas(Box::new(
 897                    proto::UpdateBufferFile {
 898                        project_id,
 899                        buffer_id: buffer_id.to_proto(),
 900                        file: Some(new_file.to_proto(cx)),
 901                    }
 902                    .into_envelope(0, None, None),
 903                )))
 904            }
 905
 906            buffer.file_updated(Arc::new(new_file), cx);
 907            Some(events)
 908        })?;
 909
 910        for event in events {
 911            cx.emit(event);
 912        }
 913
 914        None
 915    }
 916
 917    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
 918        let file = File::from_dyn(buffer.read(cx).file())?;
 919
 920        let remote_id = buffer.read(cx).remote_id();
 921        if let Some(entry_id) = file.entry_id {
 922            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 923                Some(_) => {
 924                    return None;
 925                }
 926                None => {
 927                    self.local_buffer_ids_by_entry_id
 928                        .insert(entry_id, remote_id);
 929                }
 930            }
 931        };
 932        self.local_buffer_ids_by_path.insert(
 933            ProjectPath {
 934                worktree_id: file.worktree_id(cx),
 935                path: file.path.clone(),
 936            },
 937            remote_id,
 938        );
 939
 940        Some(())
 941    }
 942
 943    pub async fn create_buffer_for_peer(
 944        this: Model<Self>,
 945        peer_id: PeerId,
 946        buffer_id: BufferId,
 947        project_id: u64,
 948        client: AnyProtoClient,
 949        cx: &mut AsyncAppContext,
 950    ) -> Result<()> {
 951        let Some(buffer) = this.update(cx, |this, _| this.get(buffer_id))? else {
 952            return Ok(());
 953        };
 954
 955        let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
 956        let operations = operations.await;
 957        let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx))?;
 958
 959        let initial_state = proto::CreateBufferForPeer {
 960            project_id,
 961            peer_id: Some(peer_id),
 962            variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
 963        };
 964
 965        if client.send(initial_state).log_err().is_some() {
 966            let client = client.clone();
 967            cx.background_executor()
 968                .spawn(async move {
 969                    let mut chunks = split_operations(operations).peekable();
 970                    while let Some(chunk) = chunks.next() {
 971                        let is_last = chunks.peek().is_none();
 972                        client.send(proto::CreateBufferForPeer {
 973                            project_id,
 974                            peer_id: Some(peer_id),
 975                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
 976                                proto::BufferChunk {
 977                                    buffer_id: buffer_id.into(),
 978                                    operations: chunk,
 979                                    is_last,
 980                                },
 981                            )),
 982                        })?;
 983                    }
 984                    anyhow::Ok(())
 985                })
 986                .await
 987                .log_err();
 988        }
 989        Ok(())
 990    }
 991
 992    pub async fn handle_update_buffer(
 993        this: Model<Self>,
 994        envelope: TypedEnvelope<proto::UpdateBuffer>,
 995        mut cx: AsyncAppContext,
 996    ) -> Result<proto::Ack> {
 997        let payload = envelope.payload.clone();
 998        let buffer_id = BufferId::new(payload.buffer_id)?;
 999        let ops = payload
1000            .operations
1001            .into_iter()
1002            .map(language::proto::deserialize_operation)
1003            .collect::<Result<Vec<_>, _>>()?;
1004        this.update(&mut cx, |this, cx| {
1005            match this.opened_buffers.entry(buffer_id) {
1006                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1007                    OpenBuffer::Strong(buffer) => {
1008                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
1009                    }
1010                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1011                    OpenBuffer::Weak(_) => {}
1012                },
1013                hash_map::Entry::Vacant(e) => {
1014                    e.insert(OpenBuffer::Operations(ops));
1015                }
1016            }
1017            Ok(proto::Ack {})
1018        })?
1019    }
1020
1021    pub fn handle_create_buffer_for_peer(
1022        &mut self,
1023        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1024        replica_id: u16,
1025        capability: Capability,
1026        cx: &mut ModelContext<Self>,
1027    ) -> Result<()> {
1028        match envelope
1029            .payload
1030            .variant
1031            .ok_or_else(|| anyhow!("missing variant"))?
1032        {
1033            proto::create_buffer_for_peer::Variant::State(mut state) => {
1034                let buffer_id = BufferId::new(state.id)?;
1035
1036                let buffer_result = maybe!({
1037                    let mut buffer_file = None;
1038                    if let Some(file) = state.file.take() {
1039                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
1040                        let worktree = self
1041                            .worktree_store
1042                            .read(cx)
1043                            .worktree_for_id(worktree_id, cx)
1044                            .ok_or_else(|| {
1045                                anyhow!("no worktree found for id {}", file.worktree_id)
1046                            })?;
1047                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
1048                            as Arc<dyn language::File>);
1049                    }
1050                    Buffer::from_proto(replica_id, capability, state, buffer_file)
1051                });
1052
1053                match buffer_result {
1054                    Ok(buffer) => {
1055                        let buffer = cx.new_model(|_| buffer);
1056                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
1057                    }
1058                    Err(error) => {
1059                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
1060                            for listener in listeners {
1061                                listener.send(Err(anyhow!(error.cloned()))).ok();
1062                            }
1063                        }
1064                    }
1065                }
1066            }
1067            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
1068                let buffer_id = BufferId::new(chunk.buffer_id)?;
1069                let buffer = self
1070                    .loading_remote_buffers_by_id
1071                    .get(&buffer_id)
1072                    .cloned()
1073                    .ok_or_else(|| {
1074                        anyhow!(
1075                            "received chunk for buffer {} without initial state",
1076                            chunk.buffer_id
1077                        )
1078                    })?;
1079
1080                let result = maybe!({
1081                    let operations = chunk
1082                        .operations
1083                        .into_iter()
1084                        .map(language::proto::deserialize_operation)
1085                        .collect::<Result<Vec<_>>>()?;
1086                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
1087                });
1088
1089                if let Err(error) = result {
1090                    self.loading_remote_buffers_by_id.remove(&buffer_id);
1091                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
1092                        for listener in listeners {
1093                            listener.send(Err(error.cloned())).ok();
1094                        }
1095                    }
1096                } else if chunk.is_last {
1097                    self.loading_remote_buffers_by_id.remove(&buffer_id);
1098                    self.add_buffer(buffer, cx)?;
1099                }
1100            }
1101        }
1102
1103        Ok(())
1104    }
1105
1106    pub async fn handle_update_buffer_file(
1107        this: Model<Self>,
1108        envelope: TypedEnvelope<proto::UpdateBufferFile>,
1109        mut cx: AsyncAppContext,
1110    ) -> Result<()> {
1111        let buffer_id = envelope.payload.buffer_id;
1112        let buffer_id = BufferId::new(buffer_id)?;
1113
1114        this.update(&mut cx, |this, cx| {
1115            let payload = envelope.payload.clone();
1116            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1117                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
1118                let worktree = this
1119                    .worktree_store
1120                    .read(cx)
1121                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
1122                    .ok_or_else(|| anyhow!("no such worktree"))?;
1123                let file = File::from_proto(file, worktree, cx)?;
1124                let old_file = buffer.update(cx, |buffer, cx| {
1125                    let old_file = buffer.file().cloned();
1126                    let new_path = file.path.clone();
1127                    buffer.file_updated(Arc::new(file), cx);
1128                    if old_file
1129                        .as_ref()
1130                        .map_or(true, |old| *old.path() != new_path)
1131                    {
1132                        Some(old_file)
1133                    } else {
1134                        None
1135                    }
1136                });
1137                if let Some(old_file) = old_file {
1138                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1139                }
1140            }
1141            Ok(())
1142        })?
1143    }
1144
1145    pub async fn handle_update_diff_base(
1146        this: Model<Self>,
1147        envelope: TypedEnvelope<proto::UpdateDiffBase>,
1148        mut cx: AsyncAppContext,
1149    ) -> Result<()> {
1150        this.update(&mut cx, |this, cx| {
1151            let buffer_id = envelope.payload.buffer_id;
1152            let buffer_id = BufferId::new(buffer_id)?;
1153            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1154                buffer.update(cx, |buffer, cx| {
1155                    buffer.set_diff_base(envelope.payload.diff_base, cx)
1156                });
1157            }
1158            Ok(())
1159        })?
1160    }
1161
1162    pub async fn handle_save_buffer(
1163        this: Model<Self>,
1164        envelope: TypedEnvelope<proto::SaveBuffer>,
1165        mut cx: AsyncAppContext,
1166    ) -> Result<proto::BufferSaved> {
1167        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1168        let (buffer, project_id) = this.update(&mut cx, |this, _| {
1169            anyhow::Ok((
1170                this.get_existing(buffer_id)?,
1171                this.remote_id.context("project is not shared")?,
1172            ))
1173        })??;
1174        buffer
1175            .update(&mut cx, |buffer, _| {
1176                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1177            })?
1178            .await?;
1179        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
1180
1181        if let Some(new_path) = envelope.payload.new_path {
1182            let new_path = ProjectPath::from_proto(new_path);
1183            this.update(&mut cx, |this, cx| {
1184                this.save_buffer_as(buffer.clone(), new_path, cx)
1185            })?
1186            .await?;
1187        } else {
1188            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
1189                .await?;
1190        }
1191
1192        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
1193            project_id,
1194            buffer_id: buffer_id.into(),
1195            version: serialize_version(buffer.saved_version()),
1196            mtime: buffer.saved_mtime().map(|time| time.into()),
1197        })
1198    }
1199
1200    pub async fn handle_buffer_saved(
1201        this: Model<Self>,
1202        envelope: TypedEnvelope<proto::BufferSaved>,
1203        mut cx: AsyncAppContext,
1204    ) -> Result<()> {
1205        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1206        let version = deserialize_version(&envelope.payload.version);
1207        let mtime = envelope.payload.mtime.map(|time| time.into());
1208        this.update(&mut cx, |this, cx| {
1209            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1210                buffer.update(cx, |buffer, cx| {
1211                    buffer.did_save(version, mtime, cx);
1212                });
1213            }
1214        })
1215    }
1216
1217    pub async fn handle_buffer_reloaded(
1218        this: Model<Self>,
1219        envelope: TypedEnvelope<proto::BufferReloaded>,
1220        mut cx: AsyncAppContext,
1221    ) -> Result<()> {
1222        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1223        let version = deserialize_version(&envelope.payload.version);
1224        let mtime = envelope.payload.mtime.map(|time| time.into());
1225        let line_ending = deserialize_line_ending(
1226            proto::LineEnding::from_i32(envelope.payload.line_ending)
1227                .ok_or_else(|| anyhow!("missing line ending"))?,
1228        );
1229        this.update(&mut cx, |this, cx| {
1230            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1231                buffer.update(cx, |buffer, cx| {
1232                    buffer.did_reload(version, line_ending, mtime, cx);
1233                });
1234            }
1235        })
1236    }
1237
1238    pub async fn handle_blame_buffer(
1239        this: Model<Self>,
1240        envelope: TypedEnvelope<proto::BlameBuffer>,
1241        mut cx: AsyncAppContext,
1242    ) -> Result<proto::BlameBufferResponse> {
1243        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1244        let version = deserialize_version(&envelope.payload.version);
1245        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1246        buffer
1247            .update(&mut cx, |buffer, _| {
1248                buffer.wait_for_version(version.clone())
1249            })?
1250            .await?;
1251        let blame = this
1252            .update(&mut cx, |this, cx| {
1253                this.blame_buffer(&buffer, Some(version), cx)
1254            })?
1255            .await?;
1256        Ok(serialize_blame_buffer_response(blame))
1257    }
1258
1259    pub async fn wait_for_loading_buffer(
1260        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
1261    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
1262        loop {
1263            if let Some(result) = receiver.borrow().as_ref() {
1264                match result {
1265                    Ok(buffer) => return Ok(buffer.to_owned()),
1266                    Err(e) => return Err(e.to_owned()),
1267                }
1268            }
1269            receiver.next().await;
1270        }
1271    }
1272}
1273
1274impl OpenBuffer {
1275    fn upgrade(&self) -> Option<Model<Buffer>> {
1276        match self {
1277            OpenBuffer::Strong(handle) => Some(handle.clone()),
1278            OpenBuffer::Weak(handle) => handle.upgrade(),
1279            OpenBuffer::Operations(_) => None,
1280        }
1281    }
1282}
1283
1284fn is_not_found_error(error: &anyhow::Error) -> bool {
1285    error
1286        .root_cause()
1287        .downcast_ref::<io::Error>()
1288        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1289}
1290
1291fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
1292    let entries = blame
1293        .entries
1294        .into_iter()
1295        .map(|entry| proto::BlameEntry {
1296            sha: entry.sha.as_bytes().into(),
1297            start_line: entry.range.start,
1298            end_line: entry.range.end,
1299            original_line_number: entry.original_line_number,
1300            author: entry.author.clone(),
1301            author_mail: entry.author_mail.clone(),
1302            author_time: entry.author_time,
1303            author_tz: entry.author_tz.clone(),
1304            committer: entry.committer.clone(),
1305            committer_mail: entry.committer_mail.clone(),
1306            committer_time: entry.committer_time,
1307            committer_tz: entry.committer_tz.clone(),
1308            summary: entry.summary.clone(),
1309            previous: entry.previous.clone(),
1310            filename: entry.filename.clone(),
1311        })
1312        .collect::<Vec<_>>();
1313
1314    let messages = blame
1315        .messages
1316        .into_iter()
1317        .map(|(oid, message)| proto::CommitMessage {
1318            oid: oid.as_bytes().into(),
1319            message,
1320        })
1321        .collect::<Vec<_>>();
1322
1323    let permalinks = blame
1324        .permalinks
1325        .into_iter()
1326        .map(|(oid, url)| proto::CommitPermalink {
1327            oid: oid.as_bytes().into(),
1328            permalink: url.to_string(),
1329        })
1330        .collect::<Vec<_>>();
1331
1332    proto::BlameBufferResponse {
1333        entries,
1334        messages,
1335        permalinks,
1336        remote_url: blame.remote_url,
1337    }
1338}
1339
1340fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
1341    let entries = response
1342        .entries
1343        .into_iter()
1344        .filter_map(|entry| {
1345            Some(git::blame::BlameEntry {
1346                sha: git::Oid::from_bytes(&entry.sha).ok()?,
1347                range: entry.start_line..entry.end_line,
1348                original_line_number: entry.original_line_number,
1349                committer: entry.committer,
1350                committer_time: entry.committer_time,
1351                committer_tz: entry.committer_tz,
1352                committer_mail: entry.committer_mail,
1353                author: entry.author,
1354                author_mail: entry.author_mail,
1355                author_time: entry.author_time,
1356                author_tz: entry.author_tz,
1357                summary: entry.summary,
1358                previous: entry.previous,
1359                filename: entry.filename,
1360            })
1361        })
1362        .collect::<Vec<_>>();
1363
1364    let messages = response
1365        .messages
1366        .into_iter()
1367        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
1368        .collect::<HashMap<_, _>>();
1369
1370    let permalinks = response
1371        .permalinks
1372        .into_iter()
1373        .filter_map(|permalink| {
1374            Some((
1375                git::Oid::from_bytes(&permalink.oid).ok()?,
1376                Url::from_str(&permalink.permalink).ok()?,
1377            ))
1378        })
1379        .collect::<HashMap<_, _>>();
1380
1381    Blame {
1382        entries,
1383        permalinks,
1384        messages,
1385        remote_url: response.remote_url,
1386    }
1387}