buffer_store.rs

   1use crate::{
   2    search::SearchQuery,
   3    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   4    Item, NoRepositoryError, ProjectPath,
   5};
   6use anyhow::{anyhow, Context as _, Result};
   7use collections::{hash_map, HashMap, HashSet};
   8use fs::Fs;
   9use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
  10use git::blame::Blame;
  11use gpui::{
  12    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
  13};
  14use http_client::Url;
  15use language::{
  16    proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
  17    Buffer, Capability, Event as BufferEvent, File as _, Language, Operation,
  18};
  19use rpc::{
  20    proto::{self, AnyProtoClient, EnvelopedMessage, PeerId},
  21    ErrorExt as _, TypedEnvelope,
  22};
  23use smol::channel::Receiver;
  24use std::{io, path::Path, str::FromStr as _, sync::Arc};
  25use text::BufferId;
  26use util::{debug_panic, maybe, ResultExt as _};
  27use worktree::{
  28    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
  29    WorktreeId,
  30};
  31
  32/// A set of open buffers.
  33pub struct BufferStore {
  34    remote_id: Option<u64>,
  35    #[allow(unused)]
  36    worktree_store: Model<WorktreeStore>,
  37    opened_buffers: HashMap<BufferId, OpenBuffer>,
  38    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
  39    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
  40    #[allow(clippy::type_complexity)]
  41    loading_buffers_by_path: HashMap<
  42        ProjectPath,
  43        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
  44    >,
  45    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
  46    remote_buffer_listeners:
  47        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
  48}
  49
  50enum OpenBuffer {
  51    Strong(Model<Buffer>),
  52    Weak(WeakModel<Buffer>),
  53    Operations(Vec<Operation>),
  54}
  55
  56pub enum BufferStoreEvent {
  57    BufferAdded(Model<Buffer>),
  58    BufferChangedFilePath {
  59        buffer: Model<Buffer>,
  60        old_file: Option<Arc<dyn language::File>>,
  61    },
  62    MessageToReplicas(Box<proto::Envelope>),
  63}
  64
  65impl EventEmitter<BufferStoreEvent> for BufferStore {}
  66
  67impl BufferStore {
  68    /// Creates a buffer store, optionally retaining its buffers.
  69    ///
  70    /// If `retain_buffers` is `true`, then buffers are owned by the buffer store
  71    /// and won't be released unless they are explicitly removed, or `retain_buffers`
  72    /// is set to `false` via `set_retain_buffers`. Otherwise, buffers are stored as
  73    /// weak handles.
  74    pub fn new(
  75        worktree_store: Model<WorktreeStore>,
  76        remote_id: Option<u64>,
  77        cx: &mut ModelContext<Self>,
  78    ) -> Self {
  79        cx.subscribe(&worktree_store, |this, _, event, cx| match event {
  80            WorktreeStoreEvent::WorktreeAdded(worktree) => {
  81                this.subscribe_to_worktree(worktree, cx);
  82            }
  83            _ => {}
  84        })
  85        .detach();
  86
  87        Self {
  88            remote_id,
  89            worktree_store,
  90            opened_buffers: Default::default(),
  91            remote_buffer_listeners: Default::default(),
  92            loading_remote_buffers_by_id: Default::default(),
  93            local_buffer_ids_by_path: Default::default(),
  94            local_buffer_ids_by_entry_id: Default::default(),
  95            loading_buffers_by_path: Default::default(),
  96        }
  97    }
  98
  99    pub fn open_buffer(
 100        &mut self,
 101        project_path: ProjectPath,
 102        cx: &mut ModelContext<Self>,
 103    ) -> Task<Result<Model<Buffer>>> {
 104        let existing_buffer = self.get_by_path(&project_path, cx);
 105        if let Some(existing_buffer) = existing_buffer {
 106            return Task::ready(Ok(existing_buffer));
 107        }
 108
 109        let Some(worktree) = self
 110            .worktree_store
 111            .read(cx)
 112            .worktree_for_id(project_path.worktree_id, cx)
 113        else {
 114            return Task::ready(Err(anyhow!("no such worktree")));
 115        };
 116
 117        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
 118            // If the given path is already being loaded, then wait for that existing
 119            // task to complete and return the same buffer.
 120            hash_map::Entry::Occupied(e) => e.get().clone(),
 121
 122            // Otherwise, record the fact that this path is now being loaded.
 123            hash_map::Entry::Vacant(entry) => {
 124                let (mut tx, rx) = postage::watch::channel();
 125                entry.insert(rx.clone());
 126
 127                let project_path = project_path.clone();
 128                let load_buffer = match worktree.read(cx) {
 129                    Worktree::Local(_) => {
 130                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
 131                    }
 132                    Worktree::Remote(tree) => {
 133                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
 134                    }
 135                };
 136
 137                cx.spawn(move |this, mut cx| async move {
 138                    let load_result = load_buffer.await;
 139                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
 140                        // Record the fact that the buffer is no longer loading.
 141                        this.loading_buffers_by_path.remove(&project_path);
 142                        let buffer = load_result.map_err(Arc::new)?;
 143                        Ok(buffer)
 144                    })?);
 145                    anyhow::Ok(())
 146                })
 147                .detach();
 148                rx
 149            }
 150        };
 151
 152        cx.background_executor().spawn(async move {
 153            Self::wait_for_loading_buffer(loading_watch)
 154                .await
 155                .map_err(|e| e.cloned())
 156        })
 157    }
 158
 159    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 160        cx.subscribe(worktree, |this, worktree, event, cx| {
 161            if worktree.read(cx).is_local() {
 162                match event {
 163                    worktree::Event::UpdatedEntries(changes) => {
 164                        this.local_worktree_entries_changed(&worktree, changes, cx);
 165                    }
 166                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
 167                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
 168                    }
 169                    _ => {}
 170                }
 171            }
 172        })
 173        .detach();
 174    }
 175
 176    fn local_worktree_entries_changed(
 177        &mut self,
 178        worktree_handle: &Model<Worktree>,
 179        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 180        cx: &mut ModelContext<Self>,
 181    ) {
 182        let snapshot = worktree_handle.read(cx).snapshot();
 183        for (path, entry_id, _) in changes {
 184            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
 185        }
 186    }
 187
 188    fn local_worktree_git_repos_changed(
 189        &mut self,
 190        worktree_handle: Model<Worktree>,
 191        changed_repos: &UpdatedGitRepositoriesSet,
 192        cx: &mut ModelContext<Self>,
 193    ) {
 194        debug_assert!(worktree_handle.read(cx).is_local());
 195
 196        // Identify the loading buffers whose containing repository that has changed.
 197        let future_buffers = self
 198            .loading_buffers()
 199            .filter_map(|(project_path, receiver)| {
 200                if project_path.worktree_id != worktree_handle.read(cx).id() {
 201                    return None;
 202                }
 203                let path = &project_path.path;
 204                changed_repos
 205                    .iter()
 206                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
 207                let path = path.clone();
 208                Some(async move {
 209                    Self::wait_for_loading_buffer(receiver)
 210                        .await
 211                        .ok()
 212                        .map(|buffer| (buffer, path))
 213                })
 214            })
 215            .collect::<FuturesUnordered<_>>();
 216
 217        // Identify the current buffers whose containing repository has changed.
 218        let current_buffers = self
 219            .buffers()
 220            .filter_map(|buffer| {
 221                let file = File::from_dyn(buffer.read(cx).file())?;
 222                if file.worktree != worktree_handle {
 223                    return None;
 224                }
 225                changed_repos
 226                    .iter()
 227                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
 228                Some((buffer, file.path.clone()))
 229            })
 230            .collect::<Vec<_>>();
 231
 232        if future_buffers.len() + current_buffers.len() == 0 {
 233            return;
 234        }
 235
 236        cx.spawn(move |this, mut cx| async move {
 237            // Wait for all of the buffers to load.
 238            let future_buffers = future_buffers.collect::<Vec<_>>().await;
 239
 240            // Reload the diff base for every buffer whose containing git repository has changed.
 241            let snapshot =
 242                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
 243            let diff_bases_by_buffer = cx
 244                .background_executor()
 245                .spawn(async move {
 246                    let mut diff_base_tasks = future_buffers
 247                        .into_iter()
 248                        .flatten()
 249                        .chain(current_buffers)
 250                        .filter_map(|(buffer, path)| {
 251                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
 252                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
 253                            Some(async move {
 254                                let base_text =
 255                                    local_repo_entry.repo().load_index_text(&relative_path);
 256                                Some((buffer, base_text))
 257                            })
 258                        })
 259                        .collect::<FuturesUnordered<_>>();
 260
 261                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
 262                    while let Some(diff_base) = diff_base_tasks.next().await {
 263                        if let Some(diff_base) = diff_base {
 264                            diff_bases.push(diff_base);
 265                        }
 266                    }
 267                    diff_bases
 268                })
 269                .await;
 270
 271            this.update(&mut cx, |this, cx| {
 272                // Assign the new diff bases on all of the buffers.
 273                for (buffer, diff_base) in diff_bases_by_buffer {
 274                    let buffer_id = buffer.update(cx, |buffer, cx| {
 275                        buffer.set_diff_base(diff_base.clone(), cx);
 276                        buffer.remote_id().to_proto()
 277                    });
 278                    if let Some(project_id) = this.remote_id {
 279                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
 280                            proto::UpdateDiffBase {
 281                                project_id,
 282                                buffer_id,
 283                                diff_base,
 284                            }
 285                            .into_envelope(0, None, None),
 286                        )))
 287                    }
 288                }
 289            })
 290        })
 291        .detach_and_log_err(cx);
 292    }
 293
 294    fn open_local_buffer_internal(
 295        &mut self,
 296        path: Arc<Path>,
 297        worktree: Model<Worktree>,
 298        cx: &mut ModelContext<Self>,
 299    ) -> Task<Result<Model<Buffer>>> {
 300        let load_buffer = worktree.update(cx, |worktree, cx| {
 301            let load_file = worktree.load_file(path.as_ref(), cx);
 302            let reservation = cx.reserve_model();
 303            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
 304            cx.spawn(move |_, mut cx| async move {
 305                let loaded = load_file.await?;
 306                let text_buffer = cx
 307                    .background_executor()
 308                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
 309                    .await;
 310                cx.insert_model(reservation, |_| {
 311                    Buffer::build(
 312                        text_buffer,
 313                        loaded.diff_base,
 314                        Some(loaded.file),
 315                        Capability::ReadWrite,
 316                    )
 317                })
 318            })
 319        });
 320
 321        cx.spawn(move |this, mut cx| async move {
 322            let buffer = match load_buffer.await {
 323                Ok(buffer) => Ok(buffer),
 324                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
 325                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
 326                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
 327                    Buffer::build(
 328                        text_buffer,
 329                        None,
 330                        Some(Arc::new(File {
 331                            worktree,
 332                            path,
 333                            mtime: None,
 334                            entry_id: None,
 335                            is_local: true,
 336                            is_deleted: false,
 337                            is_private: false,
 338                        })),
 339                        Capability::ReadWrite,
 340                    )
 341                }),
 342                Err(e) => Err(e),
 343            }?;
 344            this.update(&mut cx, |this, cx| {
 345                this.add_buffer(buffer.clone(), cx).log_err();
 346            })?;
 347            Ok(buffer)
 348        })
 349    }
 350
 351    fn open_remote_buffer_internal(
 352        &self,
 353        path: &Arc<Path>,
 354        worktree: &RemoteWorktree,
 355        cx: &ModelContext<Self>,
 356    ) -> Task<Result<Model<Buffer>>> {
 357        let worktree_id = worktree.id().to_proto();
 358        let project_id = worktree.project_id();
 359        let client = worktree.client();
 360        let path_string = path.clone().to_string_lossy().to_string();
 361        cx.spawn(move |this, mut cx| async move {
 362            let response = client
 363                .request(proto::OpenBufferByPath {
 364                    project_id,
 365                    worktree_id,
 366                    path: path_string,
 367                })
 368                .await?;
 369            let buffer_id = BufferId::new(response.buffer_id)?;
 370            this.update(&mut cx, |this, cx| {
 371                this.wait_for_remote_buffer(buffer_id, cx)
 372            })?
 373            .await
 374        })
 375    }
 376
 377    pub fn create_buffer(
 378        &mut self,
 379        remote_client: Option<(AnyProtoClient, u64)>,
 380        cx: &mut ModelContext<Self>,
 381    ) -> Task<Result<Model<Buffer>>> {
 382        if let Some((remote_client, project_id)) = remote_client {
 383            let create = remote_client.request(proto::OpenNewBuffer { project_id });
 384            cx.spawn(|this, mut cx| async move {
 385                let response = create.await?;
 386                let buffer_id = BufferId::new(response.buffer_id)?;
 387
 388                this.update(&mut cx, |this, cx| {
 389                    this.wait_for_remote_buffer(buffer_id, cx)
 390                })?
 391                .await
 392            })
 393        } else {
 394            Task::ready(Ok(self.create_local_buffer("", None, cx)))
 395        }
 396    }
 397
 398    pub fn create_local_buffer(
 399        &mut self,
 400        text: &str,
 401        language: Option<Arc<Language>>,
 402        cx: &mut ModelContext<Self>,
 403    ) -> Model<Buffer> {
 404        let buffer = cx.new_model(|cx| {
 405            Buffer::local(text, cx)
 406                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
 407        });
 408        self.add_buffer(buffer.clone(), cx).log_err();
 409        buffer
 410    }
 411
 412    pub fn save_buffer(
 413        &mut self,
 414        buffer: Model<Buffer>,
 415        cx: &mut ModelContext<Self>,
 416    ) -> Task<Result<()>> {
 417        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 418            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 419        };
 420        match file.worktree.read(cx) {
 421            Worktree::Local(_) => {
 422                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
 423            }
 424            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
 425        }
 426    }
 427
 428    pub fn save_buffer_as(
 429        &mut self,
 430        buffer: Model<Buffer>,
 431        path: ProjectPath,
 432        cx: &mut ModelContext<Self>,
 433    ) -> Task<Result<()>> {
 434        let Some(worktree) = self
 435            .worktree_store
 436            .read(cx)
 437            .worktree_for_id(path.worktree_id, cx)
 438        else {
 439            return Task::ready(Err(anyhow!("no such worktree")));
 440        };
 441
 442        let old_file = buffer.read(cx).file().cloned();
 443
 444        let task = match worktree.read(cx) {
 445            Worktree::Local(_) => {
 446                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
 447            }
 448            Worktree::Remote(tree) => {
 449                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
 450            }
 451        };
 452        cx.spawn(|this, mut cx| async move {
 453            task.await?;
 454            this.update(&mut cx, |_, cx| {
 455                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
 456            })
 457        })
 458    }
 459
 460    fn save_local_buffer(
 461        &self,
 462        worktree: Model<Worktree>,
 463        buffer_handle: Model<Buffer>,
 464        path: Arc<Path>,
 465        mut has_changed_file: bool,
 466        cx: &mut ModelContext<Self>,
 467    ) -> Task<Result<()>> {
 468        let buffer = buffer_handle.read(cx);
 469        let text = buffer.as_rope().clone();
 470        let line_ending = buffer.line_ending();
 471        let version = buffer.version();
 472        let buffer_id = buffer.remote_id();
 473        if buffer.file().is_some_and(|file| !file.is_created()) {
 474            has_changed_file = true;
 475        }
 476
 477        let save = worktree.update(cx, |worktree, cx| {
 478            worktree.write_file(path.as_ref(), text, line_ending, cx)
 479        });
 480
 481        cx.spawn(move |this, mut cx| async move {
 482            let new_file = save.await?;
 483            let mtime = new_file.mtime;
 484            this.update(&mut cx, |this, cx| {
 485                if let Some(project_id) = this.remote_id {
 486                    if has_changed_file {
 487                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
 488                            proto::UpdateBufferFile {
 489                                project_id,
 490                                buffer_id: buffer_id.to_proto(),
 491                                file: Some(language::File::to_proto(&*new_file, cx)),
 492                            }
 493                            .into_envelope(0, None, None),
 494                        )));
 495                    }
 496                    cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
 497                        proto::BufferSaved {
 498                            project_id,
 499                            buffer_id: buffer_id.to_proto(),
 500                            version: serialize_version(&version),
 501                            mtime: mtime.map(|time| time.into()),
 502                        }
 503                        .into_envelope(0, None, None),
 504                    )));
 505                }
 506            })?;
 507            buffer_handle.update(&mut cx, |buffer, cx| {
 508                if has_changed_file {
 509                    buffer.file_updated(new_file, cx);
 510                }
 511                buffer.did_save(version.clone(), mtime, cx);
 512            })
 513        })
 514    }
 515
 516    fn save_remote_buffer(
 517        &self,
 518        buffer_handle: Model<Buffer>,
 519        new_path: Option<proto::ProjectPath>,
 520        tree: &RemoteWorktree,
 521        cx: &ModelContext<Self>,
 522    ) -> Task<Result<()>> {
 523        let buffer = buffer_handle.read(cx);
 524        let buffer_id = buffer.remote_id().into();
 525        let version = buffer.version();
 526        let rpc = tree.client();
 527        let project_id = tree.project_id();
 528        cx.spawn(move |_, mut cx| async move {
 529            let response = rpc
 530                .request(proto::SaveBuffer {
 531                    project_id,
 532                    buffer_id,
 533                    new_path,
 534                    version: serialize_version(&version),
 535                })
 536                .await?;
 537            let version = deserialize_version(&response.version);
 538            let mtime = response.mtime.map(|mtime| mtime.into());
 539
 540            buffer_handle.update(&mut cx, |buffer, cx| {
 541                buffer.did_save(version.clone(), mtime, cx);
 542            })?;
 543
 544            Ok(())
 545        })
 546    }
 547
 548    pub fn blame_buffer(
 549        &self,
 550        buffer: &Model<Buffer>,
 551        version: Option<clock::Global>,
 552        cx: &AppContext,
 553    ) -> Task<Result<Blame>> {
 554        let buffer = buffer.read(cx);
 555        let Some(file) = File::from_dyn(buffer.file()) else {
 556            return Task::ready(Err(anyhow!("buffer has no file")));
 557        };
 558
 559        match file.worktree.clone().read(cx) {
 560            Worktree::Local(worktree) => {
 561                let worktree = worktree.snapshot();
 562                let blame_params = maybe!({
 563                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
 564                        Some(repo_for_path) => repo_for_path,
 565                        None => anyhow::bail!(NoRepositoryError {}),
 566                    };
 567
 568                    let relative_path = repo_entry
 569                        .relativize(&worktree, &file.path)
 570                        .context("failed to relativize buffer path")?;
 571
 572                    let repo = local_repo_entry.repo().clone();
 573
 574                    let content = match version {
 575                        Some(version) => buffer.rope_for_version(&version).clone(),
 576                        None => buffer.as_rope().clone(),
 577                    };
 578
 579                    anyhow::Ok((repo, relative_path, content))
 580                });
 581
 582                cx.background_executor().spawn(async move {
 583                    let (repo, relative_path, content) = blame_params?;
 584                    repo.blame(&relative_path, content)
 585                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
 586                })
 587            }
 588            Worktree::Remote(worktree) => {
 589                let buffer_id = buffer.remote_id();
 590                let version = buffer.version();
 591                let project_id = worktree.project_id();
 592                let client = worktree.client();
 593                cx.spawn(|_| async move {
 594                    let response = client
 595                        .request(proto::BlameBuffer {
 596                            project_id,
 597                            buffer_id: buffer_id.into(),
 598                            version: serialize_version(&version),
 599                        })
 600                        .await?;
 601                    Ok(deserialize_blame_buffer_response(response))
 602                })
 603            }
 604        }
 605    }
 606
 607    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
 608        let remote_id = buffer.read(cx).remote_id();
 609        let is_remote = buffer.read(cx).replica_id() != 0;
 610        let open_buffer = if self.remote_id.is_some() {
 611            OpenBuffer::Strong(buffer.clone())
 612        } else {
 613            OpenBuffer::Weak(buffer.downgrade())
 614        };
 615
 616        match self.opened_buffers.entry(remote_id) {
 617            hash_map::Entry::Vacant(entry) => {
 618                entry.insert(open_buffer);
 619            }
 620            hash_map::Entry::Occupied(mut entry) => {
 621                if let OpenBuffer::Operations(operations) = entry.get_mut() {
 622                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
 623                } else if entry.get().upgrade().is_some() {
 624                    if is_remote {
 625                        return Ok(());
 626                    } else {
 627                        debug_panic!("buffer {} was already registered", remote_id);
 628                        Err(anyhow!("buffer {} was already registered", remote_id))?;
 629                    }
 630                }
 631                entry.insert(open_buffer);
 632            }
 633        }
 634
 635        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
 636            for sender in senders {
 637                sender.send(Ok(buffer.clone())).ok();
 638            }
 639        }
 640
 641        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 642            if file.is_local {
 643                self.local_buffer_ids_by_path.insert(
 644                    ProjectPath {
 645                        worktree_id: file.worktree_id(cx),
 646                        path: file.path.clone(),
 647                    },
 648                    remote_id,
 649                );
 650
 651                if let Some(entry_id) = file.entry_id {
 652                    self.local_buffer_ids_by_entry_id
 653                        .insert(entry_id, remote_id);
 654                }
 655            }
 656        }
 657
 658        cx.subscribe(&buffer, Self::on_buffer_event).detach();
 659        cx.emit(BufferStoreEvent::BufferAdded(buffer));
 660        Ok(())
 661    }
 662
 663    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
 664        self.opened_buffers
 665            .values()
 666            .filter_map(|buffer| buffer.upgrade())
 667    }
 668
 669    pub fn loading_buffers(
 670        &self,
 671    ) -> impl Iterator<
 672        Item = (
 673            &ProjectPath,
 674            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
 675        ),
 676    > {
 677        self.loading_buffers_by_path
 678            .iter()
 679            .map(|(path, rx)| (path, rx.clone()))
 680    }
 681
 682    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
 683        self.buffers().find_map(|buffer| {
 684            let file = File::from_dyn(buffer.read(cx).file())?;
 685            if file.worktree_id(cx) == path.worktree_id && &file.path == &path.path {
 686                Some(buffer)
 687            } else {
 688                None
 689            }
 690        })
 691    }
 692
 693    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
 694        self.opened_buffers
 695            .get(&buffer_id)
 696            .and_then(|buffer| buffer.upgrade())
 697    }
 698
 699    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
 700        self.get(buffer_id)
 701            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
 702    }
 703
 704    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
 705        self.get(buffer_id)
 706            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
 707    }
 708
 709    pub fn wait_for_remote_buffer(
 710        &mut self,
 711        id: BufferId,
 712        cx: &mut AppContext,
 713    ) -> Task<Result<Model<Buffer>>> {
 714        let buffer = self.get(id);
 715        if let Some(buffer) = buffer {
 716            return Task::ready(Ok(buffer));
 717        }
 718        let (tx, rx) = oneshot::channel();
 719        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 720        cx.background_executor().spawn(async move { rx.await? })
 721    }
 722
 723    pub fn buffer_version_info(
 724        &self,
 725        cx: &AppContext,
 726    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
 727        let buffers = self
 728            .buffers()
 729            .map(|buffer| {
 730                let buffer = buffer.read(cx);
 731                proto::BufferVersion {
 732                    id: buffer.remote_id().into(),
 733                    version: language::proto::serialize_version(&buffer.version),
 734                }
 735            })
 736            .collect();
 737        let incomplete_buffer_ids = self
 738            .loading_remote_buffers_by_id
 739            .keys()
 740            .copied()
 741            .collect::<Vec<_>>();
 742        (buffers, incomplete_buffer_ids)
 743    }
 744
 745    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
 746        self.set_remote_id(None, cx);
 747
 748        for buffer in self.buffers() {
 749            buffer.update(cx, |buffer, cx| {
 750                buffer.set_capability(Capability::ReadOnly, cx)
 751            });
 752        }
 753
 754        // Wake up all futures currently waiting on a buffer to get opened,
 755        // to give them a chance to fail now that we've disconnected.
 756        self.remote_buffer_listeners.clear();
 757    }
 758
 759    pub fn set_remote_id(&mut self, remote_id: Option<u64>, cx: &mut AppContext) {
 760        self.remote_id = remote_id;
 761        for open_buffer in self.opened_buffers.values_mut() {
 762            if remote_id.is_some() {
 763                if let OpenBuffer::Weak(buffer) = open_buffer {
 764                    if let Some(buffer) = buffer.upgrade() {
 765                        *open_buffer = OpenBuffer::Strong(buffer);
 766                    }
 767                }
 768            } else {
 769                if let Some(buffer) = open_buffer.upgrade() {
 770                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
 771                }
 772                if let OpenBuffer::Strong(buffer) = open_buffer {
 773                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
 774                }
 775            }
 776        }
 777    }
 778
 779    pub fn discard_incomplete(&mut self) {
 780        self.opened_buffers
 781            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
 782    }
 783
 784    pub fn find_search_candidates(
 785        &mut self,
 786        query: &SearchQuery,
 787        mut limit: usize,
 788        fs: Arc<dyn Fs>,
 789        cx: &mut ModelContext<Self>,
 790    ) -> Receiver<Model<Buffer>> {
 791        let (tx, rx) = smol::channel::unbounded();
 792        let mut open_buffers = HashSet::default();
 793        let mut unnamed_buffers = Vec::new();
 794        for handle in self.buffers() {
 795            let buffer = handle.read(cx);
 796            if let Some(entry_id) = buffer.entry_id(cx) {
 797                open_buffers.insert(entry_id);
 798            } else {
 799                limit = limit.saturating_sub(1);
 800                unnamed_buffers.push(handle)
 801            };
 802        }
 803
 804        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 805        let mut project_paths_rx = self
 806            .worktree_store
 807            .update(cx, |worktree_store, cx| {
 808                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
 809            })
 810            .chunks(MAX_CONCURRENT_BUFFER_OPENS);
 811
 812        cx.spawn(|this, mut cx| async move {
 813            for buffer in unnamed_buffers {
 814                tx.send(buffer).await.ok();
 815            }
 816
 817            while let Some(project_paths) = project_paths_rx.next().await {
 818                let buffers = this.update(&mut cx, |this, cx| {
 819                    project_paths
 820                        .into_iter()
 821                        .map(|project_path| this.open_buffer(project_path, cx))
 822                        .collect::<Vec<_>>()
 823                })?;
 824                for buffer_task in buffers {
 825                    if let Some(buffer) = buffer_task.await.log_err() {
 826                        if tx.send(buffer).await.is_err() {
 827                            return anyhow::Ok(());
 828                        }
 829                    }
 830                }
 831            }
 832            anyhow::Ok(())
 833        })
 834        .detach();
 835        rx
 836    }
 837
 838    fn on_buffer_event(
 839        &mut self,
 840        buffer: Model<Buffer>,
 841        event: &BufferEvent,
 842        cx: &mut ModelContext<Self>,
 843    ) {
 844        match event {
 845            BufferEvent::FileHandleChanged => {
 846                self.buffer_changed_file(buffer, cx);
 847            }
 848            _ => {}
 849        }
 850    }
 851
 852    fn local_worktree_entry_changed(
 853        &mut self,
 854        entry_id: ProjectEntryId,
 855        path: &Arc<Path>,
 856        worktree: &Model<worktree::Worktree>,
 857        snapshot: &worktree::Snapshot,
 858        cx: &mut ModelContext<Self>,
 859    ) -> Option<()> {
 860        let project_path = ProjectPath {
 861            worktree_id: snapshot.id(),
 862            path: path.clone(),
 863        };
 864        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 865            Some(&buffer_id) => buffer_id,
 866            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
 867        };
 868        let buffer = if let Some(buffer) = self.get(buffer_id) {
 869            buffer
 870        } else {
 871            self.opened_buffers.remove(&buffer_id);
 872            self.local_buffer_ids_by_path.remove(&project_path);
 873            self.local_buffer_ids_by_entry_id.remove(&entry_id);
 874            return None;
 875        };
 876
 877        let events = buffer.update(cx, |buffer, cx| {
 878            let file = buffer.file()?;
 879            let old_file = File::from_dyn(Some(file))?;
 880            if old_file.worktree != *worktree {
 881                return None;
 882            }
 883
 884            let new_file = if let Some(entry) = old_file
 885                .entry_id
 886                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 887            {
 888                File {
 889                    is_local: true,
 890                    entry_id: Some(entry.id),
 891                    mtime: entry.mtime,
 892                    path: entry.path.clone(),
 893                    worktree: worktree.clone(),
 894                    is_deleted: false,
 895                    is_private: entry.is_private,
 896                }
 897            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
 898                File {
 899                    is_local: true,
 900                    entry_id: Some(entry.id),
 901                    mtime: entry.mtime,
 902                    path: entry.path.clone(),
 903                    worktree: worktree.clone(),
 904                    is_deleted: false,
 905                    is_private: entry.is_private,
 906                }
 907            } else {
 908                File {
 909                    is_local: true,
 910                    entry_id: old_file.entry_id,
 911                    path: old_file.path.clone(),
 912                    mtime: old_file.mtime,
 913                    worktree: worktree.clone(),
 914                    is_deleted: true,
 915                    is_private: old_file.is_private,
 916                }
 917            };
 918
 919            if new_file == *old_file {
 920                return None;
 921            }
 922
 923            let mut events = Vec::new();
 924            if new_file.path != old_file.path {
 925                self.local_buffer_ids_by_path.remove(&ProjectPath {
 926                    path: old_file.path.clone(),
 927                    worktree_id: old_file.worktree_id(cx),
 928                });
 929                self.local_buffer_ids_by_path.insert(
 930                    ProjectPath {
 931                        worktree_id: new_file.worktree_id(cx),
 932                        path: new_file.path.clone(),
 933                    },
 934                    buffer_id,
 935                );
 936                events.push(BufferStoreEvent::BufferChangedFilePath {
 937                    buffer: cx.handle(),
 938                    old_file: buffer.file().cloned(),
 939                });
 940            }
 941
 942            if new_file.entry_id != old_file.entry_id {
 943                if let Some(entry_id) = old_file.entry_id {
 944                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
 945                }
 946                if let Some(entry_id) = new_file.entry_id {
 947                    self.local_buffer_ids_by_entry_id
 948                        .insert(entry_id, buffer_id);
 949                }
 950            }
 951
 952            if let Some(project_id) = self.remote_id {
 953                events.push(BufferStoreEvent::MessageToReplicas(Box::new(
 954                    proto::UpdateBufferFile {
 955                        project_id,
 956                        buffer_id: buffer_id.to_proto(),
 957                        file: Some(new_file.to_proto(cx)),
 958                    }
 959                    .into_envelope(0, None, None),
 960                )))
 961            }
 962
 963            buffer.file_updated(Arc::new(new_file), cx);
 964            Some(events)
 965        })?;
 966
 967        for event in events {
 968            cx.emit(event);
 969        }
 970
 971        None
 972    }
 973
 974    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
 975        let file = File::from_dyn(buffer.read(cx).file())?;
 976
 977        let remote_id = buffer.read(cx).remote_id();
 978        if let Some(entry_id) = file.entry_id {
 979            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 980                Some(_) => {
 981                    return None;
 982                }
 983                None => {
 984                    self.local_buffer_ids_by_entry_id
 985                        .insert(entry_id, remote_id);
 986                }
 987            }
 988        };
 989        self.local_buffer_ids_by_path.insert(
 990            ProjectPath {
 991                worktree_id: file.worktree_id(cx),
 992                path: file.path.clone(),
 993            },
 994            remote_id,
 995        );
 996
 997        Some(())
 998    }
 999
