buffer_store.rs

   1use crate::{
   2    search::SearchQuery,
   3    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   4    Item, NoRepositoryError, ProjectPath,
   5};
   6use anyhow::{anyhow, Context as _, Result};
   7use client::Client;
   8use collections::{hash_map, HashMap, HashSet};
   9use fs::Fs;
  10use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
  11use git::blame::Blame;
  12use gpui::{
  13    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
  14    Task, WeakModel,
  15};
  16use http_client::Url;
  17use language::{
  18    proto::{
  19        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
  20        split_operations,
  21    },
  22    Buffer, BufferEvent, Capability, File as _, Language, Operation,
  23};
  24use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
  25use smol::channel::Receiver;
  26use std::{io, path::Path, str::FromStr as _, sync::Arc, time::Instant};
  27use text::BufferId;
  28use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
  29use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
  30
  31trait BufferStoreImpl {
  32    fn open_buffer(
  33        &self,
  34        path: Arc<Path>,
  35        worktree: Model<Worktree>,
  36        cx: &mut ModelContext<BufferStore>,
  37    ) -> Task<Result<Model<Buffer>>>;
  38
  39    fn save_buffer(
  40        &self,
  41        buffer: Model<Buffer>,
  42        cx: &mut ModelContext<BufferStore>,
  43    ) -> Task<Result<()>>;
  44
  45    fn save_buffer_as(
  46        &self,
  47        buffer: Model<Buffer>,
  48        path: ProjectPath,
  49        cx: &mut ModelContext<BufferStore>,
  50    ) -> Task<Result<()>>;
  51
  52    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>>;
  53
  54    fn reload_buffers(
  55        &self,
  56        buffers: Vec<Model<Buffer>>,
  57        push_to_history: bool,
  58        cx: &mut ModelContext<BufferStore>,
  59    ) -> Task<Result<ProjectTransaction>>;
  60
  61    fn as_remote(&self) -> Option<Model<RemoteBufferStore>>;
  62    fn as_local(&self) -> Option<Model<LocalBufferStore>>;
  63}
  64
  65struct RemoteBufferStore {
  66    shared_with_me: HashSet<Model<Buffer>>,
  67    upstream_client: AnyProtoClient,
  68    project_id: u64,
  69    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
  70    remote_buffer_listeners:
  71        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
  72    worktree_store: Model<WorktreeStore>,
  73    buffer_store: WeakModel<BufferStore>,
  74}
  75
  76struct LocalBufferStore {
  77    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
  78    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
  79    buffer_store: WeakModel<BufferStore>,
  80    worktree_store: Model<WorktreeStore>,
  81    _subscription: Subscription,
  82}
  83
  84/// A set of open buffers.
  85pub struct BufferStore {
  86    state: Box<dyn BufferStoreImpl>,
  87    #[allow(clippy::type_complexity)]
  88    loading_buffers_by_path: HashMap<
  89        ProjectPath,
  90        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
  91    >,
  92    worktree_store: Model<WorktreeStore>,
  93    opened_buffers: HashMap<BufferId, OpenBuffer>,
  94    downstream_client: Option<(AnyProtoClient, u64)>,
  95    shared_buffers: HashMap<proto::PeerId, HashSet<Model<Buffer>>>,
  96}
  97
  98enum OpenBuffer {
  99    Buffer(WeakModel<Buffer>),
 100    Operations(Vec<Operation>),
 101}
 102
 103pub enum BufferStoreEvent {
 104    BufferAdded(Model<Buffer>),
 105    BufferDropped(BufferId),
 106    BufferChangedFilePath {
 107        buffer: Model<Buffer>,
 108        old_file: Option<Arc<dyn language::File>>,
 109    },
 110}
 111
 112#[derive(Default, Debug)]
 113pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);
 114
 115impl EventEmitter<BufferStoreEvent> for BufferStore {}
 116
 117impl RemoteBufferStore {
 118    pub fn wait_for_remote_buffer(
 119        &mut self,
 120        id: BufferId,
 121        cx: &mut AppContext,
 122    ) -> Task<Result<Model<Buffer>>> {
 123        let buffer_store = self.buffer_store.clone();
 124        let (tx, rx) = oneshot::channel();
 125        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 126
 127        cx.spawn(|cx| async move {
 128            if let Some(buffer) = buffer_store
 129                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
 130                .ok()
 131                .flatten()
 132            {
 133                return Ok(buffer);
 134            }
 135
 136            cx.background_executor()
 137                .spawn(async move { rx.await? })
 138                .await
 139        })
 140    }
 141
 142    fn save_remote_buffer(
 143        &self,
 144        buffer_handle: Model<Buffer>,
 145        new_path: Option<proto::ProjectPath>,
 146        cx: &ModelContext<Self>,
 147    ) -> Task<Result<()>> {
 148        let buffer = buffer_handle.read(cx);
 149        let buffer_id = buffer.remote_id().into();
 150        let version = buffer.version();
 151        let rpc = self.upstream_client.clone();
 152        let project_id = self.project_id;
 153        cx.spawn(move |_, mut cx| async move {
 154            let response = rpc
 155                .request(proto::SaveBuffer {
 156                    project_id,
 157                    buffer_id,
 158                    new_path,
 159                    version: serialize_version(&version),
 160                })
 161                .await?;
 162            let version = deserialize_version(&response.version);
 163            let mtime = response.mtime.map(|mtime| mtime.into());
 164
 165            buffer_handle.update(&mut cx, |buffer, cx| {
 166                buffer.did_save(version.clone(), mtime, cx);
 167            })?;
 168
 169            Ok(())
 170        })
 171    }
 172
 173    pub fn handle_create_buffer_for_peer(
 174        &mut self,
 175        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
 176        replica_id: u16,
 177        capability: Capability,
 178        cx: &mut ModelContext<Self>,
 179    ) -> Result<Option<Model<Buffer>>> {
 180        match envelope
 181            .payload
 182            .variant
 183            .ok_or_else(|| anyhow!("missing variant"))?
 184        {
 185            proto::create_buffer_for_peer::Variant::State(mut state) => {
 186                let buffer_id = BufferId::new(state.id)?;
 187
 188                let buffer_result = maybe!({
 189                    let mut buffer_file = None;
 190                    if let Some(file) = state.file.take() {
 191                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
 192                        let worktree = self
 193                            .worktree_store
 194                            .read(cx)
 195                            .worktree_for_id(worktree_id, cx)
 196                            .ok_or_else(|| {
 197                                anyhow!("no worktree found for id {}", file.worktree_id)
 198                            })?;
 199                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
 200                            as Arc<dyn language::File>);
 201                    }
 202                    Buffer::from_proto(replica_id, capability, state, buffer_file)
 203                });
 204
 205                match buffer_result {
 206                    Ok(buffer) => {
 207                        let buffer = cx.new_model(|_| buffer);
 208                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
 209                    }
 210                    Err(error) => {
 211                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 212                            for listener in listeners {
 213                                listener.send(Err(anyhow!(error.cloned()))).ok();
 214                            }
 215                        }
 216                    }
 217                }
 218            }
 219            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
 220                let buffer_id = BufferId::new(chunk.buffer_id)?;
 221                let buffer = self
 222                    .loading_remote_buffers_by_id
 223                    .get(&buffer_id)
 224                    .cloned()
 225                    .ok_or_else(|| {
 226                        anyhow!(
 227                            "received chunk for buffer {} without initial state",
 228                            chunk.buffer_id
 229                        )
 230                    })?;
 231
 232                let result = maybe!({
 233                    let operations = chunk
 234                        .operations
 235                        .into_iter()
 236                        .map(language::proto::deserialize_operation)
 237                        .collect::<Result<Vec<_>>>()?;
 238                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
 239                    anyhow::Ok(())
 240                });
 241
 242                if let Err(error) = result {
 243                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 244                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 245                        for listener in listeners {
 246                            listener.send(Err(error.cloned())).ok();
 247                        }
 248                    }
 249                } else if chunk.is_last {
 250                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 251                    if self.upstream_client.is_via_collab() {
 252                        // retain buffers sent by peers to avoid races.
 253                        self.shared_with_me.insert(buffer.clone());
 254                    }
 255
 256                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
 257                        for sender in senders {
 258                            sender.send(Ok(buffer.clone())).ok();
 259                        }
 260                    }
 261                    return Ok(Some(buffer));
 262                }
 263            }
 264        }
 265        return Ok(None);
 266    }
 267
 268    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
 269        self.loading_remote_buffers_by_id
 270            .keys()
 271            .copied()
 272            .collect::<Vec<_>>()
 273    }
 274
 275    pub fn deserialize_project_transaction(
 276        &self,
 277        message: proto::ProjectTransaction,
 278        push_to_history: bool,
 279        cx: &mut ModelContext<Self>,
 280    ) -> Task<Result<ProjectTransaction>> {
 281        cx.spawn(|this, mut cx| async move {
 282            let mut project_transaction = ProjectTransaction::default();
 283            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
 284            {
 285                let buffer_id = BufferId::new(buffer_id)?;
 286                let buffer = this
 287                    .update(&mut cx, |this, cx| {
 288                        this.wait_for_remote_buffer(buffer_id, cx)
 289                    })?
 290                    .await?;
 291                let transaction = language::proto::deserialize_transaction(transaction)?;
 292                project_transaction.0.insert(buffer, transaction);
 293            }
 294
 295            for (buffer, transaction) in &project_transaction.0 {
 296                buffer
 297                    .update(&mut cx, |buffer, _| {
 298                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
 299                    })?
 300                    .await?;
 301
 302                if push_to_history {
 303                    buffer.update(&mut cx, |buffer, _| {
 304                        buffer.push_transaction(transaction.clone(), Instant::now());
 305                    })?;
 306                }
 307            }
 308
 309            Ok(project_transaction)
 310        })
 311    }
 312}
 313
 314impl BufferStoreImpl for Model<RemoteBufferStore> {
 315    fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
 316        Some(self.clone())
 317    }
 318
 319    fn as_local(&self) -> Option<Model<LocalBufferStore>> {
 320        None
 321    }
 322
 323    fn save_buffer(
 324        &self,
 325        buffer: Model<Buffer>,
 326        cx: &mut ModelContext<BufferStore>,
 327    ) -> Task<Result<()>> {
 328        self.update(cx, |this, cx| {
 329            this.save_remote_buffer(buffer.clone(), None, cx)
 330        })
 331    }
 332    fn save_buffer_as(
 333        &self,
 334        buffer: Model<Buffer>,
 335        path: ProjectPath,
 336        cx: &mut ModelContext<BufferStore>,
 337    ) -> Task<Result<()>> {
 338        self.update(cx, |this, cx| {
 339            this.save_remote_buffer(buffer, Some(path.to_proto()), cx)
 340        })
 341    }
 342
 343    fn open_buffer(
 344        &self,
 345        path: Arc<Path>,
 346        worktree: Model<Worktree>,
 347        cx: &mut ModelContext<BufferStore>,
 348    ) -> Task<Result<Model<Buffer>>> {
 349        self.update(cx, |this, cx| {
 350            let worktree_id = worktree.read(cx).id().to_proto();
 351            let project_id = this.project_id;
 352            let client = this.upstream_client.clone();
 353            let path_string = path.clone().to_string_lossy().to_string();
 354            cx.spawn(move |this, mut cx| async move {
 355                let response = client
 356                    .request(proto::OpenBufferByPath {
 357                        project_id,
 358                        worktree_id,
 359                        path: path_string,
 360                    })
 361                    .await?;
 362                let buffer_id = BufferId::new(response.buffer_id)?;
 363
 364                let buffer = this
 365                    .update(&mut cx, {
 366                        |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
 367                    })?
 368                    .await?;
 369
 370                Ok(buffer)
 371            })
 372        })
 373    }
 374
 375    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
 376        self.update(cx, |this, cx| {
 377            let create = this.upstream_client.request(proto::OpenNewBuffer {
 378                project_id: this.project_id,
 379            });
 380            cx.spawn(|this, mut cx| async move {
 381                let response = create.await?;
 382                let buffer_id = BufferId::new(response.buffer_id)?;
 383
 384                this.update(&mut cx, |this, cx| {
 385                    this.wait_for_remote_buffer(buffer_id, cx)
 386                })?
 387                .await
 388            })
 389        })
 390    }
 391
 392    fn reload_buffers(
 393        &self,
 394        buffers: Vec<Model<Buffer>>,
 395        push_to_history: bool,
 396        cx: &mut ModelContext<BufferStore>,
 397    ) -> Task<Result<ProjectTransaction>> {
 398        self.update(cx, |this, cx| {
 399            let request = this.upstream_client.request(proto::ReloadBuffers {
 400                project_id: this.project_id,
 401                buffer_ids: buffers
 402                    .iter()
 403                    .map(|buffer| buffer.read(cx).remote_id().to_proto())
 404                    .collect(),
 405            });
 406
 407            cx.spawn(|this, mut cx| async move {
 408                let response = request
 409                    .await?
 410                    .transaction
 411                    .ok_or_else(|| anyhow!("missing transaction"))?;
 412                this.update(&mut cx, |this, cx| {
 413                    this.deserialize_project_transaction(response, push_to_history, cx)
 414                })?
 415                .await
 416            })
 417        })
 418    }
 419}
 420
 421impl LocalBufferStore {
 422    fn save_local_buffer(
 423        &self,
 424        buffer_handle: Model<Buffer>,
 425        worktree: Model<Worktree>,
 426        path: Arc<Path>,
 427        mut has_changed_file: bool,
 428        cx: &mut ModelContext<Self>,
 429    ) -> Task<Result<()>> {
 430        let buffer = buffer_handle.read(cx);
 431
 432        let text = buffer.as_rope().clone();
 433        let line_ending = buffer.line_ending();
 434        let version = buffer.version();
 435        let buffer_id = buffer.remote_id();
 436        if buffer.file().is_some_and(|file| !file.is_created()) {
 437            has_changed_file = true;
 438        }
 439
 440        let save = worktree.update(cx, |worktree, cx| {
 441            worktree.write_file(path.as_ref(), text, line_ending, cx)
 442        });
 443
 444        cx.spawn(move |this, mut cx| async move {
 445            let new_file = save.await?;
 446            let mtime = new_file.mtime;
 447            this.update(&mut cx, |this, cx| {
 448                if let Some((downstream_client, project_id)) = this.downstream_client(cx) {
 449                    if has_changed_file {
 450                        downstream_client
 451                            .send(proto::UpdateBufferFile {
 452                                project_id,
 453                                buffer_id: buffer_id.to_proto(),
 454                                file: Some(language::File::to_proto(&*new_file, cx)),
 455                            })
 456                            .log_err();
 457                    }
 458                    downstream_client
 459                        .send(proto::BufferSaved {
 460                            project_id,
 461                            buffer_id: buffer_id.to_proto(),
 462                            version: serialize_version(&version),
 463                            mtime: mtime.map(|time| time.into()),
 464                        })
 465                        .log_err();
 466                }
 467            })?;
 468            buffer_handle.update(&mut cx, |buffer, cx| {
 469                if has_changed_file {
 470                    buffer.file_updated(new_file, cx);
 471                }
 472                buffer.did_save(version.clone(), mtime, cx);
 473            })
 474        })
 475    }
 476
 477    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 478        cx.subscribe(worktree, |this, worktree, event, cx| {
 479            if worktree.read(cx).is_local() {
 480                match event {
 481                    worktree::Event::UpdatedEntries(changes) => {
 482                        this.local_worktree_entries_changed(&worktree, changes, cx);
 483                    }
 484                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
 485                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
 486                    }
 487                    _ => {}
 488                }
 489            }
 490        })
 491        .detach();
 492    }
 493
 494    fn local_worktree_entries_changed(
 495        &mut self,
 496        worktree_handle: &Model<Worktree>,
 497        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 498        cx: &mut ModelContext<Self>,
 499    ) {
 500        let snapshot = worktree_handle.read(cx).snapshot();
 501        for (path, entry_id, _) in changes {
 502            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
 503        }
 504    }
 505
 506    fn local_worktree_git_repos_changed(
 507        &mut self,
 508        worktree_handle: Model<Worktree>,
 509        changed_repos: &UpdatedGitRepositoriesSet,
 510        cx: &mut ModelContext<Self>,
 511    ) {
 512        debug_assert!(worktree_handle.read(cx).is_local());
 513        let Some(buffer_store) = self.buffer_store.upgrade() else {
 514            return;
 515        };
 516
 517        // Identify the loading buffers whose containing repository that has changed.
 518        let (future_buffers, current_buffers) = buffer_store.update(cx, |buffer_store, cx| {
 519            let future_buffers = buffer_store
 520                .loading_buffers()
 521                .filter_map(|(project_path, receiver)| {
 522                    if project_path.worktree_id != worktree_handle.read(cx).id() {
 523                        return None;
 524                    }
 525                    let path = &project_path.path;
 526                    changed_repos
 527                        .iter()
 528                        .find(|(work_dir, _)| path.starts_with(work_dir))?;
 529                    let path = path.clone();
 530                    Some(async move {
 531                        BufferStore::wait_for_loading_buffer(receiver)
 532                            .await
 533                            .ok()
 534                            .map(|buffer| (buffer, path))
 535                    })
 536                })
 537                .collect::<FuturesUnordered<_>>();
 538
 539            // Identify the current buffers whose containing repository has changed.
 540            let current_buffers = buffer_store
 541                .buffers()
 542                .filter_map(|buffer| {
 543                    let file = File::from_dyn(buffer.read(cx).file())?;
 544                    if file.worktree != worktree_handle {
 545                        return None;
 546                    }
 547                    changed_repos
 548                        .iter()
 549                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
 550                    Some((buffer, file.path.clone()))
 551                })
 552                .collect::<Vec<_>>();
 553            (future_buffers, current_buffers)
 554        });
 555
 556        if future_buffers.len() + current_buffers.len() == 0 {
 557            return;
 558        }
 559
 560        cx.spawn(move |this, mut cx| async move {
 561            // Wait for all of the buffers to load.
 562            let future_buffers = future_buffers.collect::<Vec<_>>().await;
 563
 564            // Reload the diff base for every buffer whose containing git repository has changed.
 565            let snapshot =
 566                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
 567            let diff_bases_by_buffer = cx
 568                .background_executor()
 569                .spawn(async move {
 570                    let mut diff_base_tasks = future_buffers
 571                        .into_iter()
 572                        .flatten()
 573                        .chain(current_buffers)
 574                        .filter_map(|(buffer, path)| {
 575                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
 576                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
 577                            Some(async move {
 578                                let base_text =
 579                                    local_repo_entry.repo().load_index_text(&relative_path);
 580                                Some((buffer, base_text))
 581                            })
 582                        })
 583                        .collect::<FuturesUnordered<_>>();
 584
 585                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
 586                    while let Some(diff_base) = diff_base_tasks.next().await {
 587                        if let Some(diff_base) = diff_base {
 588                            diff_bases.push(diff_base);
 589                        }
 590                    }
 591                    diff_bases
 592                })
 593                .await;
 594
 595            this.update(&mut cx, |this, cx| {
 596                // Assign the new diff bases on all of the buffers.
 597                for (buffer, diff_base) in diff_bases_by_buffer {
 598                    let buffer_id = buffer.update(cx, |buffer, cx| {
 599                        buffer.set_diff_base(diff_base.clone(), cx);
 600                        buffer.remote_id().to_proto()
 601                    });
 602                    if let Some((client, project_id)) = &this.downstream_client(cx) {
 603                        client
 604                            .send(proto::UpdateDiffBase {
 605                                project_id: *project_id,
 606                                buffer_id,
 607                                diff_base,
 608                            })
 609                            .log_err();
 610                    }
 611                }
 612            })
 613        })
 614        .detach_and_log_err(cx);
 615    }
 616
 617    fn local_worktree_entry_changed(
 618        &mut self,
 619        entry_id: ProjectEntryId,
 620        path: &Arc<Path>,
 621        worktree: &Model<worktree::Worktree>,
 622        snapshot: &worktree::Snapshot,
 623        cx: &mut ModelContext<Self>,
 624    ) -> Option<()> {
 625        let project_path = ProjectPath {
 626            worktree_id: snapshot.id(),
 627            path: path.clone(),
 628        };
 629        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 630            Some(&buffer_id) => buffer_id,
 631            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
 632        };
 633        let buffer = self
 634            .buffer_store
 635            .update(cx, |buffer_store, _| {
 636                if let Some(buffer) = buffer_store.get(buffer_id) {
 637                    Some(buffer)
 638                } else {
 639                    buffer_store.opened_buffers.remove(&buffer_id);
 640                    None
 641                }
 642            })
 643            .ok()
 644            .flatten();
 645        let buffer = if let Some(buffer) = buffer {
 646            buffer
 647        } else {
 648            self.local_buffer_ids_by_path.remove(&project_path);
 649            self.local_buffer_ids_by_entry_id.remove(&entry_id);
 650            return None;
 651        };
 652
 653        let events = buffer.update(cx, |buffer, cx| {
 654            let file = buffer.file()?;
 655            let old_file = File::from_dyn(Some(file))?;
 656            if old_file.worktree != *worktree {
 657                return None;
 658            }
 659
 660            let new_file = if let Some(entry) = old_file
 661                .entry_id
 662                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 663            {
 664                File {
 665                    is_local: true,
 666                    entry_id: Some(entry.id),
 667                    mtime: entry.mtime,
 668                    path: entry.path.clone(),
 669                    worktree: worktree.clone(),
 670                    is_deleted: false,
 671                    is_private: entry.is_private,
 672                }
 673            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
 674                File {
 675                    is_local: true,
 676                    entry_id: Some(entry.id),
 677                    mtime: entry.mtime,
 678                    path: entry.path.clone(),
 679                    worktree: worktree.clone(),
 680                    is_deleted: false,
 681                    is_private: entry.is_private,
 682                }
 683            } else {
 684                File {
 685                    is_local: true,
 686                    entry_id: old_file.entry_id,
 687                    path: old_file.path.clone(),
 688                    mtime: old_file.mtime,
 689                    worktree: worktree.clone(),
 690                    is_deleted: true,
 691                    is_private: old_file.is_private,
 692                }
 693            };
 694
 695            if new_file == *old_file {
 696                return None;
 697            }
 698
 699            let mut events = Vec::new();
 700            if new_file.path != old_file.path {
 701                self.local_buffer_ids_by_path.remove(&ProjectPath {
 702                    path: old_file.path.clone(),
 703                    worktree_id: old_file.worktree_id(cx),
 704                });
 705                self.local_buffer_ids_by_path.insert(
 706                    ProjectPath {
 707                        worktree_id: new_file.worktree_id(cx),
 708                        path: new_file.path.clone(),
 709                    },
 710                    buffer_id,
 711                );
 712                events.push(BufferStoreEvent::BufferChangedFilePath {
 713                    buffer: cx.handle(),
 714                    old_file: buffer.file().cloned(),
 715                });
 716            }
 717
 718            if new_file.entry_id != old_file.entry_id {
 719                if let Some(entry_id) = old_file.entry_id {
 720                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
 721                }
 722                if let Some(entry_id) = new_file.entry_id {
 723                    self.local_buffer_ids_by_entry_id
 724                        .insert(entry_id, buffer_id);
 725                }
 726            }
 727
 728            if let Some((client, project_id)) = &self.downstream_client(cx) {
 729                client
 730                    .send(proto::UpdateBufferFile {
 731                        project_id: *project_id,
 732                        buffer_id: buffer_id.to_proto(),
 733                        file: Some(new_file.to_proto(cx)),
 734                    })
 735                    .ok();
 736            }
 737
 738            buffer.file_updated(Arc::new(new_file), cx);
 739            Some(events)
 740        })?;
 741        self.buffer_store
 742            .update(cx, |_buffer_store, cx| {
 743                for event in events {
 744                    cx.emit(event);
 745                }
 746            })
 747            .log_err()?;
 748
 749        None
 750    }
 751
 752    fn downstream_client(&self, cx: &AppContext) -> Option<(AnyProtoClient, u64)> {
 753        self.buffer_store
 754            .upgrade()?
 755            .read(cx)
 756            .downstream_client
 757            .clone()
 758    }
 759
 760    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
 761        let file = File::from_dyn(buffer.read(cx).file())?;
 762
 763        let remote_id = buffer.read(cx).remote_id();
 764        if let Some(entry_id) = file.entry_id {
 765            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 766                Some(_) => {
 767                    return None;
 768                }
 769                None => {
 770                    self.local_buffer_ids_by_entry_id
 771                        .insert(entry_id, remote_id);
 772                }
 773            }
 774        };
 775        self.local_buffer_ids_by_path.insert(
 776            ProjectPath {
 777                worktree_id: file.worktree_id(cx),
 778                path: file.path.clone(),
 779            },
 780            remote_id,
 781        );
 782
 783        Some(())
 784    }
 785}
 786
 787impl BufferStoreImpl for Model<LocalBufferStore> {
 788    fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
 789        None
 790    }
 791
 792    fn as_local(&self) -> Option<Model<LocalBufferStore>> {
 793        Some(self.clone())
 794    }
 795
 796    fn save_buffer(
 797        &self,
 798        buffer: Model<Buffer>,
 799        cx: &mut ModelContext<BufferStore>,
 800    ) -> Task<Result<()>> {
 801        self.update(cx, |this, cx| {
 802            let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 803                return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 804            };
 805            let worktree = file.worktree.clone();
 806            this.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
 807        })
 808    }
 809
 810    fn save_buffer_as(
 811        &self,
 812        buffer: Model<Buffer>,
 813        path: ProjectPath,
 814        cx: &mut ModelContext<BufferStore>,
 815    ) -> Task<Result<()>> {
 816        self.update(cx, |this, cx| {
 817            let Some(worktree) = this
 818                .worktree_store
 819                .read(cx)
 820                .worktree_for_id(path.worktree_id, cx)
 821            else {
 822                return Task::ready(Err(anyhow!("no such worktree")));
 823            };
 824            this.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
 825        })
 826    }
 827
 828    fn open_buffer(
 829        &self,
 830        path: Arc<Path>,
 831        worktree: Model<Worktree>,
 832        cx: &mut ModelContext<BufferStore>,
 833    ) -> Task<Result<Model<Buffer>>> {
 834        let buffer_store = cx.weak_model();
 835        self.update(cx, |_, cx| {
 836            let load_buffer = worktree.update(cx, |worktree, cx| {
 837                let load_file = worktree.load_file(path.as_ref(), cx);
 838                let reservation = cx.reserve_model();
 839                let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
 840                cx.spawn(move |_, mut cx| async move {
 841                    let loaded = load_file.await?;
 842                    let text_buffer = cx
 843                        .background_executor()
 844                        .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
 845                        .await;
 846                    cx.insert_model(reservation, |_| {
 847                        Buffer::build(
 848                            text_buffer,
 849                            loaded.diff_base,
 850                            Some(loaded.file),
 851                            Capability::ReadWrite,
 852                        )
 853                    })
 854                })
 855            });
 856
 857            cx.spawn(move |this, mut cx| async move {
 858                let buffer = match load_buffer.await {
 859                    Ok(buffer) => Ok(buffer),
 860                    Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
 861                        let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
 862                        let text_buffer = text::Buffer::new(0, buffer_id, "".into());
 863                        Buffer::build(
 864                            text_buffer,
 865                            None,
 866                            Some(Arc::new(File {
 867                                worktree,
 868                                path,
 869                                mtime: None,
 870                                entry_id: None,
 871                                is_local: true,
 872                                is_deleted: false,
 873                                is_private: false,
 874                            })),
 875                            Capability::ReadWrite,
 876                        )
 877                    }),
 878                    Err(e) => Err(e),
 879                }?;
 880                this.update(&mut cx, |this, cx| {
 881                    buffer_store.update(cx, |buffer_store, cx| {
 882                        buffer_store.add_buffer(buffer.clone(), cx)
 883                    })??;
 884                    let buffer_id = buffer.read(cx).remote_id();
 885                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 886                        this.local_buffer_ids_by_path.insert(
 887                            ProjectPath {
 888                                worktree_id: file.worktree_id(cx),
 889                                path: file.path.clone(),
 890                            },
 891                            buffer_id,
 892                        );
 893
 894                        if let Some(entry_id) = file.entry_id {
 895                            this.local_buffer_ids_by_entry_id
 896                                .insert(entry_id, buffer_id);
 897                        }
 898                    }
 899
 900                    anyhow::Ok(())
 901                })??;
 902
 903                Ok(buffer)
 904            })
 905        })
 906    }
 907
 908    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
 909        let handle = self.clone();
 910        cx.spawn(|buffer_store, mut cx| async move {
 911            let buffer = cx.new_model(|cx| {
 912                Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
 913            })?;
 914            buffer_store.update(&mut cx, |buffer_store, cx| {
 915                buffer_store.add_buffer(buffer.clone(), cx).log_err();
 916                let buffer_id = buffer.read(cx).remote_id();
 917                handle.update(cx, |this, cx| {
 918                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 919                        this.local_buffer_ids_by_path.insert(
 920                            ProjectPath {
 921                                worktree_id: file.worktree_id(cx),
 922                                path: file.path.clone(),
 923                            },
 924                            buffer_id,
 925                        );
 926
 927                        if let Some(entry_id) = file.entry_id {
 928                            this.local_buffer_ids_by_entry_id
 929                                .insert(entry_id, buffer_id);
 930                        }
 931                    }
 932                });
 933            })?;
 934            Ok(buffer)
 935        })
 936    }
 937
 938    fn reload_buffers(
 939        &self,
 940        buffers: Vec<Model<Buffer>>,
 941        push_to_history: bool,
 942        cx: &mut ModelContext<BufferStore>,
 943    ) -> Task<Result<ProjectTransaction>> {
 944        cx.spawn(move |_, mut cx| async move {
 945            let mut project_transaction = ProjectTransaction::default();
 946            for buffer in buffers {
 947                let transaction = buffer
 948                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
 949                    .await?;
 950                buffer.update(&mut cx, |buffer, cx| {
 951                    if let Some(transaction) = transaction {
 952                        if !push_to_history {
 953                            buffer.forget_transaction(transaction.id);
 954                        }
 955                        project_transaction.0.insert(cx.handle(), transaction);
 956                    }
 957                })?;
 958            }
 959
 960            Ok(project_transaction)
 961        })
 962    }
 963}
 964
 965impl BufferStore {
 966    pub fn init(client: &AnyProtoClient) {
 967        client.add_model_message_handler(Self::handle_buffer_reloaded);
 968        client.add_model_message_handler(Self::handle_buffer_saved);
 969        client.add_model_message_handler(Self::handle_update_buffer_file);
 970        client.add_model_message_handler(Self::handle_update_diff_base);
 971        client.add_model_request_handler(Self::handle_save_buffer);
 972        client.add_model_request_handler(Self::handle_blame_buffer);
 973        client.add_model_request_handler(Self::handle_reload_buffers);
 974    }
 975
 976    /// Creates a buffer store, optionally retaining its buffers.
 977    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
 978        let this = cx.weak_model();
 979        Self {
 980            state: Box::new(cx.new_model(|cx| {
 981                let subscription = cx.subscribe(
 982                    &worktree_store,
 983                    |this: &mut LocalBufferStore, _, event, cx| {
 984                        if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
 985                            this.subscribe_to_worktree(worktree, cx);
 986                        }
 987                    },
 988                );
 989
 990                LocalBufferStore {
 991                    local_buffer_ids_by_path: Default::default(),
 992                    local_buffer_ids_by_entry_id: Default::default(),
 993                    buffer_store: this,
 994                    worktree_store: worktree_store.clone(),
 995                    _subscription: subscription,
 996                }
 997            })),
 998            downstream_client: None,
 999            opened_buffers: Default::default(),
1000            shared_buffers: Default::default(),
1001            loading_buffers_by_path: Default::default(),
1002            worktree_store,
1003        }
1004    }
1005
1006    pub fn remote(
1007        worktree_store: Model<WorktreeStore>,
1008        upstream_client: AnyProtoClient,
1009        remote_id: u64,
1010        cx: &mut ModelContext<Self>,
1011    ) -> Self {
1012        let this = cx.weak_model();
1013        Self {
1014            state: Box::new(cx.new_model(|_| RemoteBufferStore {
1015                shared_with_me: Default::default(),
1016                loading_remote_buffers_by_id: Default::default(),
1017                remote_buffer_listeners: Default::default(),
1018                project_id: remote_id,
1019                upstream_client,
1020                worktree_store: worktree_store.clone(),
1021                buffer_store: this,
1022            })),
1023            downstream_client: None,
1024            opened_buffers: Default::default(),
1025            loading_buffers_by_path: Default::default(),
1026            shared_buffers: Default::default(),
1027            worktree_store,
1028        }
1029    }
1030
1031    pub fn open_buffer(
1032        &mut self,
1033        project_path: ProjectPath,
1034        cx: &mut ModelContext<Self>,
1035    ) -> Task<Result<Model<Buffer>>> {
1036        let existing_buffer = self.get_by_path(&project_path, cx);
1037        if let Some(existing_buffer) = existing_buffer {
1038            return Task::ready(Ok(existing_buffer));
1039        }
1040
1041        let Some(worktree) = self
1042            .worktree_store
1043            .read(cx)
1044            .worktree_for_id(project_path.worktree_id, cx)
1045        else {
1046            return Task::ready(Err(anyhow!("no such worktree")));
1047        };
1048
1049        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
1050            // If the given path is already being loaded, then wait for that existing
1051            // task to complete and return the same buffer.
1052            hash_map::Entry::Occupied(e) => e.get().clone(),
1053
1054            // Otherwise, record the fact that this path is now being loaded.
1055            hash_map::Entry::Vacant(entry) => {
1056                let (mut tx, rx) = postage::watch::channel();
1057                entry.insert(rx.clone());
1058
1059                let project_path = project_path.clone();
1060                let load_buffer = self
1061                    .state
1062                    .open_buffer(project_path.path.clone(), worktree, cx);
1063
1064                cx.spawn(move |this, mut cx| async move {
1065                    let load_result = load_buffer.await;
1066                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _cx| {
1067                        // Record the fact that the buffer is no longer loading.
1068                        this.loading_buffers_by_path.remove(&project_path);
1069                        let buffer = load_result.map_err(Arc::new)?;
1070                        Ok(buffer)
1071                    })?);
1072                    anyhow::Ok(())
1073                })
1074                .detach();
1075                rx
1076            }
1077        };
1078
1079        cx.background_executor().spawn(async move {
1080            Self::wait_for_loading_buffer(loading_watch)
1081                .await
1082                .map_err(|e| e.cloned())
1083        })
1084    }
1085
1086    pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
1087        self.state.create_buffer(cx)
1088    }
1089
1090    pub fn save_buffer(
1091        &mut self,
1092        buffer: Model<Buffer>,
1093        cx: &mut ModelContext<Self>,
1094    ) -> Task<Result<()>> {
1095        self.state.save_buffer(buffer, cx)
1096    }
1097
1098    pub fn save_buffer_as(
1099        &mut self,
1100        buffer: Model<Buffer>,
1101        path: ProjectPath,
1102        cx: &mut ModelContext<Self>,
1103    ) -> Task<Result<()>> {
1104        let old_file = buffer.read(cx).file().cloned();
1105        let task = self.state.save_buffer_as(buffer.clone(), path, cx);
1106        cx.spawn(|this, mut cx| async move {
1107            task.await?;
1108            this.update(&mut cx, |_, cx| {
1109                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1110            })
1111        })
1112    }
1113
1114    pub fn blame_buffer(
1115        &self,
1116        buffer: &Model<Buffer>,
1117        version: Option<clock::Global>,
1118        cx: &AppContext,
1119    ) -> Task<Result<Blame>> {
1120        let buffer = buffer.read(cx);
1121        let Some(file) = File::from_dyn(buffer.file()) else {
1122            return Task::ready(Err(anyhow!("buffer has no file")));
1123        };
1124
1125        match file.worktree.clone().read(cx) {
1126            Worktree::Local(worktree) => {
1127                let worktree = worktree.snapshot();
1128                let blame_params = maybe!({
1129                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
1130                        Some(repo_for_path) => repo_for_path,
1131                        None => anyhow::bail!(NoRepositoryError {}),
1132                    };
1133
1134                    let relative_path = repo_entry
1135                        .relativize(&worktree, &file.path)
1136                        .context("failed to relativize buffer path")?;
1137
1138                    let repo = local_repo_entry.repo().clone();
1139
1140                    let content = match version {
1141                        Some(version) => buffer.rope_for_version(&version).clone(),
1142                        None => buffer.as_rope().clone(),
1143                    };
1144
1145                    anyhow::Ok((repo, relative_path, content))
1146                });
1147
1148                cx.background_executor().spawn(async move {
1149                    let (repo, relative_path, content) = blame_params?;
1150                    repo.blame(&relative_path, content)
1151                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
1152                })
1153            }
1154            Worktree::Remote(worktree) => {
1155                let buffer_id = buffer.remote_id();
1156                let version = buffer.version();
1157                let project_id = worktree.project_id();
1158                let client = worktree.client();
1159                cx.spawn(|_| async move {
1160                    let response = client
1161                        .request(proto::BlameBuffer {
1162                            project_id,
1163                            buffer_id: buffer_id.into(),
1164                            version: serialize_version(&version),
1165                        })
1166                        .await?;
1167                    Ok(deserialize_blame_buffer_response(response))
1168                })
1169            }
1170        }
1171    }
1172
1173    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
1174        let remote_id = buffer.read(cx).remote_id();
1175        let is_remote = buffer.read(cx).replica_id() != 0;
1176        let open_buffer = OpenBuffer::Buffer(buffer.downgrade());
1177
1178        let handle = cx.handle().downgrade();
1179        buffer.update(cx, move |_, cx| {
1180            cx.on_release(move |buffer, cx| {
1181                handle
1182                    .update(cx, |_, cx| {
1183                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1184                    })
1185                    .ok();
1186            })
1187            .detach()
1188        });
1189
1190        match self.opened_buffers.entry(remote_id) {
1191            hash_map::Entry::Vacant(entry) => {
1192                entry.insert(open_buffer);
1193            }
1194            hash_map::Entry::Occupied(mut entry) => {
1195                if let OpenBuffer::Operations(operations) = entry.get_mut() {
1196                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1197                } else if entry.get().upgrade().is_some() {
1198                    if is_remote {
1199                        return Ok(());
1200                    } else {
1201                        debug_panic!("buffer {} was already registered", remote_id);
1202                        Err(anyhow!("buffer {} was already registered", remote_id))?;
1203                    }
1204                }
1205                entry.insert(open_buffer);
1206            }
1207        }
1208
1209        cx.subscribe(&buffer, Self::on_buffer_event).detach();
1210        cx.emit(BufferStoreEvent::BufferAdded(buffer));
1211        Ok(())
1212    }
1213
1214    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
1215        self.opened_buffers
1216            .values()
1217            .filter_map(|buffer| buffer.upgrade())
1218    }
1219
1220    pub fn loading_buffers(
1221        &self,
1222    ) -> impl Iterator<
1223        Item = (
1224            &ProjectPath,
1225            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
1226        ),
1227    > {
1228        self.loading_buffers_by_path
1229            .iter()
1230            .map(|(path, rx)| (path, rx.clone()))
1231    }
1232
1233    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
1234        self.buffers().find_map(|buffer| {
1235            let file = File::from_dyn(buffer.read(cx).file())?;
1236            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1237                Some(buffer)
1238            } else {
1239                None
1240            }
1241        })
1242    }
1243
1244    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1245        self.opened_buffers
1246            .get(&buffer_id)
1247            .and_then(|buffer| buffer.upgrade())
1248    }
1249
1250    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
1251        self.get(buffer_id)
1252            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1253    }
1254
1255    pub fn get_possibly_incomplete(
1256        &self,
1257        buffer_id: BufferId,
1258        cx: &AppContext,
1259    ) -> Option<Model<Buffer>> {
1260        self.get(buffer_id).or_else(|| {
1261            self.state.as_remote().and_then(|remote| {
1262                remote
1263                    .read(cx)
1264                    .loading_remote_buffers_by_id
1265                    .get(&buffer_id)
1266                    .cloned()
1267            })
1268        })
1269    }
1270
1271    pub fn buffer_version_info(
1272        &self,
1273        cx: &AppContext,
1274    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1275        let buffers = self
1276            .buffers()
1277            .map(|buffer| {
1278                let buffer = buffer.read(cx);
1279                proto::BufferVersion {
1280                    id: buffer.remote_id().into(),
1281                    version: language::proto::serialize_version(&buffer.version),
1282                }
1283            })
1284            .collect();
1285        let incomplete_buffer_ids = self
1286            .state
1287            .as_remote()
1288            .map(|remote| remote.read(cx).incomplete_buffer_ids())
1289            .unwrap_or_default();
1290        (buffers, incomplete_buffer_ids)
1291    }
1292
1293    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
1294        for open_buffer in self.opened_buffers.values_mut() {
1295            if let Some(buffer) = open_buffer.upgrade() {
1296                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1297            }
1298        }
1299
1300        for buffer in self.buffers() {
1301            buffer.update(cx, |buffer, cx| {
1302                buffer.set_capability(Capability::ReadOnly, cx)
1303            });
1304        }
1305
1306        if let Some(remote) = self.state.as_remote() {
1307            remote.update(cx, |remote, _| {
1308                // Wake up all futures currently waiting on a buffer to get opened,
1309                // to give them a chance to fail now that we've disconnected.
1310                remote.remote_buffer_listeners.clear()
1311            })
1312        }
1313    }
1314
1315    pub fn shared(
1316        &mut self,
1317        remote_id: u64,
1318        downstream_client: AnyProtoClient,
1319        _cx: &mut AppContext,
1320    ) {
1321        self.downstream_client = Some((downstream_client, remote_id));
1322    }
1323
1324    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
1325        self.downstream_client.take();
1326        self.forget_shared_buffers();
1327    }
1328
1329    pub fn discard_incomplete(&mut self) {
1330        self.opened_buffers
1331            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1332    }
1333
1334    pub fn find_search_candidates(
1335        &mut self,
1336        query: &SearchQuery,
1337        mut limit: usize,
1338        fs: Arc<dyn Fs>,
1339        cx: &mut ModelContext<Self>,
1340    ) -> Receiver<Model<Buffer>> {
1341        let (tx, rx) = smol::channel::unbounded();
1342        let mut open_buffers = HashSet::default();
1343        let mut unnamed_buffers = Vec::new();
1344        for handle in self.buffers() {
1345            let buffer = handle.read(cx);
1346            if let Some(entry_id) = buffer.entry_id(cx) {
1347                open_buffers.insert(entry_id);
1348            } else {
1349                limit = limit.saturating_sub(1);
1350                unnamed_buffers.push(handle)
1351            };
1352        }
1353
1354        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
1355        let mut project_paths_rx = self
1356            .worktree_store
1357            .update(cx, |worktree_store, cx| {
1358                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
1359            })
1360            .chunks(MAX_CONCURRENT_BUFFER_OPENS);
1361
1362        cx.spawn(|this, mut cx| async move {
1363            for buffer in unnamed_buffers {
1364                tx.send(buffer).await.ok();
1365            }
1366
1367            while let Some(project_paths) = project_paths_rx.next().await {
1368                let buffers = this.update(&mut cx, |this, cx| {
1369                    project_paths
1370                        .into_iter()
1371                        .map(|project_path| this.open_buffer(project_path, cx))
1372                        .collect::<Vec<_>>()
1373                })?;
1374                for buffer_task in buffers {
1375                    if let Some(buffer) = buffer_task.await.log_err() {
1376                        if tx.send(buffer).await.is_err() {
1377                            return anyhow::Ok(());
1378                        }
1379                    }
1380                }
1381            }
1382            anyhow::Ok(())
1383        })
1384        .detach();
1385        rx
1386    }
1387
1388    fn on_buffer_event(
1389        &mut self,
1390        buffer: Model<Buffer>,
1391        event: &BufferEvent,
1392        cx: &mut ModelContext<Self>,
1393    ) {
1394        match event {
1395            BufferEvent::FileHandleChanged => {
1396                if let Some(local) = self.state.as_local() {
1397                    local.update(cx, |local, cx| {
1398                        local.buffer_changed_file(buffer, cx);
1399                    })
1400                }
1401            }
1402            BufferEvent::Reloaded => {
1403                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
1404                    return;
1405                };
1406                let buffer = buffer.read(cx);
1407                downstream_client
1408                    .send(proto::BufferReloaded {
1409                        project_id: *project_id,
1410                        buffer_id: buffer.remote_id().to_proto(),
1411                        version: serialize_version(&buffer.version()),
1412                        mtime: buffer.saved_mtime().map(|t| t.into()),
1413                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
1414                    })
1415                    .log_err();
1416            }
1417            _ => {}
1418        }
1419    }
1420
1421    pub async fn handle_update_buffer(
1422        this: Model<Self>,
1423        envelope: TypedEnvelope<proto::UpdateBuffer>,
1424        mut cx: AsyncAppContext,
1425    ) -> Result<proto::Ack> {
1426        let payload = envelope.payload.clone();
1427        let buffer_id = BufferId::new(payload.buffer_id)?;
1428        let ops = payload
1429            .operations
1430            .into_iter()
1431            .map(language::proto::deserialize_operation)
1432            .collect::<Result<Vec<_>, _>>()?;
1433        this.update(&mut cx, |this, cx| {
1434            match this.opened_buffers.entry(buffer_id) {
1435                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1436                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1437                    OpenBuffer::Buffer(buffer) => {
1438                        if let Some(buffer) = buffer.upgrade() {
1439                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1440                        }
1441                    }
1442                },
1443                hash_map::Entry::Vacant(e) => {
1444                    e.insert(OpenBuffer::Operations(ops));
1445                }
1446            }
1447            Ok(proto::Ack {})
1448        })?
1449    }
1450
1451    pub fn handle_synchronize_buffers(
1452        &mut self,
1453        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
1454        cx: &mut ModelContext<Self>,
1455        client: Arc<Client>,
1456    ) -> Result<proto::SynchronizeBuffersResponse> {
1457        let project_id = envelope.payload.project_id;
1458        let mut response = proto::SynchronizeBuffersResponse {
1459            buffers: Default::default(),
1460        };
1461        let Some(guest_id) = envelope.original_sender_id else {
1462            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
1463        };
1464
1465        self.shared_buffers.entry(guest_id).or_default().clear();
1466        for buffer in envelope.payload.buffers {
1467            let buffer_id = BufferId::new(buffer.id)?;
1468            let remote_version = language::proto::deserialize_version(&buffer.version);
1469            if let Some(buffer) = self.get(buffer_id) {
1470                self.shared_buffers
1471                    .entry(guest_id)
1472                    .or_default()
1473                    .insert(buffer.clone());
1474
1475                let buffer = buffer.read(cx);
1476                response.buffers.push(proto::BufferVersion {
1477                    id: buffer_id.into(),
1478                    version: language::proto::serialize_version(&buffer.version),
1479                });
1480
1481                let operations = buffer.serialize_ops(Some(remote_version), cx);
1482                let client = client.clone();
1483                if let Some(file) = buffer.file() {
1484                    client
1485                        .send(proto::UpdateBufferFile {
1486                            project_id,
1487                            buffer_id: buffer_id.into(),
1488                            file: Some(file.to_proto(cx)),
1489                        })
1490                        .log_err();
1491                }
1492
1493                client
1494                    .send(proto::UpdateDiffBase {
1495                        project_id,
1496                        buffer_id: buffer_id.into(),
1497                        diff_base: buffer.diff_base().map(ToString::to_string),
1498                    })
1499                    .log_err();
1500
1501                client
1502                    .send(proto::BufferReloaded {
1503                        project_id,
1504                        buffer_id: buffer_id.into(),
1505                        version: language::proto::serialize_version(buffer.saved_version()),
1506                        mtime: buffer.saved_mtime().map(|time| time.into()),
1507                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
1508                            as i32,
1509                    })
1510                    .log_err();
1511
1512                cx.background_executor()
1513                    .spawn(
1514                        async move {
1515                            let operations = operations.await;
1516                            for chunk in split_operations(operations) {
1517                                client
1518                                    .request(proto::UpdateBuffer {
1519                                        project_id,
1520                                        buffer_id: buffer_id.into(),
1521                                        operations: chunk,
1522                                    })
1523                                    .await?;
1524                            }
1525                            anyhow::Ok(())
1526                        }
1527                        .log_err(),
1528                    )
1529                    .detach();
1530            }
1531        }
1532        Ok(response)
1533    }
1534
1535    pub fn handle_create_buffer_for_peer(
1536        &mut self,
1537        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1538        replica_id: u16,
1539        capability: Capability,
1540        cx: &mut ModelContext<Self>,
1541    ) -> Result<()> {
1542        let Some(remote) = self.state.as_remote() else {
1543            return Err(anyhow!("buffer store is not a remote"));
1544        };
1545
1546        if let Some(buffer) = remote.update(cx, |remote, cx| {
1547            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)
1548        })? {
1549            self.add_buffer(buffer, cx)?;
1550        }
1551
1552        Ok(())
1553    }
1554
1555    pub async fn handle_update_buffer_file(
1556        this: Model<Self>,
1557        envelope: TypedEnvelope<proto::UpdateBufferFile>,
1558        mut cx: AsyncAppContext,
1559    ) -> Result<()> {
1560        let buffer_id = envelope.payload.buffer_id;
1561        let buffer_id = BufferId::new(buffer_id)?;
1562
1563        this.update(&mut cx, |this, cx| {
1564            let payload = envelope.payload.clone();
1565            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
1566                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
1567                let worktree = this
1568                    .worktree_store
1569                    .read(cx)
1570                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
1571                    .ok_or_else(|| anyhow!("no such worktree"))?;
1572                let file = File::from_proto(file, worktree, cx)?;
1573                let old_file = buffer.update(cx, |buffer, cx| {
1574                    let old_file = buffer.file().cloned();
1575                    let new_path = file.path.clone();
1576                    buffer.file_updated(Arc::new(file), cx);
1577                    if old_file
1578                        .as_ref()
1579                        .map_or(true, |old| *old.path() != new_path)
1580                    {
1581                        Some(old_file)
1582                    } else {
1583                        None
1584                    }
1585                });
1586                if let Some(old_file) = old_file {
1587                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1588                }
1589            }
1590            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1591                downstream_client
1592                    .send(proto::UpdateBufferFile {
1593                        project_id: *project_id,
1594                        buffer_id: buffer_id.into(),
1595                        file: envelope.payload.file,
1596                    })
1597                    .log_err();
1598            }
1599            Ok(())
1600        })?
1601    }
1602
1603    pub async fn handle_update_diff_base(
1604        this: Model<Self>,
1605        envelope: TypedEnvelope<proto::UpdateDiffBase>,
1606        mut cx: AsyncAppContext,
1607    ) -> Result<()> {
1608        this.update(&mut cx, |this, cx| {
1609            let buffer_id = envelope.payload.buffer_id;
1610            let buffer_id = BufferId::new(buffer_id)?;
1611            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
1612                buffer.update(cx, |buffer, cx| {
1613                    buffer.set_diff_base(envelope.payload.diff_base.clone(), cx)
1614                });
1615            }
1616            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1617                downstream_client
1618                    .send(proto::UpdateDiffBase {
1619                        project_id: *project_id,
1620                        buffer_id: buffer_id.into(),
1621                        diff_base: envelope.payload.diff_base,
1622                    })
1623                    .log_err();
1624            }
1625            Ok(())
1626        })?
1627    }
1628
1629    pub async fn handle_save_buffer(
1630        this: Model<Self>,
1631        envelope: TypedEnvelope<proto::SaveBuffer>,
1632        mut cx: AsyncAppContext,
1633    ) -> Result<proto::BufferSaved> {
1634        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1635        let (buffer, project_id) = this.update(&mut cx, |this, _| {
1636            anyhow::Ok((
1637                this.get_existing(buffer_id)?,
1638                this.downstream_client
1639                    .as_ref()
1640                    .map(|(_, project_id)| *project_id)
1641                    .context("project is not shared")?,
1642            ))
1643        })??;
1644        buffer
1645            .update(&mut cx, |buffer, _| {
1646                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1647            })?
1648            .await?;
1649        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
1650
1651        if let Some(new_path) = envelope.payload.new_path {
1652            let new_path = ProjectPath::from_proto(new_path);
1653            this.update(&mut cx, |this, cx| {
1654                this.save_buffer_as(buffer.clone(), new_path, cx)
1655            })?
1656            .await?;
1657        } else {
1658            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
1659                .await?;
1660        }
1661
1662        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
1663            project_id,
1664            buffer_id: buffer_id.into(),
1665            version: serialize_version(buffer.saved_version()),
1666            mtime: buffer.saved_mtime().map(|time| time.into()),
1667        })
1668    }
1669
1670    pub async fn handle_close_buffer(
1671        this: Model<Self>,
1672        envelope: TypedEnvelope<proto::CloseBuffer>,
1673        mut cx: AsyncAppContext,
1674    ) -> Result<()> {
1675        let peer_id = envelope.sender_id;
1676        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1677        this.update(&mut cx, |this, _| {
1678            if let Some(buffer) = this.get(buffer_id) {
1679                if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
1680                    if shared.remove(&buffer) {
1681                        if shared.is_empty() {
1682                            this.shared_buffers.remove(&peer_id);
1683                        }
1684                        return;
1685                    }
1686                }
1687            };
1688            debug_panic!(
1689                "peer_id {} closed buffer_id {} which was either not open or already closed",
1690                peer_id,
1691                buffer_id
1692            )
1693        })
1694    }
1695
1696    pub async fn handle_buffer_saved(
1697        this: Model<Self>,
1698        envelope: TypedEnvelope<proto::BufferSaved>,
1699        mut cx: AsyncAppContext,
1700    ) -> Result<()> {
1701        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1702        let version = deserialize_version(&envelope.payload.version);
1703        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1704        this.update(&mut cx, move |this, cx| {
1705            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
1706                buffer.update(cx, |buffer, cx| {
1707                    buffer.did_save(version, mtime, cx);
1708                });
1709            }
1710
1711            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1712                downstream_client
1713                    .send(proto::BufferSaved {
1714                        project_id: *project_id,
1715                        buffer_id: buffer_id.into(),
1716                        mtime: envelope.payload.mtime,
1717                        version: envelope.payload.version,
1718                    })
1719                    .log_err();
1720            }
1721        })
1722    }
1723
1724    pub async fn handle_buffer_reloaded(
1725        this: Model<Self>,
1726        envelope: TypedEnvelope<proto::BufferReloaded>,
1727        mut cx: AsyncAppContext,
1728    ) -> Result<()> {
1729        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1730        let version = deserialize_version(&envelope.payload.version);
1731        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1732        let line_ending = deserialize_line_ending(
1733            proto::LineEnding::from_i32(envelope.payload.line_ending)
1734                .ok_or_else(|| anyhow!("missing line ending"))?,
1735        );
1736        this.update(&mut cx, |this, cx| {
1737            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
1738                buffer.update(cx, |buffer, cx| {
1739                    buffer.did_reload(version, line_ending, mtime, cx);
1740                });
1741            }
1742
1743            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1744                downstream_client
1745                    .send(proto::BufferReloaded {
1746                        project_id: *project_id,
1747                        buffer_id: buffer_id.into(),
1748                        mtime: envelope.payload.mtime,
1749                        version: envelope.payload.version,
1750                        line_ending: envelope.payload.line_ending,
1751                    })
1752                    .log_err();
1753            }
1754        })
1755    }
1756
1757    pub async fn handle_blame_buffer(
1758        this: Model<Self>,
1759        envelope: TypedEnvelope<proto::BlameBuffer>,
1760        mut cx: AsyncAppContext,
1761    ) -> Result<proto::BlameBufferResponse> {
1762        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1763        let version = deserialize_version(&envelope.payload.version);
1764        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1765        buffer
1766            .update(&mut cx, |buffer, _| {
1767                buffer.wait_for_version(version.clone())
1768            })?
1769            .await?;
1770        let blame = this
1771            .update(&mut cx, |this, cx| {
1772                this.blame_buffer(&buffer, Some(version), cx)
1773            })?
1774            .await?;
1775        Ok(serialize_blame_buffer_response(blame))
1776    }
1777
1778    pub async fn wait_for_loading_buffer(
1779        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
1780    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
1781        loop {
1782            if let Some(result) = receiver.borrow().as_ref() {
1783                match result {
1784                    Ok(buffer) => return Ok(buffer.to_owned()),
1785                    Err(e) => return Err(e.to_owned()),
1786                }
1787            }
1788            receiver.next().await;
1789        }
1790    }
1791
1792    pub fn reload_buffers(
1793        &self,
1794        buffers: HashSet<Model<Buffer>>,
1795        push_to_history: bool,
1796        cx: &mut ModelContext<Self>,
1797    ) -> Task<Result<ProjectTransaction>> {
1798        let buffers: Vec<Model<Buffer>> = buffers
1799            .into_iter()
1800            .filter(|buffer| buffer.read(cx).is_dirty())
1801            .collect();
1802        if buffers.is_empty() {
1803            return Task::ready(Ok(ProjectTransaction::default()));
1804        }
1805        self.state.reload_buffers(buffers, push_to_history, cx)
1806    }
1807
1808    async fn handle_reload_buffers(
1809        this: Model<Self>,
1810        envelope: TypedEnvelope<proto::ReloadBuffers>,
1811        mut cx: AsyncAppContext,
1812    ) -> Result<proto::ReloadBuffersResponse> {
1813        let sender_id = envelope.original_sender_id().unwrap_or_default();
1814        let reload = this.update(&mut cx, |this, cx| {
1815            let mut buffers = HashSet::default();
1816            for buffer_id in &envelope.payload.buffer_ids {
1817                let buffer_id = BufferId::new(*buffer_id)?;
1818                buffers.insert(this.get_existing(buffer_id)?);
1819            }
1820            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
1821        })??;
1822
1823        let project_transaction = reload.await?;
1824        let project_transaction = this.update(&mut cx, |this, cx| {
1825            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
1826        })?;
1827        Ok(proto::ReloadBuffersResponse {
1828            transaction: Some(project_transaction),
1829        })
1830    }
1831
1832    pub fn create_buffer_for_peer(
1833        &mut self,
1834        buffer: &Model<Buffer>,
1835        peer_id: proto::PeerId,
1836        cx: &mut ModelContext<Self>,
1837    ) -> Task<Result<()>> {
1838        let buffer_id = buffer.read(cx).remote_id();
1839        if !self
1840            .shared_buffers
1841            .entry(peer_id)
1842            .or_default()
1843            .insert(buffer.clone())
1844        {
1845            return Task::ready(Ok(()));
1846        }
1847
1848        let Some((client, project_id)) = self.downstream_client.clone() else {
1849            return Task::ready(Ok(()));
1850        };
1851
1852        cx.spawn(|this, mut cx| async move {
1853            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
1854                return anyhow::Ok(());
1855            };
1856
1857            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
1858            let operations = operations.await;
1859            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;
1860
1861            let initial_state = proto::CreateBufferForPeer {
1862                project_id,
1863                peer_id: Some(peer_id),
1864                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
1865            };
1866
1867            if client.send(initial_state).log_err().is_some() {
1868                let client = client.clone();
1869                cx.background_executor()
1870                    .spawn(async move {
1871                        let mut chunks = split_operations(operations).peekable();
1872                        while let Some(chunk) = chunks.next() {
1873                            let is_last = chunks.peek().is_none();
1874                            client.send(proto::CreateBufferForPeer {
1875                                project_id,
1876                                peer_id: Some(peer_id),
1877                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
1878                                    proto::BufferChunk {
1879                                        buffer_id: buffer_id.into(),
1880                                        operations: chunk,
1881                                        is_last,
1882                                    },
1883                                )),
1884                            })?;
1885                        }
1886                        anyhow::Ok(())
1887                    })
1888                    .await
1889                    .log_err();
1890            }
1891            Ok(())
1892        })
1893    }
1894
1895    pub fn forget_shared_buffers(&mut self) {
1896        self.shared_buffers.clear();
1897    }
1898
1899    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
1900        self.shared_buffers.remove(peer_id);
1901    }
1902
1903    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
1904        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
1905            self.shared_buffers.insert(new_peer_id, buffers);
1906        }
1907    }
1908
1909    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<Model<Buffer>>> {
1910        &self.shared_buffers
1911    }
1912
1913    pub fn create_local_buffer(
1914        &mut self,
1915        text: &str,
1916        language: Option<Arc<Language>>,
1917        cx: &mut ModelContext<Self>,
1918    ) -> Model<Buffer> {
1919        let buffer = cx.new_model(|cx| {
1920            Buffer::local(text, cx)
1921                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
1922        });
1923
1924        self.add_buffer(buffer.clone(), cx).log_err();
1925        let buffer_id = buffer.read(cx).remote_id();
1926
1927        let local = self
1928            .state
1929            .as_local()
1930            .expect("local-only method called in a non-local context");
1931        local.update(cx, |this, cx| {
1932            if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
1933                this.local_buffer_ids_by_path.insert(
1934                    ProjectPath {
1935                        worktree_id: file.worktree_id(cx),
1936                        path: file.path.clone(),
1937                    },
1938                    buffer_id,
1939                );
1940
1941                if let Some(entry_id) = file.entry_id {
1942                    this.local_buffer_ids_by_entry_id
1943                        .insert(entry_id, buffer_id);
1944                }
1945            }
1946        });
1947        buffer
1948    }
1949
1950    pub fn deserialize_project_transaction(
1951        &mut self,
1952        message: proto::ProjectTransaction,
1953        push_to_history: bool,
1954        cx: &mut ModelContext<Self>,
1955    ) -> Task<Result<ProjectTransaction>> {
1956        if let Some(remote) = self.state.as_remote() {
1957            remote.update(cx, |remote, cx| {
1958                remote.deserialize_project_transaction(message, push_to_history, cx)
1959            })
1960        } else {
1961            debug_panic!("not a remote buffer store");
1962            Task::ready(Err(anyhow!("not a remote buffer store")))
1963        }
1964    }
1965
1966    pub fn wait_for_remote_buffer(
1967        &self,
1968        id: BufferId,
1969        cx: &mut AppContext,
1970    ) -> Task<Result<Model<Buffer>>> {
1971        if let Some(remote) = self.state.as_remote() {
1972            remote.update(cx, |remote, cx| remote.wait_for_remote_buffer(id, cx))
1973        } else {
1974            debug_panic!("not a remote buffer store");
1975            Task::ready(Err(anyhow!("not a remote buffer store")))
1976        }
1977    }
1978
1979    pub fn serialize_project_transaction_for_peer(
1980        &mut self,
1981        project_transaction: ProjectTransaction,
1982        peer_id: proto::PeerId,
1983        cx: &mut ModelContext<Self>,
1984    ) -> proto::ProjectTransaction {
1985        let mut serialized_transaction = proto::ProjectTransaction {
1986            buffer_ids: Default::default(),
1987            transactions: Default::default(),
1988        };
1989        for (buffer, transaction) in project_transaction.0 {
1990            self.create_buffer_for_peer(&buffer, peer_id, cx)
1991                .detach_and_log_err(cx);
1992            serialized_transaction
1993                .buffer_ids
1994                .push(buffer.read(cx).remote_id().into());
1995            serialized_transaction
1996                .transactions
1997                .push(language::proto::serialize_transaction(&transaction));
1998        }
1999        serialized_transaction
2000    }
2001}
2002
2003impl OpenBuffer {
2004    fn upgrade(&self) -> Option<Model<Buffer>> {
2005        match self {
2006            OpenBuffer::Buffer(handle) => handle.upgrade(),
2007            OpenBuffer::Operations(_) => None,
2008        }
2009    }
2010}
2011
2012fn is_not_found_error(error: &anyhow::Error) -> bool {
2013    error
2014        .root_cause()
2015        .downcast_ref::<io::Error>()
2016        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2017}
2018
2019fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
2020    let entries = blame
2021        .entries
2022        .into_iter()
2023        .map(|entry| proto::BlameEntry {
2024            sha: entry.sha.as_bytes().into(),
2025            start_line: entry.range.start,
2026            end_line: entry.range.end,
2027            original_line_number: entry.original_line_number,
2028            author: entry.author.clone(),
2029            author_mail: entry.author_mail.clone(),
2030            author_time: entry.author_time,
2031            author_tz: entry.author_tz.clone(),
2032            committer: entry.committer.clone(),
2033            committer_mail: entry.committer_mail.clone(),
2034            committer_time: entry.committer_time,
2035            committer_tz: entry.committer_tz.clone(),
2036            summary: entry.summary.clone(),
2037            previous: entry.previous.clone(),
2038            filename: entry.filename.clone(),
2039        })
2040        .collect::<Vec<_>>();
2041
2042    let messages = blame
2043        .messages
2044        .into_iter()
2045        .map(|(oid, message)| proto::CommitMessage {
2046            oid: oid.as_bytes().into(),
2047            message,
2048        })
2049        .collect::<Vec<_>>();
2050
2051    let permalinks = blame
2052        .permalinks
2053        .into_iter()
2054        .map(|(oid, url)| proto::CommitPermalink {
2055            oid: oid.as_bytes().into(),
2056            permalink: url.to_string(),
2057        })
2058        .collect::<Vec<_>>();
2059
2060    proto::BlameBufferResponse {
2061        entries,
2062        messages,
2063        permalinks,
2064        remote_url: blame.remote_url,
2065    }
2066}
2067
2068fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
2069    let entries = response
2070        .entries
2071        .into_iter()
2072        .filter_map(|entry| {
2073            Some(git::blame::BlameEntry {
2074                sha: git::Oid::from_bytes(&entry.sha).ok()?,
2075                range: entry.start_line..entry.end_line,
2076                original_line_number: entry.original_line_number,
2077                committer: entry.committer,
2078                committer_time: entry.committer_time,
2079                committer_tz: entry.committer_tz,
2080                committer_mail: entry.committer_mail,
2081                author: entry.author,
2082                author_mail: entry.author_mail,
2083                author_time: entry.author_time,
2084                author_tz: entry.author_tz,
2085                summary: entry.summary,
2086                previous: entry.previous,
2087                filename: entry.filename,
2088            })
2089        })
2090        .collect::<Vec<_>>();
2091
2092    let messages = response
2093        .messages
2094        .into_iter()
2095        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2096        .collect::<HashMap<_, _>>();
2097
2098    let permalinks = response
2099        .permalinks
2100        .into_iter()
2101        .filter_map(|permalink| {
2102            Some((
2103                git::Oid::from_bytes(&permalink.oid).ok()?,
2104                Url::from_str(&permalink.permalink).ok()?,
2105            ))
2106        })
2107        .collect::<HashMap<_, _>>();
2108
2109    Blame {
2110        entries,
2111        permalinks,
2112        messages,
2113        remote_url: response.remote_url,
2114    }
2115}