buffer_store.rs

   1use crate::{
   2    search::SearchQuery,
   3    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   4    Item, NoRepositoryError, ProjectPath,
   5};
   6use anyhow::{anyhow, Context as _, Result};
   7use collections::{hash_map, HashMap, HashSet};
   8use fs::Fs;
   9use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt as _};
  10use git::blame::Blame;
  11use gpui::{
  12    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
  13};
  14use http_client::Url;
  15use language::{
  16    proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
  17    Buffer, Capability, Event as BufferEvent, File as _, Language, Operation,
  18};
  19use rpc::{
  20    proto::{self, AnyProtoClient, EnvelopedMessage, PeerId},
  21    ErrorExt as _, TypedEnvelope,
  22};
  23use smol::channel::Receiver;
  24use std::{io, path::Path, str::FromStr as _, sync::Arc};
  25use text::BufferId;
  26use util::{debug_panic, maybe, ResultExt as _};
  27use worktree::{
  28    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
  29    WorktreeId,
  30};
  31
  32/// A set of open buffers.
  33pub struct BufferStore {
  34    remote_id: Option<u64>,
  35    #[allow(unused)]
  36    worktree_store: Model<WorktreeStore>,
  37    opened_buffers: HashMap<BufferId, OpenBuffer>,
  38    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
  39    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
  40    #[allow(clippy::type_complexity)]
  41    loading_buffers_by_path: HashMap<
  42        ProjectPath,
  43        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
  44    >,
  45    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
  46    remote_buffer_listeners:
  47        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
  48}
  49
  50enum OpenBuffer {
  51    Strong(Model<Buffer>),
  52    Weak(WeakModel<Buffer>),
  53    Operations(Vec<Operation>),
  54}
  55
  56pub enum BufferStoreEvent {
  57    BufferAdded(Model<Buffer>),
  58    BufferChangedFilePath {
  59        buffer: Model<Buffer>,
  60        old_file: Option<Arc<dyn language::File>>,
  61    },
  62    MessageToReplicas(Box<proto::Envelope>),
  63}
  64
  65impl EventEmitter<BufferStoreEvent> for BufferStore {}
  66
  67impl BufferStore {
  68    /// Creates a buffer store, optionally retaining its buffers.
  69    ///
  70    /// If `retain_buffers` is `true`, then buffers are owned by the buffer store
  71    /// and won't be released unless they are explicitly removed, or `retain_buffers`
  72    /// is set to `false` via `set_retain_buffers`. Otherwise, buffers are stored as
  73    /// weak handles.
  74    pub fn new(
  75        worktree_store: Model<WorktreeStore>,
  76        remote_id: Option<u64>,
  77        cx: &mut ModelContext<Self>,
  78    ) -> Self {
  79        cx.subscribe(&worktree_store, |this, _, event, cx| match event {
  80            WorktreeStoreEvent::WorktreeAdded(worktree) => {
  81                this.subscribe_to_worktree(worktree, cx);
  82            }
  83            _ => {}
  84        })
  85        .detach();
  86
  87        Self {
  88            remote_id,
  89            worktree_store,
  90            opened_buffers: Default::default(),
  91            remote_buffer_listeners: Default::default(),
  92            loading_remote_buffers_by_id: Default::default(),
  93            local_buffer_ids_by_path: Default::default(),
  94            local_buffer_ids_by_entry_id: Default::default(),
  95            loading_buffers_by_path: Default::default(),
  96        }
  97    }
  98
  99    pub fn open_buffer(
 100        &mut self,
 101        project_path: ProjectPath,
 102        cx: &mut ModelContext<Self>,
 103    ) -> Task<Result<Model<Buffer>>> {
 104        let existing_buffer = self.get_by_path(&project_path, cx);
 105        if let Some(existing_buffer) = existing_buffer {
 106            return Task::ready(Ok(existing_buffer));
 107        }
 108
 109        let Some(worktree) = self
 110            .worktree_store
 111            .read(cx)
 112            .worktree_for_id(project_path.worktree_id, cx)
 113        else {
 114            return Task::ready(Err(anyhow!("no such worktree")));
 115        };
 116
 117        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
 118            // If the given path is already being loaded, then wait for that existing
 119            // task to complete and return the same buffer.
 120            hash_map::Entry::Occupied(e) => e.get().clone(),
 121
 122            // Otherwise, record the fact that this path is now being loaded.
 123            hash_map::Entry::Vacant(entry) => {
 124                let (mut tx, rx) = postage::watch::channel();
 125                entry.insert(rx.clone());
 126
 127                let project_path = project_path.clone();
 128                let load_buffer = match worktree.read(cx) {
 129                    Worktree::Local(_) => {
 130                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
 131                    }
 132                    Worktree::Remote(tree) => {
 133                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
 134                    }
 135                };
 136
 137                cx.spawn(move |this, mut cx| async move {
 138                    let load_result = load_buffer.await;
 139                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
 140                        // Record the fact that the buffer is no longer loading.
 141                        this.loading_buffers_by_path.remove(&project_path);
 142                        let buffer = load_result.map_err(Arc::new)?;
 143                        Ok(buffer)
 144                    })?);
 145                    anyhow::Ok(())
 146                })
 147                .detach();
 148                rx
 149            }
 150        };
 151
 152        cx.background_executor().spawn(async move {
 153            Self::wait_for_loading_buffer(loading_watch)
 154                .await
 155                .map_err(|e| e.cloned())
 156        })
 157    }
 158
 159    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 160        cx.subscribe(worktree, |this, worktree, event, cx| {
 161            if worktree.read(cx).is_local() {
 162                match event {
 163                    worktree::Event::UpdatedEntries(changes) => {
 164                        this.local_worktree_entries_changed(&worktree, changes, cx);
 165                    }
 166                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
 167                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
 168                    }
 169                    _ => {}
 170                }
 171            }
 172        })
 173        .detach();
 174    }
 175
 176    fn local_worktree_entries_changed(
 177        &mut self,
 178        worktree_handle: &Model<Worktree>,
 179        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 180        cx: &mut ModelContext<Self>,
 181    ) {
 182        let snapshot = worktree_handle.read(cx).snapshot();
 183        for (path, entry_id, _) in changes {
 184            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
 185        }
 186    }
 187
 188    fn local_worktree_git_repos_changed(
 189        &mut self,
 190        worktree_handle: Model<Worktree>,
 191        changed_repos: &UpdatedGitRepositoriesSet,
 192        cx: &mut ModelContext<Self>,
 193    ) {
 194        debug_assert!(worktree_handle.read(cx).is_local());
 195
 196        // Identify the loading buffers whose containing repository that has changed.
 197        let future_buffers = self
 198            .loading_buffers()
 199            .filter_map(|(project_path, receiver)| {
 200                if project_path.worktree_id != worktree_handle.read(cx).id() {
 201                    return None;
 202                }
 203                let path = &project_path.path;
 204                changed_repos
 205                    .iter()
 206                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
 207                let path = path.clone();
 208                Some(async move {
 209                    Self::wait_for_loading_buffer(receiver)
 210                        .await
 211                        .ok()
 212                        .map(|buffer| (buffer, path))
 213                })
 214            })
 215            .collect::<FuturesUnordered<_>>();
 216
 217        // Identify the current buffers whose containing repository has changed.
 218        let current_buffers = self
 219            .buffers()
 220            .filter_map(|buffer| {
 221                let file = File::from_dyn(buffer.read(cx).file())?;
 222                if file.worktree != worktree_handle {
 223                    return None;
 224                }
 225                changed_repos
 226                    .iter()
 227                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
 228                Some((buffer, file.path.clone()))
 229            })
 230            .collect::<Vec<_>>();
 231
 232        if future_buffers.len() + current_buffers.len() == 0 {
 233            return;
 234        }
 235
 236        cx.spawn(move |this, mut cx| async move {
 237            // Wait for all of the buffers to load.
 238            let future_buffers = future_buffers.collect::<Vec<_>>().await;
 239
 240            // Reload the diff base for every buffer whose containing git repository has changed.
 241            let snapshot =
 242                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
 243            let diff_bases_by_buffer = cx
 244                .background_executor()
 245                .spawn(async move {
 246                    let mut diff_base_tasks = future_buffers
 247                        .into_iter()
 248                        .flatten()
 249                        .chain(current_buffers)
 250                        .filter_map(|(buffer, path)| {
 251                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
 252                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
 253                            Some(async move {
 254                                let base_text =
 255                                    local_repo_entry.repo().load_index_text(&relative_path);
 256                                Some((buffer, base_text))
 257                            })
 258                        })
 259                        .collect::<FuturesUnordered<_>>();
 260
 261                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
 262                    while let Some(diff_base) = diff_base_tasks.next().await {
 263                        if let Some(diff_base) = diff_base {
 264                            diff_bases.push(diff_base);
 265                        }
 266                    }
 267                    diff_bases
 268                })
 269                .await;
 270
 271            this.update(&mut cx, |this, cx| {
 272                // Assign the new diff bases on all of the buffers.
 273                for (buffer, diff_base) in diff_bases_by_buffer {
 274                    let buffer_id = buffer.update(cx, |buffer, cx| {
 275                        buffer.set_diff_base(diff_base.clone(), cx);
 276                        buffer.remote_id().to_proto()
 277                    });
 278                    if let Some(project_id) = this.remote_id {
 279                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
 280                            proto::UpdateDiffBase {
 281                                project_id,
 282                                buffer_id,
 283                                diff_base,
 284                            }
 285                            .into_envelope(0, None, None),
 286                        )))
 287                    }
 288                }
 289            })
 290        })
 291        .detach_and_log_err(cx);
 292    }
 293
 294    fn open_local_buffer_internal(
 295        &mut self,
 296        path: Arc<Path>,
 297        worktree: Model<Worktree>,
 298        cx: &mut ModelContext<Self>,
 299    ) -> Task<Result<Model<Buffer>>> {
 300        let load_buffer = worktree.update(cx, |worktree, cx| {
 301            let load_file = worktree.load_file(path.as_ref(), cx);
 302            let reservation = cx.reserve_model();
 303            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
 304            cx.spawn(move |_, mut cx| async move {
 305                let loaded = load_file.await?;
 306                let text_buffer = cx
 307                    .background_executor()
 308                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
 309                    .await;
 310                cx.insert_model(reservation, |_| {
 311                    Buffer::build(
 312                        text_buffer,
 313                        loaded.diff_base,
 314                        Some(loaded.file),
 315                        Capability::ReadWrite,
 316                    )
 317                })
 318            })
 319        });
 320
 321        cx.spawn(move |this, mut cx| async move {
 322            let buffer = match load_buffer.await {
 323                Ok(buffer) => Ok(buffer),
 324                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
 325                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
 326                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
 327                    Buffer::build(
 328                        text_buffer,
 329                        None,
 330                        Some(Arc::new(File {
 331                            worktree,
 332                            path,
 333                            mtime: None,
 334                            entry_id: None,
 335                            is_local: true,
 336                            is_deleted: false,
 337                            is_private: false,
 338                        })),
 339                        Capability::ReadWrite,
 340                    )
 341                }),
 342                Err(e) => Err(e),
 343            }?;
 344            this.update(&mut cx, |this, cx| {
 345                this.add_buffer(buffer.clone(), cx).log_err();
 346            })?;
 347            Ok(buffer)
 348        })
 349    }
 350
 351    fn open_remote_buffer_internal(
 352        &self,
 353        path: &Arc<Path>,
 354        worktree: &RemoteWorktree,
 355        cx: &ModelContext<Self>,
 356    ) -> Task<Result<Model<Buffer>>> {
 357        let worktree_id = worktree.id().to_proto();
 358        let project_id = worktree.project_id();
 359        let client = worktree.client();
 360        let path_string = path.clone().to_string_lossy().to_string();
 361        cx.spawn(move |this, mut cx| async move {
 362            let response = client
 363                .request(proto::OpenBufferByPath {
 364                    project_id,
 365                    worktree_id,
 366                    path: path_string,
 367                })
 368                .await?;
 369            let buffer_id = BufferId::new(response.buffer_id)?;
 370            this.update(&mut cx, |this, cx| {
 371                this.wait_for_remote_buffer(buffer_id, cx)
 372            })?
 373            .await
 374        })
 375    }
 376
 377    pub fn create_buffer(
 378        &mut self,
 379        remote_client: Option<(AnyProtoClient, u64)>,
 380        cx: &mut ModelContext<Self>,
 381    ) -> Task<Result<Model<Buffer>>> {
 382        if let Some((remote_client, project_id)) = remote_client {
 383            let create = remote_client.request(proto::OpenNewBuffer { project_id });
 384            cx.spawn(|this, mut cx| async move {
 385                let response = create.await?;
 386                let buffer_id = BufferId::new(response.buffer_id)?;
 387
 388                this.update(&mut cx, |this, cx| {
 389                    this.wait_for_remote_buffer(buffer_id, cx)
 390                })?
 391                .await
 392            })
 393        } else {
 394            Task::ready(Ok(self.create_local_buffer("", None, cx)))
 395        }
 396    }
 397
 398    pub fn create_local_buffer(
 399        &mut self,
 400        text: &str,
 401        language: Option<Arc<Language>>,
 402        cx: &mut ModelContext<Self>,
 403    ) -> Model<Buffer> {
 404        let buffer = cx.new_model(|cx| {
 405            Buffer::local(text, cx)
 406                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
 407        });
 408        self.add_buffer(buffer.clone(), cx).log_err();
 409        buffer
 410    }
 411
 412    pub fn save_buffer(
 413        &mut self,
 414        buffer: Model<Buffer>,
 415        cx: &mut ModelContext<Self>,
 416    ) -> Task<Result<()>> {
 417        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 418            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 419        };
 420        match file.worktree.read(cx) {
 421            Worktree::Local(_) => {
 422                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
 423            }
 424            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
 425        }
 426    }
 427
 428    pub fn save_buffer_as(
 429        &mut self,
 430        buffer: Model<Buffer>,
 431        path: ProjectPath,
 432        cx: &mut ModelContext<Self>,
 433    ) -> Task<Result<()>> {
 434        let Some(worktree) = self
 435            .worktree_store
 436            .read(cx)
 437            .worktree_for_id(path.worktree_id, cx)
 438        else {
 439            return Task::ready(Err(anyhow!("no such worktree")));
 440        };
 441
 442        let old_file = buffer.read(cx).file().cloned();
 443
 444        let task = match worktree.read(cx) {
 445            Worktree::Local(_) => {
 446                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
 447            }
 448            Worktree::Remote(tree) => {
 449                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
 450            }
 451        };
 452        cx.spawn(|this, mut cx| async move {
 453            task.await?;
 454            this.update(&mut cx, |_, cx| {
 455                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
 456            })
 457        })
 458    }
 459
 460    fn save_local_buffer(
 461        &self,
 462        worktree: Model<Worktree>,
 463        buffer_handle: Model<Buffer>,
 464        path: Arc<Path>,
 465        mut has_changed_file: bool,
 466        cx: &mut ModelContext<Self>,
 467    ) -> Task<Result<()>> {
 468        let buffer = buffer_handle.read(cx);
 469        let text = buffer.as_rope().clone();
 470        let line_ending = buffer.line_ending();
 471        let version = buffer.version();
 472        let buffer_id = buffer.remote_id();
 473        if buffer.file().is_some_and(|file| !file.is_created()) {
 474            has_changed_file = true;
 475        }
 476
 477        let save = worktree.update(cx, |worktree, cx| {
 478            worktree.write_file(path.as_ref(), text, line_ending, cx)
 479        });
 480
 481        cx.spawn(move |this, mut cx| async move {
 482            let new_file = save.await?;
 483            let mtime = new_file.mtime;
 484            this.update(&mut cx, |this, cx| {
 485                if let Some(project_id) = this.remote_id {
 486                    if has_changed_file {
 487                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
 488                            proto::UpdateBufferFile {
 489                                project_id,
 490                                buffer_id: buffer_id.to_proto(),
 491                                file: Some(language::File::to_proto(&*new_file, cx)),
 492                            }
 493                            .into_envelope(0, None, None),
 494                        )));
 495                    }
 496                    cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
 497                        proto::BufferSaved {
 498                            project_id,
 499                            buffer_id: buffer_id.to_proto(),
 500                            version: serialize_version(&version),
 501                            mtime: mtime.map(|time| time.into()),
 502                        }
 503                        .into_envelope(0, None, None),
 504                    )));
 505                }
 506            })?;
 507            buffer_handle.update(&mut cx, |buffer, cx| {
 508                if has_changed_file {
 509                    buffer.file_updated(new_file, cx);
 510                }
 511                buffer.did_save(version.clone(), mtime, cx);
 512            })
 513        })
 514    }
 515
 516    fn save_remote_buffer(
 517        &self,
 518        buffer_handle: Model<Buffer>,
 519        new_path: Option<proto::ProjectPath>,
 520        tree: &RemoteWorktree,
 521        cx: &ModelContext<Self>,
 522    ) -> Task<Result<()>> {
 523        let buffer = buffer_handle.read(cx);
 524        let buffer_id = buffer.remote_id().into();
 525        let version = buffer.version();
 526        let rpc = tree.client();
 527        let project_id = tree.project_id();
 528        cx.spawn(move |_, mut cx| async move {
 529            let response = rpc
 530                .request(proto::SaveBuffer {
 531                    project_id,
 532                    buffer_id,
 533                    new_path,
 534                    version: serialize_version(&version),
 535                })
 536                .await?;
 537            let version = deserialize_version(&response.version);
 538            let mtime = response.mtime.map(|mtime| mtime.into());
 539
 540            buffer_handle.update(&mut cx, |buffer, cx| {
 541                buffer.did_save(version.clone(), mtime, cx);
 542            })?;
 543
 544            Ok(())
 545        })
 546    }
 547
 548    pub fn blame_buffer(
 549        &self,
 550        buffer: &Model<Buffer>,
 551        version: Option<clock::Global>,
 552        cx: &AppContext,
 553    ) -> Task<Result<Blame>> {
 554        let buffer = buffer.read(cx);
 555        let Some(file) = File::from_dyn(buffer.file()) else {
 556            return Task::ready(Err(anyhow!("buffer has no file")));
 557        };
 558
 559        match file.worktree.clone().read(cx) {
 560            Worktree::Local(worktree) => {
 561                let worktree = worktree.snapshot();
 562                let blame_params = maybe!({
 563                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
 564                        Some(repo_for_path) => repo_for_path,
 565                        None => anyhow::bail!(NoRepositoryError {}),
 566                    };
 567
 568                    let relative_path = repo_entry
 569                        .relativize(&worktree, &file.path)
 570                        .context("failed to relativize buffer path")?;
 571
 572                    let repo = local_repo_entry.repo().clone();
 573
 574                    let content = match version {
 575                        Some(version) => buffer.rope_for_version(&version).clone(),
 576                        None => buffer.as_rope().clone(),
 577                    };
 578
 579                    anyhow::Ok((repo, relative_path, content))
 580                });
 581
 582                cx.background_executor().spawn(async move {
 583                    let (repo, relative_path, content) = blame_params?;
 584                    repo.blame(&relative_path, content)
 585                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
 586                })
 587            }
 588            Worktree::Remote(worktree) => {
 589                let buffer_id = buffer.remote_id();
 590                let version = buffer.version();
 591                let project_id = worktree.project_id();
 592                let client = worktree.client();
 593                cx.spawn(|_| async move {
 594                    let response = client
 595                        .request(proto::BlameBuffer {
 596                            project_id,
 597                            buffer_id: buffer_id.into(),
 598                            version: serialize_version(&version),
 599                        })
 600                        .await?;
 601                    Ok(deserialize_blame_buffer_response(response))
 602                })
 603            }
 604        }
 605    }
 606
 607    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
 608        let remote_id = buffer.read(cx).remote_id();
 609        let is_remote = buffer.read(cx).replica_id() != 0;
 610        let open_buffer = if self.remote_id.is_some() {
 611            OpenBuffer::Strong(buffer.clone())
 612        } else {
 613            OpenBuffer::Weak(buffer.downgrade())
 614        };
 615
 616        match self.opened_buffers.entry(remote_id) {
 617            hash_map::Entry::Vacant(entry) => {
 618                entry.insert(open_buffer);
 619            }
 620            hash_map::Entry::Occupied(mut entry) => {
 621                if let OpenBuffer::Operations(operations) = entry.get_mut() {
 622                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
 623                } else if entry.get().upgrade().is_some() {
 624                    if is_remote {
 625                        return Ok(());
 626                    } else {
 627                        debug_panic!("buffer {} was already registered", remote_id);
 628                        Err(anyhow!("buffer {} was already registered", remote_id))?;
 629                    }
 630                }
 631                entry.insert(open_buffer);
 632            }
 633        }
 634
 635        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
 636            for sender in senders {
 637                sender.send(Ok(buffer.clone())).ok();
 638            }
 639        }
 640
 641        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 642            if file.is_local {
 643                self.local_buffer_ids_by_path.insert(
 644                    ProjectPath {
 645                        worktree_id: file.worktree_id(cx),
 646                        path: file.path.clone(),
 647                    },
 648                    remote_id,
 649                );
 650
 651                if let Some(entry_id) = file.entry_id {
 652                    self.local_buffer_ids_by_entry_id
 653                        .insert(entry_id, remote_id);
 654                }
 655            }
 656        }
 657
 658        cx.subscribe(&buffer, Self::on_buffer_event).detach();
 659        cx.emit(BufferStoreEvent::BufferAdded(buffer));
 660        Ok(())
 661    }
 662
 663    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
 664        self.opened_buffers
 665            .values()
 666            .filter_map(|buffer| buffer.upgrade())
 667    }
 668
 669    pub fn loading_buffers(
 670        &self,
 671    ) -> impl Iterator<
 672        Item = (
 673            &ProjectPath,
 674            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
 675        ),
 676    > {
 677        self.loading_buffers_by_path
 678            .iter()
 679            .map(|(path, rx)| (path, rx.clone()))
 680    }
 681
 682    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
 683        self.buffers().find_map(|buffer| {
 684            let file = File::from_dyn(buffer.read(cx).file())?;
 685            if file.worktree_id(cx) == path.worktree_id && &file.path == &path.path {
 686                Some(buffer)
 687            } else {
 688                None
 689            }
 690        })
 691    }
 692
 693    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
 694        self.opened_buffers
 695            .get(&buffer_id)
 696            .and_then(|buffer| buffer.upgrade())
 697    }
 698
 699    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
 700        self.get(buffer_id)
 701            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
 702    }
 703
 704    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
 705        self.get(buffer_id)
 706            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
 707    }
 708
 709    pub fn wait_for_remote_buffer(
 710        &mut self,
 711        id: BufferId,
 712        cx: &mut AppContext,
 713    ) -> Task<Result<Model<Buffer>>> {
 714        let buffer = self.get(id);
 715        if let Some(buffer) = buffer {
 716            return Task::ready(Ok(buffer));
 717        }
 718        let (tx, rx) = oneshot::channel();
 719        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 720        cx.background_executor().spawn(async move { rx.await? })
 721    }
 722
 723    pub fn buffer_version_info(
 724        &self,
 725        cx: &AppContext,
 726    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
 727        let buffers = self
 728            .buffers()
 729            .map(|buffer| {
 730                let buffer = buffer.read(cx);
 731                proto::BufferVersion {
 732                    id: buffer.remote_id().into(),
 733                    version: language::proto::serialize_version(&buffer.version),
 734                }
 735            })
 736            .collect();
 737        let incomplete_buffer_ids = self
 738            .loading_remote_buffers_by_id
 739            .keys()
 740            .copied()
 741            .collect::<Vec<_>>();
 742        (buffers, incomplete_buffer_ids)
 743    }
 744
 745    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
 746        self.set_remote_id(None, cx);
 747
 748        for buffer in self.buffers() {
 749            buffer.update(cx, |buffer, cx| {
 750                buffer.set_capability(Capability::ReadOnly, cx)
 751            });
 752        }
 753
 754        // Wake up all futures currently waiting on a buffer to get opened,
 755        // to give them a chance to fail now that we've disconnected.
 756        self.remote_buffer_listeners.clear();
 757    }
 758
 759    pub fn set_remote_id(&mut self, remote_id: Option<u64>, cx: &mut AppContext) {
 760        self.remote_id = remote_id;
 761        for open_buffer in self.opened_buffers.values_mut() {
 762            if remote_id.is_some() {
 763                if let OpenBuffer::Weak(buffer) = open_buffer {
 764                    if let Some(buffer) = buffer.upgrade() {
 765                        *open_buffer = OpenBuffer::Strong(buffer);
 766                    }
 767                }
 768            } else {
 769                if let Some(buffer) = open_buffer.upgrade() {
 770                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
 771                }
 772                if let OpenBuffer::Strong(buffer) = open_buffer {
 773                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
 774                }
 775            }
 776        }
 777    }
 778
 779    pub fn discard_incomplete(&mut self) {
 780        self.opened_buffers
 781            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
 782    }
 783
 784    pub fn find_search_candidates(
 785        &mut self,
 786        query: &SearchQuery,
 787        limit: usize,
 788        fs: Arc<dyn Fs>,
 789        cx: &mut ModelContext<Self>,
 790    ) -> Receiver<Model<Buffer>> {
 791        let (tx, rx) = smol::channel::unbounded();
 792        let open_buffers = self.find_open_search_candidates(query, cx);
 793        let skip_entries: HashSet<_> = open_buffers
 794            .iter()
 795            .filter_map(|buffer| buffer.read(cx).entry_id(cx))
 796            .collect();
 797
 798        let limit = limit.saturating_sub(open_buffers.len());
 799        for open_buffer in open_buffers {
 800            tx.send_blocking(open_buffer).ok();
 801        }
 802
 803        let match_rx = self.worktree_store.update(cx, |worktree_store, cx| {
 804            worktree_store.find_search_candidates(query.clone(), limit, skip_entries, fs, cx)
 805        });
 806
 807        const MAX_CONCURRENT_BUFFER_OPENS: usize = 8;
 808
 809        for _ in 0..MAX_CONCURRENT_BUFFER_OPENS {
 810            let mut match_rx = match_rx.clone();
 811            let tx = tx.clone();
 812            cx.spawn(|this, mut cx| async move {
 813                while let Some(project_path) = match_rx.next().await {
 814                    let buffer = this
 815                        .update(&mut cx, |this, cx| this.open_buffer(project_path, cx))?
 816                        .await
 817                        .log_err();
 818                    if let Some(buffer) = buffer {
 819                        tx.send_blocking(buffer).ok();
 820                    }
 821                }
 822                anyhow::Ok(())
 823            })
 824            .detach();
 825        }
 826        rx
 827    }
 828
 829    /// Returns open buffers filtered by filename
 830    /// Does *not* check the buffer content, the caller must do that
 831    fn find_open_search_candidates(
 832        &self,
 833        query: &SearchQuery,
 834        cx: &ModelContext<Self>,
 835    ) -> Vec<Model<Buffer>> {
 836        let include_root = self
 837            .worktree_store
 838            .read(cx)
 839            .visible_worktrees(cx)
 840            .collect::<Vec<_>>()
 841            .len()
 842            > 1;
 843        self.buffers()
 844            .filter_map(|buffer| {
 845                let handle = buffer.clone();
 846                buffer.read_with(cx, |buffer, cx| {
 847                    let worktree_store = self.worktree_store.read(cx);
 848                    let entry_id = buffer.entry_id(cx);
 849                    let is_ignored = entry_id
 850                        .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
 851                        .map_or(false, |entry| entry.is_ignored);
 852
 853                    if is_ignored && !query.include_ignored() {
 854                        return None;
 855                    }
 856                    if let Some(file) = buffer.file() {
 857                        let matched_path = if include_root {
 858                            query.file_matches(Some(&file.full_path(cx)))
 859                        } else {
 860                            query.file_matches(Some(file.path()))
 861                        };
 862
 863                        if matched_path {
 864                            Some(handle)
 865                        } else {
 866                            None
 867                        }
 868                    } else {
 869                        Some(handle)
 870                    }
 871                })
 872            })
 873            .collect()
 874    }
 875
 876    fn on_buffer_event(
 877        &mut self,
 878        buffer: Model<Buffer>,
 879        event: &BufferEvent,
 880        cx: &mut ModelContext<Self>,
 881    ) {
 882        match event {
 883            BufferEvent::FileHandleChanged => {
 884                self.buffer_changed_file(buffer, cx);
 885            }
 886            _ => {}
 887        }
 888    }
 889
 890    fn local_worktree_entry_changed(
 891        &mut self,
 892        entry_id: ProjectEntryId,
 893        path: &Arc<Path>,
 894        worktree: &Model<worktree::Worktree>,
 895        snapshot: &worktree::Snapshot,
 896        cx: &mut ModelContext<Self>,
 897    ) -> Option<()> {
 898        let project_path = ProjectPath {
 899            worktree_id: snapshot.id(),
 900            path: path.clone(),
 901        };
 902        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 903            Some(&buffer_id) => buffer_id,
 904            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
 905        };
 906        let buffer = if let Some(buffer) = self.get(buffer_id) {
 907            buffer
 908        } else {
 909            self.opened_buffers.remove(&buffer_id);
 910            self.local_buffer_ids_by_path.remove(&project_path);
 911            self.local_buffer_ids_by_entry_id.remove(&entry_id);
 912            return None;
 913        };
 914
 915        let events = buffer.update(cx, |buffer, cx| {
 916            let file = buffer.file()?;
 917            let old_file = File::from_dyn(Some(file))?;
 918            if old_file.worktree != *worktree {
 919                return None;
 920            }
 921
 922            let new_file = if let Some(entry) = old_file
 923                .entry_id
 924                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 925            {
 926                File {
 927                    is_local: true,
 928                    entry_id: Some(entry.id),
 929                    mtime: entry.mtime,
 930                    path: entry.path.clone(),
 931                    worktree: worktree.clone(),
 932                    is_deleted: false,
 933                    is_private: entry.is_private,
 934                }
 935            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
 936                File {
 937                    is_local: true,
 938                    entry_id: Some(entry.id),
 939                    mtime: entry.mtime,
 940                    path: entry.path.clone(),
 941                    worktree: worktree.clone(),
 942                    is_deleted: false,
 943                    is_private: entry.is_private,
 944                }
 945            } else {
 946                File {
 947                    is_local: true,
 948                    entry_id: old_file.entry_id,
 949                    path: old_file.path.clone(),
 950                    mtime: old_file.mtime,
 951                    worktree: worktree.clone(),
 952                    is_deleted: true,
 953                    is_private: old_file.is_private,
 954                }
 955            };
 956
 957            if new_file == *old_file {
 958                return None;
 959            }
 960
 961            let mut events = Vec::new();
 962            if new_file.path != old_file.path {
 963                self.local_buffer_ids_by_path.remove(&ProjectPath {
 964                    path: old_file.path.clone(),
 965                    worktree_id: old_file.worktree_id(cx),
 966                });
 967                self.local_buffer_ids_by_path.insert(
 968                    ProjectPath {
 969                        worktree_id: new_file.worktree_id(cx),
 970                        path: new_file.path.clone(),
 971                    },
 972                    buffer_id,
 973                );
 974                events.push(BufferStoreEvent::BufferChangedFilePath {
 975                    buffer: cx.handle(),
 976                    old_file: buffer.file().cloned(),
 977                });
 978            }
 979
 980            if new_file.entry_id != old_file.entry_id {
 981                if let Some(entry_id) = old_file.entry_id {
 982                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
 983                }
 984                if let Some(entry_id) = new_file.entry_id {
 985                    self.local_buffer_ids_by_entry_id
 986                        .insert(entry_id, buffer_id);
 987                }
 988            }
 989
 990            if let Some(project_id) = self.remote_id {
 991                events.push(BufferStoreEvent::MessageToReplicas(Box::new(
 992                    proto::UpdateBufferFile {
 993                        project_id,
 994                        buffer_id: buffer_id.to_proto(),
 995                        file: Some(new_file.to_proto(cx)),
 996                    }
 997                    .into_envelope(0, None, None),
 998                )))
 999            }
1000
1001            buffer.file_updated(Arc::new(new_file), cx);
1002            Some(events)
1003        })?;
1004
1005        for event in events {
1006            cx.emit(event);
1007        }
1008
1009        None
1010    }
1011
1012    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
1013        let file = File::from_dyn(buffer.read(cx).file())?;
1014
1015        let remote_id = buffer.read(cx).remote_id();
1016        if let Some(entry_id) = file.entry_id {
1017            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1018                Some(_) => {
1019                    return None;
1020                }
1021                None => {
1022                    self.local_buffer_ids_by_entry_id
1023                        .insert(entry_id, remote_id);
1024                }
1025            }
1026        };
1027        self.local_buffer_ids_by_path.insert(
1028            ProjectPath {
1029                worktree_id: file.worktree_id(cx),
1030                path: file.path.clone(),
1031            },
1032            remote_id,
1033        );
1034
1035        Some(())
1036    }
1037
1038    pub async fn create_buffer_for_peer(
1039        this: Model<Self>,
1040        peer_id: PeerId,
1041        buffer_id: BufferId,
1042        project_id: u64,
1043        client: AnyProtoClient,
1044        cx: &mut AsyncAppContext,
1045    ) -> Result<()> {
1046        let Some(buffer) = this.update(cx, |this, _| this.get(buffer_id))? else {
1047            return Ok(());
1048        };
1049
1050        let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
1051        let operations = operations.await;
1052        let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx))?;
1053
1054        let initial_state = proto::CreateBufferForPeer {
1055            project_id,
1056            peer_id: Some(peer_id),
1057            variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
1058        };
1059
1060        if client.send(initial_state).log_err().is_some() {
1061            let client = client.clone();
1062            cx.background_executor()
1063                .spawn(async move {
1064                    let mut chunks = split_operations(operations).peekable();
1065                    while let Some(chunk) = chunks.next() {
1066                        let is_last = chunks.peek().is_none();
1067                        client.send(proto::CreateBufferForPeer {
1068                            project_id,
1069                            peer_id: Some(peer_id),
1070                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
1071                                proto::BufferChunk {
1072                                    buffer_id: buffer_id.into(),
1073                                    operations: chunk,
1074                                    is_last,
1075                                },
1076                            )),
1077                        })?;
1078                    }
1079                    anyhow::Ok(())
1080                })
1081                .await
1082                .log_err();
1083        }
1084        Ok(())
1085    }
1086
1087    pub async fn handle_update_buffer(
1088        this: Model<Self>,
1089        envelope: TypedEnvelope<proto::UpdateBuffer>,
1090        mut cx: AsyncAppContext,
1091    ) -> Result<proto::Ack> {
1092        let payload = envelope.payload.clone();
1093        let buffer_id = BufferId::new(payload.buffer_id)?;
1094        let ops = payload
1095            .operations
1096            .into_iter()
1097            .map(language::proto::deserialize_operation)
1098            .collect::<Result<Vec<_>, _>>()?;
1099        this.update(&mut cx, |this, cx| {
1100            match this.opened_buffers.entry(buffer_id) {
1101                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1102                    OpenBuffer::Strong(buffer) => {
1103                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
1104                    }
1105                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1106                    OpenBuffer::Weak(_) => {}
1107                },
1108                hash_map::Entry::Vacant(e) => {
1109                    e.insert(OpenBuffer::Operations(ops));
1110                }
1111            }
1112            Ok(proto::Ack {})
1113        })?
1114    }
1115
1116    pub fn handle_create_buffer_for_peer(
1117        &mut self,
1118        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1119        replica_id: u16,
1120        capability: Capability,
1121        cx: &mut ModelContext<Self>,
1122    ) -> Result<()> {
1123        match envelope
1124            .payload
1125            .variant
1126            .ok_or_else(|| anyhow!("missing variant"))?
1127        {
1128            proto::create_buffer_for_peer::Variant::State(mut state) => {
1129                let buffer_id = BufferId::new(state.id)?;
1130
1131                let buffer_result = maybe!({
1132                    let mut buffer_file = None;
1133                    if let Some(file) = state.file.take() {
1134                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
1135                        let worktree = self
1136                            .worktree_store
1137                            .read(cx)
1138                            .worktree_for_id(worktree_id, cx)
1139                            .ok_or_else(|| {
1140                                anyhow!("no worktree found for id {}", file.worktree_id)
1141                            })?;
1142                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
1143                            as Arc<dyn language::File>);
1144                    }
1145                    Buffer::from_proto(replica_id, capability, state, buffer_file)
1146                });
1147
1148                match buffer_result {
1149                    Ok(buffer) => {
1150                        let buffer = cx.new_model(|_| buffer);
1151                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
1152                    }
1153                    Err(error) => {
1154                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
1155                            for listener in listeners {
1156                                listener.send(Err(anyhow!(error.cloned()))).ok();
1157                            }
1158                        }
1159                    }
1160                }
1161            }
1162            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
1163                let buffer_id = BufferId::new(chunk.buffer_id)?;
1164                let buffer = self
1165                    .loading_remote_buffers_by_id
1166                    .get(&buffer_id)
1167                    .cloned()
1168                    .ok_or_else(|| {
1169                        anyhow!(
1170                            "received chunk for buffer {} without initial state",
1171                            chunk.buffer_id
1172                        )
1173                    })?;
1174
1175                let result = maybe!({
1176                    let operations = chunk
1177                        .operations
1178                        .into_iter()
1179                        .map(language::proto::deserialize_operation)
1180                        .collect::<Result<Vec<_>>>()?;
1181                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
1182                });
1183
1184                if let Err(error) = result {
1185                    self.loading_remote_buffers_by_id.remove(&buffer_id);
1186                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
1187                        for listener in listeners {
1188                            listener.send(Err(error.cloned())).ok();
1189                        }
1190                    }
1191                } else if chunk.is_last {
1192                    self.loading_remote_buffers_by_id.remove(&buffer_id);
1193                    self.add_buffer(buffer, cx)?;
1194                }
1195            }
1196        }
1197
1198        Ok(())
1199    }
1200
1201    pub async fn handle_update_buffer_file(
1202        this: Model<Self>,
1203        envelope: TypedEnvelope<proto::UpdateBufferFile>,
1204        mut cx: AsyncAppContext,
1205    ) -> Result<()> {
1206        let buffer_id = envelope.payload.buffer_id;
1207        let buffer_id = BufferId::new(buffer_id)?;
1208
1209        this.update(&mut cx, |this, cx| {
1210            let payload = envelope.payload.clone();
1211            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1212                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
1213                let worktree = this
1214                    .worktree_store
1215                    .read(cx)
1216                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
1217                    .ok_or_else(|| anyhow!("no such worktree"))?;
1218                let file = File::from_proto(file, worktree, cx)?;
1219                let old_file = buffer.update(cx, |buffer, cx| {
1220                    let old_file = buffer.file().cloned();
1221                    let new_path = file.path.clone();
1222                    buffer.file_updated(Arc::new(file), cx);
1223                    if old_file
1224                        .as_ref()
1225                        .map_or(true, |old| *old.path() != new_path)
1226                    {
1227                        Some(old_file)
1228                    } else {
1229                        None
1230                    }
1231                });
1232                if let Some(old_file) = old_file {
1233                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1234                }
1235            }
1236            Ok(())
1237        })?
1238    }
1239
1240    pub async fn handle_update_diff_base(
1241        this: Model<Self>,
1242        envelope: TypedEnvelope<proto::UpdateDiffBase>,
1243        mut cx: AsyncAppContext,
1244    ) -> Result<()> {
1245        this.update(&mut cx, |this, cx| {
1246            let buffer_id = envelope.payload.buffer_id;
1247            let buffer_id = BufferId::new(buffer_id)?;
1248            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1249                buffer.update(cx, |buffer, cx| {
1250                    buffer.set_diff_base(envelope.payload.diff_base, cx)
1251                });
1252            }
1253            Ok(())
1254        })?
1255    }
1256
1257    pub async fn handle_save_buffer(
1258        this: Model<Self>,
1259        envelope: TypedEnvelope<proto::SaveBuffer>,
1260        mut cx: AsyncAppContext,
1261    ) -> Result<proto::BufferSaved> {
1262        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1263        let (buffer, project_id) = this.update(&mut cx, |this, _| {
1264            anyhow::Ok((
1265                this.get_existing(buffer_id)?,
1266                this.remote_id.context("project is not shared")?,
1267            ))
1268        })??;
1269        buffer
1270            .update(&mut cx, |buffer, _| {
1271                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1272            })?
1273            .await?;
1274        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
1275
1276        if let Some(new_path) = envelope.payload.new_path {
1277            let new_path = ProjectPath::from_proto(new_path);
1278            this.update(&mut cx, |this, cx| {
1279                this.save_buffer_as(buffer.clone(), new_path, cx)
1280            })?
1281            .await?;
1282        } else {
1283            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
1284                .await?;
1285        }
1286
1287        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
1288            project_id,
1289            buffer_id: buffer_id.into(),
1290            version: serialize_version(buffer.saved_version()),
1291            mtime: buffer.saved_mtime().map(|time| time.into()),
1292        })
1293    }
1294
1295    pub async fn handle_buffer_saved(
1296        this: Model<Self>,
1297        envelope: TypedEnvelope<proto::BufferSaved>,
1298        mut cx: AsyncAppContext,
1299    ) -> Result<()> {
1300        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1301        let version = deserialize_version(&envelope.payload.version);
1302        let mtime = envelope.payload.mtime.map(|time| time.into());
1303        this.update(&mut cx, |this, cx| {
1304            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1305                buffer.update(cx, |buffer, cx| {
1306                    buffer.did_save(version, mtime, cx);
1307                });
1308            }
1309        })
1310    }
1311
1312    pub async fn handle_buffer_reloaded(
1313        this: Model<Self>,
1314        envelope: TypedEnvelope<proto::BufferReloaded>,
1315        mut cx: AsyncAppContext,
1316    ) -> Result<()> {
1317        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1318        let version = deserialize_version(&envelope.payload.version);
1319        let mtime = envelope.payload.mtime.map(|time| time.into());
1320        let line_ending = deserialize_line_ending(
1321            proto::LineEnding::from_i32(envelope.payload.line_ending)
1322                .ok_or_else(|| anyhow!("missing line ending"))?,
1323        );
1324        this.update(&mut cx, |this, cx| {
1325            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1326                buffer.update(cx, |buffer, cx| {
1327                    buffer.did_reload(version, line_ending, mtime, cx);
1328                });
1329            }
1330        })
1331    }
1332
1333    pub async fn handle_blame_buffer(
1334        this: Model<Self>,
1335        envelope: TypedEnvelope<proto::BlameBuffer>,
1336        mut cx: AsyncAppContext,
1337    ) -> Result<proto::BlameBufferResponse> {
1338        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1339        let version = deserialize_version(&envelope.payload.version);
1340        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1341        buffer
1342            .update(&mut cx, |buffer, _| {
1343                buffer.wait_for_version(version.clone())
1344            })?
1345            .await?;
1346        let blame = this
1347            .update(&mut cx, |this, cx| {
1348                this.blame_buffer(&buffer, Some(version), cx)
1349            })?
1350            .await?;
1351        Ok(serialize_blame_buffer_response(blame))
1352    }
1353
1354    pub async fn wait_for_loading_buffer(
1355        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
1356    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
1357        loop {
1358            if let Some(result) = receiver.borrow().as_ref() {
1359                match result {
1360                    Ok(buffer) => return Ok(buffer.to_owned()),
1361                    Err(e) => return Err(e.to_owned()),
1362                }
1363            }
1364            receiver.next().await;
1365        }
1366    }
1367}
1368
1369impl OpenBuffer {
1370    fn upgrade(&self) -> Option<Model<Buffer>> {
1371        match self {
1372            OpenBuffer::Strong(handle) => Some(handle.clone()),
1373            OpenBuffer::Weak(handle) => handle.upgrade(),
1374            OpenBuffer::Operations(_) => None,
1375        }
1376    }
1377}
1378
1379fn is_not_found_error(error: &anyhow::Error) -> bool {
1380    error
1381        .root_cause()
1382        .downcast_ref::<io::Error>()
1383        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1384}
1385
1386fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
1387    let entries = blame
1388        .entries
1389        .into_iter()
1390        .map(|entry| proto::BlameEntry {
1391            sha: entry.sha.as_bytes().into(),
1392            start_line: entry.range.start,
1393            end_line: entry.range.end,
1394            original_line_number: entry.original_line_number,
1395            author: entry.author.clone(),
1396            author_mail: entry.author_mail.clone(),
1397            author_time: entry.author_time,
1398            author_tz: entry.author_tz.clone(),
1399            committer: entry.committer.clone(),
1400            committer_mail: entry.committer_mail.clone(),
1401            committer_time: entry.committer_time,
1402            committer_tz: entry.committer_tz.clone(),
1403            summary: entry.summary.clone(),
1404            previous: entry.previous.clone(),
1405            filename: entry.filename.clone(),
1406        })
1407        .collect::<Vec<_>>();
1408
1409    let messages = blame
1410        .messages
1411        .into_iter()
1412        .map(|(oid, message)| proto::CommitMessage {
1413            oid: oid.as_bytes().into(),
1414            message,
1415        })
1416        .collect::<Vec<_>>();
1417
1418    let permalinks = blame
1419        .permalinks
1420        .into_iter()
1421        .map(|(oid, url)| proto::CommitPermalink {
1422            oid: oid.as_bytes().into(),
1423            permalink: url.to_string(),
1424        })
1425        .collect::<Vec<_>>();
1426
1427    proto::BlameBufferResponse {
1428        entries,
1429        messages,
1430        permalinks,
1431        remote_url: blame.remote_url,
1432    }
1433}
1434
1435fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
1436    let entries = response
1437        .entries
1438        .into_iter()
1439        .filter_map(|entry| {
1440            Some(git::blame::BlameEntry {
1441                sha: git::Oid::from_bytes(&entry.sha).ok()?,
1442                range: entry.start_line..entry.end_line,
1443                original_line_number: entry.original_line_number,
1444                committer: entry.committer,
1445                committer_time: entry.committer_time,
1446                committer_tz: entry.committer_tz,
1447                committer_mail: entry.committer_mail,
1448                author: entry.author,
1449                author_mail: entry.author_mail,
1450                author_time: entry.author_time,
1451                author_tz: entry.author_tz,
1452                summary: entry.summary,
1453                previous: entry.previous,
1454                filename: entry.filename,
1455            })
1456        })
1457        .collect::<Vec<_>>();
1458
1459    let messages = response
1460        .messages
1461        .into_iter()
1462        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
1463        .collect::<HashMap<_, _>>();
1464
1465    let permalinks = response
1466        .permalinks
1467        .into_iter()
1468        .filter_map(|permalink| {
1469            Some((
1470                git::Oid::from_bytes(&permalink.oid).ok()?,
1471                Url::from_str(&permalink.permalink).ok()?,
1472            ))
1473        })
1474        .collect::<HashMap<_, _>>();
1475
1476    Blame {
1477        entries,
1478        permalinks,
1479        messages,
1480        remote_url: response.remote_url,
1481    }
1482}