1000    pub async fn create_buffer_for_peer(
1001        this: Model<Self>,
1002        peer_id: PeerId,
1003        buffer_id: BufferId,
1004        project_id: u64,
1005        client: AnyProtoClient,
1006        cx: &mut AsyncAppContext,
1007    ) -> Result<()> {
1008        let Some(buffer) = this.update(cx, |this, _| this.get(buffer_id))? else {
1009            return Ok(());
1010        };
1011
1012        let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
1013        let operations = operations.await;
1014        let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx))?;
1015
1016        let initial_state = proto::CreateBufferForPeer {
1017            project_id,
1018            peer_id: Some(peer_id),
1019            variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
1020        };
1021
1022        if client.send(initial_state).log_err().is_some() {
1023            let client = client.clone();
1024            cx.background_executor()
1025                .spawn(async move {
1026                    let mut chunks = split_operations(operations).peekable();
1027                    while let Some(chunk) = chunks.next() {
1028                        let is_last = chunks.peek().is_none();
1029                        client.send(proto::CreateBufferForPeer {
1030                            project_id,
1031                            peer_id: Some(peer_id),
1032                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
1033                                proto::BufferChunk {
1034                                    buffer_id: buffer_id.into(),
1035                                    operations: chunk,
1036                                    is_last,
1037                                },
1038                            )),
1039                        })?;
1040                    }
1041                    anyhow::Ok(())
1042                })
1043                .await
1044                .log_err();
1045        }
1046        Ok(())
1047    }
1048
1049    pub async fn handle_update_buffer(
1050        this: Model<Self>,
1051        envelope: TypedEnvelope<proto::UpdateBuffer>,
1052        mut cx: AsyncAppContext,
1053    ) -> Result<proto::Ack> {
1054        let payload = envelope.payload.clone();
1055        let buffer_id = BufferId::new(payload.buffer_id)?;
1056        let ops = payload
1057            .operations
1058            .into_iter()
1059            .map(language::proto::deserialize_operation)
1060            .collect::<Result<Vec<_>, _>>()?;
1061        this.update(&mut cx, |this, cx| {
1062            match this.opened_buffers.entry(buffer_id) {
1063                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1064                    OpenBuffer::Strong(buffer) => {
1065                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
1066                    }
1067                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1068                    OpenBuffer::Weak(_) => {}
1069                },
1070                hash_map::Entry::Vacant(e) => {
1071                    e.insert(OpenBuffer::Operations(ops));
1072                }
1073            }
1074            Ok(proto::Ack {})
1075        })?
1076    }
1077
1078    pub fn handle_create_buffer_for_peer(
1079        &mut self,
1080        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1081        replica_id: u16,
1082        capability: Capability,
1083        cx: &mut ModelContext<Self>,
1084    ) -> Result<()> {
1085        match envelope
1086            .payload
1087            .variant
1088            .ok_or_else(|| anyhow!("missing variant"))?
1089        {
1090            proto::create_buffer_for_peer::Variant::State(mut state) => {
1091                let buffer_id = BufferId::new(state.id)?;
1092
1093                let buffer_result = maybe!({
1094                    let mut buffer_file = None;
1095                    if let Some(file) = state.file.take() {
1096                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
1097                        let worktree = self
1098                            .worktree_store
1099                            .read(cx)
1100                            .worktree_for_id(worktree_id, cx)
1101                            .ok_or_else(|| {
1102                                anyhow!("no worktree found for id {}", file.worktree_id)
1103                            })?;
1104                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
1105                            as Arc<dyn language::File>);
1106                    }
1107                    Buffer::from_proto(replica_id, capability, state, buffer_file)
1108                });
1109
1110                match buffer_result {
1111                    Ok(buffer) => {
1112                        let buffer = cx.new_model(|_| buffer);
1113                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
1114                    }
1115                    Err(error) => {
1116                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
1117                            for listener in listeners {
1118                                listener.send(Err(anyhow!(error.cloned()))).ok();
1119                            }
1120                        }
1121                    }
1122                }
1123            }
1124            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
1125                let buffer_id = BufferId::new(chunk.buffer_id)?;
1126                let buffer = self
1127                    .loading_remote_buffers_by_id
1128                    .get(&buffer_id)
1129                    .cloned()
1130                    .ok_or_else(|| {
1131                        anyhow!(
1132                            "received chunk for buffer {} without initial state",
1133                            chunk.buffer_id
1134                        )
1135                    })?;
1136
1137                let result = maybe!({
1138                    let operations = chunk
1139                        .operations
1140                        .into_iter()
1141                        .map(language::proto::deserialize_operation)
1142                        .collect::<Result<Vec<_>>>()?;
1143                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
1144                });
1145
1146                if let Err(error) = result {
1147                    self.loading_remote_buffers_by_id.remove(&buffer_id);
1148                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
1149                        for listener in listeners {
1150                            listener.send(Err(error.cloned())).ok();
1151                        }
1152                    }
1153                } else if chunk.is_last {
1154                    self.loading_remote_buffers_by_id.remove(&buffer_id);
1155                    self.add_buffer(buffer, cx)?;
1156                }
1157            }
1158        }
1159
1160        Ok(())
1161    }
1162
1163    pub async fn handle_update_buffer_file(
1164        this: Model<Self>,
1165        envelope: TypedEnvelope<proto::UpdateBufferFile>,
1166        mut cx: AsyncAppContext,
1167    ) -> Result<()> {
1168        let buffer_id = envelope.payload.buffer_id;
1169        let buffer_id = BufferId::new(buffer_id)?;
1170
1171        this.update(&mut cx, |this, cx| {
1172            let payload = envelope.payload.clone();
1173            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1174                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
1175                let worktree = this
1176                    .worktree_store
1177                    .read(cx)
1178                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
1179                    .ok_or_else(|| anyhow!("no such worktree"))?;
1180                let file = File::from_proto(file, worktree, cx)?;
1181                let old_file = buffer.update(cx, |buffer, cx| {
1182                    let old_file = buffer.file().cloned();
1183                    let new_path = file.path.clone();
1184                    buffer.file_updated(Arc::new(file), cx);
1185                    if old_file
1186                        .as_ref()
1187                        .map_or(true, |old| *old.path() != new_path)
1188                    {
1189                        Some(old_file)
1190                    } else {
1191                        None
1192                    }
1193                });
1194                if let Some(old_file) = old_file {
1195                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1196                }
1197            }
1198            Ok(())
1199        })?
1200    }
1201
1202    pub async fn handle_update_diff_base(
1203        this: Model<Self>,
1204        envelope: TypedEnvelope<proto::UpdateDiffBase>,
1205        mut cx: AsyncAppContext,
1206    ) -> Result<()> {
1207        this.update(&mut cx, |this, cx| {
1208            let buffer_id = envelope.payload.buffer_id;
1209            let buffer_id = BufferId::new(buffer_id)?;
1210            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1211                buffer.update(cx, |buffer, cx| {
1212                    buffer.set_diff_base(envelope.payload.diff_base, cx)
1213                });
1214            }
1215            Ok(())
1216        })?
1217    }
1218
1219    pub async fn handle_save_buffer(
1220        this: Model<Self>,
1221        envelope: TypedEnvelope<proto::SaveBuffer>,
1222        mut cx: AsyncAppContext,
1223    ) -> Result<proto::BufferSaved> {
1224        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1225        let (buffer, project_id) = this.update(&mut cx, |this, _| {
1226            anyhow::Ok((
1227                this.get_existing(buffer_id)?,
1228                this.remote_id.context("project is not shared")?,
1229            ))
1230        })??;
1231        buffer
1232            .update(&mut cx, |buffer, _| {
1233                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1234            })?
1235            .await?;
1236        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
1237
1238        if let Some(new_path) = envelope.payload.new_path {
1239            let new_path = ProjectPath::from_proto(new_path);
1240            this.update(&mut cx, |this, cx| {
1241                this.save_buffer_as(buffer.clone(), new_path, cx)
1242            })?
1243            .await?;
1244        } else {
1245            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
1246                .await?;
1247        }
1248
1249        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
1250            project_id,
1251            buffer_id: buffer_id.into(),
1252            version: serialize_version(buffer.saved_version()),
1253            mtime: buffer.saved_mtime().map(|time| time.into()),
1254        })
1255    }
1256
1257    pub async fn handle_buffer_saved(
1258        this: Model<Self>,
1259        envelope: TypedEnvelope<proto::BufferSaved>,
1260        mut cx: AsyncAppContext,
1261    ) -> Result<()> {
1262        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1263        let version = deserialize_version(&envelope.payload.version);
1264        let mtime = envelope.payload.mtime.map(|time| time.into());
1265        this.update(&mut cx, |this, cx| {
1266            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1267                buffer.update(cx, |buffer, cx| {
1268                    buffer.did_save(version, mtime, cx);
1269                });
1270            }
1271        })
1272    }
1273
1274    pub async fn handle_buffer_reloaded(
1275        this: Model<Self>,
1276        envelope: TypedEnvelope<proto::BufferReloaded>,
1277        mut cx: AsyncAppContext,
1278    ) -> Result<()> {
1279        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1280        let version = deserialize_version(&envelope.payload.version);
1281        let mtime = envelope.payload.mtime.map(|time| time.into());
1282        let line_ending = deserialize_line_ending(
1283            proto::LineEnding::from_i32(envelope.payload.line_ending)
1284                .ok_or_else(|| anyhow!("missing line ending"))?,
1285        );
1286        this.update(&mut cx, |this, cx| {
1287            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1288                buffer.update(cx, |buffer, cx| {
1289                    buffer.did_reload(version, line_ending, mtime, cx);
1290                });
1291            }
1292        })
1293    }
1294
1295    pub async fn handle_blame_buffer(
1296        this: Model<Self>,
1297        envelope: TypedEnvelope<proto::BlameBuffer>,
1298        mut cx: AsyncAppContext,
1299    ) -> Result<proto::BlameBufferResponse> {
1300        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1301        let version = deserialize_version(&envelope.payload.version);
1302        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1303        buffer
1304            .update(&mut cx, |buffer, _| {
1305                buffer.wait_for_version(version.clone())
1306            })?
1307            .await?;
1308        let blame = this
1309            .update(&mut cx, |this, cx| {
1310                this.blame_buffer(&buffer, Some(version), cx)
1311            })?
1312            .await?;
1313        Ok(serialize_blame_buffer_response(blame))
1314    }
1315
1316    pub async fn wait_for_loading_buffer(
1317        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
1318    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
1319        loop {
1320            if let Some(result) = receiver.borrow().as_ref() {
1321                match result {
1322                    Ok(buffer) => return Ok(buffer.to_owned()),
1323                    Err(e) => return Err(e.to_owned()),
1324                }
1325            }
1326            receiver.next().await;
1327        }
1328    }
1329}
1330
1331impl OpenBuffer {
1332    fn upgrade(&self) -> Option<Model<Buffer>> {
1333        match self {
1334            OpenBuffer::Strong(handle) => Some(handle.clone()),
1335            OpenBuffer::Weak(handle) => handle.upgrade(),
1336            OpenBuffer::Operations(_) => None,
1337        }
1338    }
1339}
1340
1341fn is_not_found_error(error: &anyhow::Error) -> bool {
1342    error
1343        .root_cause()
1344        .downcast_ref::<io::Error>()
1345        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1346}
1347
1348fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
1349    let entries = blame
1350        .entries
1351        .into_iter()
1352        .map(|entry| proto::BlameEntry {
1353            sha: entry.sha.as_bytes().into(),
1354            start_line: entry.range.start,
1355            end_line: entry.range.end,
1356            original_line_number: entry.original_line_number,
1357            author: entry.author.clone(),
1358            author_mail: entry.author_mail.clone(),
1359            author_time: entry.author_time,
1360            author_tz: entry.author_tz.clone(),
1361            committer: entry.committer.clone(),
1362            committer_mail: entry.committer_mail.clone(),
1363            committer_time: entry.committer_time,
1364            committer_tz: entry.committer_tz.clone(),
1365            summary: entry.summary.clone(),
1366            previous: entry.previous.clone(),
1367            filename: entry.filename.clone(),
1368        })
1369        .collect::<Vec<_>>();
1370
1371    let messages = blame
1372        .messages
1373        .into_iter()
1374        .map(|(oid, message)| proto::CommitMessage {
1375            oid: oid.as_bytes().into(),
1376            message,
1377        })
1378        .collect::<Vec<_>>();
1379
1380    let permalinks = blame
1381        .permalinks
1382        .into_iter()
1383        .map(|(oid, url)| proto::CommitPermalink {
1384            oid: oid.as_bytes().into(),
1385            permalink: url.to_string(),
1386        })
1387        .collect::<Vec<_>>();
1388
1389    proto::BlameBufferResponse {
1390        entries,
1391        messages,
1392        permalinks,
1393        remote_url: blame.remote_url,
1394    }
1395}
1396
1397fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
1398    let entries = response
1399        .entries
1400        .into_iter()
1401        .filter_map(|entry| {
1402            Some(git::blame::BlameEntry {
1403                sha: git::Oid::from_bytes(&entry.sha).ok()?,
1404                range: entry.start_line..entry.end_line,
1405                original_line_number: entry.original_line_number,
1406                committer: entry.committer,
1407                committer_time: entry.committer_time,
1408                committer_tz: entry.committer_tz,
1409                committer_mail: entry.committer_mail,
1410                author: entry.author,
1411                author_mail: entry.author_mail,
1412                author_time: entry.author_time,
1413                author_tz: entry.author_tz,
1414                summary: entry.summary,
1415                previous: entry.previous,
1416                filename: entry.filename,
1417            })
1418        })
1419        .collect::<Vec<_>>();
1420
1421    let messages = response
1422        .messages
1423        .into_iter()
1424        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
1425        .collect::<HashMap<_, _>>();
1426
1427    let permalinks = response
1428        .permalinks
1429        .into_iter()
1430        .filter_map(|permalink| {
1431            Some((
1432                git::Oid::from_bytes(&permalink.oid).ok()?,
1433                Url::from_str(&permalink.permalink).ok()?,
1434            ))
1435        })
1436        .collect::<HashMap<_, _>>();
1437
1438    Blame {
1439        entries,
1440        permalinks,
1441        messages,
1442        remote_url: response.remote_url,
1443    }
1444}