buffer_store.rs

   1use crate::{
   2    search::SearchQuery,
   3    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   4    Item, ProjectPath,
   5};
   6use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
   7use anyhow::{anyhow, Context as _, Result};
   8use client::Client;
   9use collections::{hash_map, HashMap, HashSet};
  10use fs::Fs;
  11use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
  12use git::blame::Blame;
  13use gpui::{
  14    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
  15    Task, WeakModel,
  16};
  17use http_client::Url;
  18use language::{
  19    proto::{
  20        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
  21        split_operations,
  22    },
  23    Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
  24};
  25use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
  26use smol::channel::Receiver;
  27use std::{io, ops::Range, path::Path, str::FromStr as _, sync::Arc, time::Instant};
  28use text::BufferId;
  29use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
  30use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
  31
/// A set of open buffers.
///
/// The store is backed by either a local or a remote implementation (see
/// `state`) and additionally tracks buffers that are still loading and
/// buffers associated with individual peers.
pub struct BufferStore {
    /// Local- or remote-specific bookkeeping.
    state: BufferStoreState,
    /// In-flight loads keyed by project path. Each receiver holds `None`
    /// until a result is published, after which it holds the loaded buffer or
    /// a shared error.
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    /// Handle to the worktree store of the same project.
    worktree_store: Model<WorktreeStore>,
    /// Every known buffer id, mapped to its `OpenBuffer` entry.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Client and project id used to forward buffer updates downstream, when
    /// one is set.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// Buffers grouped by peer id.
    // NOTE(review): presumably the buffers this store has sent to each peer — confirm.
    shared_buffers: HashMap<proto::PeerId, HashSet<Model<Buffer>>>,
}
  45
/// The backing implementation of a `BufferStore`.
enum BufferStoreState {
    /// State held in a `LocalBufferStore`.
    Local(LocalBufferStore),
    /// State held in a `RemoteBufferStore`, which talks to an upstream client.
    Remote(RemoteBufferStore),
}
  50
/// Buffer-store state for a project whose buffers are served by an upstream
/// client.
struct RemoteBufferStore {
    /// Fully received buffers that are retained when the upstream client is
    /// reached via collab (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Model<Buffer>>,
    /// Client used to send requests (save, open, reload, ...) upstream.
    upstream_client: AnyProtoClient,
    /// Project id included in every upstream request.
    project_id: u64,
    /// Buffers whose initial state has arrived but whose final chunk of
    /// operations has not.
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    /// Senders to notify once the buffer with the given id finishes loading or
    /// fails to load.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    /// Used to resolve the worktree of an incoming buffer's file.
    worktree_store: Model<WorktreeStore>,
}
  60
/// Buffer-store state for buffers backed by local worktrees.
struct LocalBufferStore {
    /// Buffer id for each project path that has a buffer.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    /// Buffer id for each worktree entry that has a buffer.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    /// Used to look up worktrees by id (e.g. when saving under a new path).
    worktree_store: Model<WorktreeStore>,
    // NOTE(review): never read in the visible code; presumably held only so
    // the subscription stays alive as long as this store — confirm.
    _subscription: Subscription,
}
  67
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A weak handle to a buffer.
    Buffer(WeakModel<Buffer>),
    /// A list of operations stored in place of a buffer handle.
    // NOTE(review): presumably operations received before the buffer itself
    // was available — confirm against the code that inserts this variant.
    Operations(Vec<Operation>),
}
  72
/// Events emitted by a `BufferStore`.
pub enum BufferStoreEvent {
    /// Carries the handle of a buffer that was added.
    BufferAdded(Model<Buffer>),
    /// Carries the id of a buffer that was dropped.
    BufferDropped(BufferId),
    /// A buffer's file now has a different path.
    BufferChangedFilePath {
        /// The affected buffer.
        buffer: Model<Buffer>,
        /// The file the buffer had before the change, if any.
        old_file: Option<Arc<dyn language::File>>,
    },
}
  81
/// A set of per-buffer transactions, keyed by the buffer each one belongs to.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);
  84
// Allows `BufferStore` to emit `BufferStoreEvent`s.
impl EventEmitter<BufferStoreEvent> for BufferStore {}
  86
  87impl RemoteBufferStore {
  88    pub fn wait_for_remote_buffer(
  89        &mut self,
  90        id: BufferId,
  91        cx: &mut ModelContext<BufferStore>,
  92    ) -> Task<Result<Model<Buffer>>> {
  93        let (tx, rx) = oneshot::channel();
  94        self.remote_buffer_listeners.entry(id).or_default().push(tx);
  95
  96        cx.spawn(|this, cx| async move {
  97            if let Some(buffer) = this
  98                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
  99                .ok()
 100                .flatten()
 101            {
 102                return Ok(buffer);
 103            }
 104
 105            cx.background_executor()
 106                .spawn(async move { rx.await? })
 107                .await
 108        })
 109    }
 110
 111    fn save_remote_buffer(
 112        &self,
 113        buffer_handle: Model<Buffer>,
 114        new_path: Option<proto::ProjectPath>,
 115        cx: &ModelContext<BufferStore>,
 116    ) -> Task<Result<()>> {
 117        let buffer = buffer_handle.read(cx);
 118        let buffer_id = buffer.remote_id().into();
 119        let version = buffer.version();
 120        let rpc = self.upstream_client.clone();
 121        let project_id = self.project_id;
 122        cx.spawn(move |_, mut cx| async move {
 123            let response = rpc
 124                .request(proto::SaveBuffer {
 125                    project_id,
 126                    buffer_id,
 127                    new_path,
 128                    version: serialize_version(&version),
 129                })
 130                .await?;
 131            let version = deserialize_version(&response.version);
 132            let mtime = response.mtime.map(|mtime| mtime.into());
 133
 134            buffer_handle.update(&mut cx, |buffer, cx| {
 135                buffer.did_save(version.clone(), mtime, cx);
 136            })?;
 137
 138            Ok(())
 139        })
 140    }
 141
 142    pub fn handle_create_buffer_for_peer(
 143        &mut self,
 144        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
 145        replica_id: u16,
 146        capability: Capability,
 147        cx: &mut ModelContext<BufferStore>,
 148    ) -> Result<Option<Model<Buffer>>> {
 149        match envelope
 150            .payload
 151            .variant
 152            .ok_or_else(|| anyhow!("missing variant"))?
 153        {
 154            proto::create_buffer_for_peer::Variant::State(mut state) => {
 155                let buffer_id = BufferId::new(state.id)?;
 156
 157                let buffer_result = maybe!({
 158                    let mut buffer_file = None;
 159                    if let Some(file) = state.file.take() {
 160                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
 161                        let worktree = self
 162                            .worktree_store
 163                            .read(cx)
 164                            .worktree_for_id(worktree_id, cx)
 165                            .ok_or_else(|| {
 166                                anyhow!("no worktree found for id {}", file.worktree_id)
 167                            })?;
 168                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
 169                            as Arc<dyn language::File>);
 170                    }
 171                    Buffer::from_proto(replica_id, capability, state, buffer_file)
 172                });
 173
 174                match buffer_result {
 175                    Ok(buffer) => {
 176                        let buffer = cx.new_model(|_| buffer);
 177                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
 178                    }
 179                    Err(error) => {
 180                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 181                            for listener in listeners {
 182                                listener.send(Err(anyhow!(error.cloned()))).ok();
 183                            }
 184                        }
 185                    }
 186                }
 187            }
 188            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
 189                let buffer_id = BufferId::new(chunk.buffer_id)?;
 190                let buffer = self
 191                    .loading_remote_buffers_by_id
 192                    .get(&buffer_id)
 193                    .cloned()
 194                    .ok_or_else(|| {
 195                        anyhow!(
 196                            "received chunk for buffer {} without initial state",
 197                            chunk.buffer_id
 198                        )
 199                    })?;
 200
 201                let result = maybe!({
 202                    let operations = chunk
 203                        .operations
 204                        .into_iter()
 205                        .map(language::proto::deserialize_operation)
 206                        .collect::<Result<Vec<_>>>()?;
 207                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
 208                    anyhow::Ok(())
 209                });
 210
 211                if let Err(error) = result {
 212                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 213                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 214                        for listener in listeners {
 215                            listener.send(Err(error.cloned())).ok();
 216                        }
 217                    }
 218                } else if chunk.is_last {
 219                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 220                    if self.upstream_client.is_via_collab() {
 221                        // retain buffers sent by peers to avoid races.
 222                        self.shared_with_me.insert(buffer.clone());
 223                    }
 224
 225                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
 226                        for sender in senders {
 227                            sender.send(Ok(buffer.clone())).ok();
 228                        }
 229                    }
 230                    return Ok(Some(buffer));
 231                }
 232            }
 233        }
 234        return Ok(None);
 235    }
 236
 237    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
 238        self.loading_remote_buffers_by_id
 239            .keys()
 240            .copied()
 241            .collect::<Vec<_>>()
 242    }
 243
 244    pub fn deserialize_project_transaction(
 245        &self,
 246        message: proto::ProjectTransaction,
 247        push_to_history: bool,
 248        cx: &mut ModelContext<BufferStore>,
 249    ) -> Task<Result<ProjectTransaction>> {
 250        cx.spawn(|this, mut cx| async move {
 251            let mut project_transaction = ProjectTransaction::default();
 252            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
 253            {
 254                let buffer_id = BufferId::new(buffer_id)?;
 255                let buffer = this
 256                    .update(&mut cx, |this, cx| {
 257                        this.wait_for_remote_buffer(buffer_id, cx)
 258                    })?
 259                    .await?;
 260                let transaction = language::proto::deserialize_transaction(transaction)?;
 261                project_transaction.0.insert(buffer, transaction);
 262            }
 263
 264            for (buffer, transaction) in &project_transaction.0 {
 265                buffer
 266                    .update(&mut cx, |buffer, _| {
 267                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
 268                    })?
 269                    .await?;
 270
 271                if push_to_history {
 272                    buffer.update(&mut cx, |buffer, _| {
 273                        buffer.push_transaction(transaction.clone(), Instant::now());
 274                    })?;
 275                }
 276            }
 277
 278            Ok(project_transaction)
 279        })
 280    }
 281
 282    fn open_buffer(
 283        &self,
 284        path: Arc<Path>,
 285        worktree: Model<Worktree>,
 286        cx: &mut ModelContext<BufferStore>,
 287    ) -> Task<Result<Model<Buffer>>> {
 288        let worktree_id = worktree.read(cx).id().to_proto();
 289        let project_id = self.project_id;
 290        let client = self.upstream_client.clone();
 291        let path_string = path.clone().to_string_lossy().to_string();
 292        cx.spawn(move |this, mut cx| async move {
 293            let response = client
 294                .request(proto::OpenBufferByPath {
 295                    project_id,
 296                    worktree_id,
 297                    path: path_string,
 298                })
 299                .await?;
 300            let buffer_id = BufferId::new(response.buffer_id)?;
 301
 302            let buffer = this
 303                .update(&mut cx, {
 304                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
 305                })?
 306                .await?;
 307
 308            Ok(buffer)
 309        })
 310    }
 311
 312    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
 313        let create = self.upstream_client.request(proto::OpenNewBuffer {
 314            project_id: self.project_id,
 315        });
 316        cx.spawn(|this, mut cx| async move {
 317            let response = create.await?;
 318            let buffer_id = BufferId::new(response.buffer_id)?;
 319
 320            this.update(&mut cx, |this, cx| {
 321                this.wait_for_remote_buffer(buffer_id, cx)
 322            })?
 323            .await
 324        })
 325    }
 326
 327    fn reload_buffers(
 328        &self,
 329        buffers: HashSet<Model<Buffer>>,
 330        push_to_history: bool,
 331        cx: &mut ModelContext<BufferStore>,
 332    ) -> Task<Result<ProjectTransaction>> {
 333        let request = self.upstream_client.request(proto::ReloadBuffers {
 334            project_id: self.project_id,
 335            buffer_ids: buffers
 336                .iter()
 337                .map(|buffer| buffer.read(cx).remote_id().to_proto())
 338                .collect(),
 339        });
 340
 341        cx.spawn(|this, mut cx| async move {
 342            let response = request
 343                .await?
 344                .transaction
 345                .ok_or_else(|| anyhow!("missing transaction"))?;
 346            this.update(&mut cx, |this, cx| {
 347                this.deserialize_project_transaction(response, push_to_history, cx)
 348            })?
 349            .await
 350        })
 351    }
 352}
 353
 354impl LocalBufferStore {
 355    fn save_local_buffer(
 356        &self,
 357        buffer_handle: Model<Buffer>,
 358        worktree: Model<Worktree>,
 359        path: Arc<Path>,
 360        mut has_changed_file: bool,
 361        cx: &mut ModelContext<BufferStore>,
 362    ) -> Task<Result<()>> {
 363        let buffer = buffer_handle.read(cx);
 364
 365        let text = buffer.as_rope().clone();
 366        let line_ending = buffer.line_ending();
 367        let version = buffer.version();
 368        let buffer_id = buffer.remote_id();
 369        if buffer
 370            .file()
 371            .is_some_and(|file| file.disk_state() == DiskState::New)
 372        {
 373            has_changed_file = true;
 374        }
 375
 376        let save = worktree.update(cx, |worktree, cx| {
 377            worktree.write_file(path.as_ref(), text, line_ending, cx)
 378        });
 379
 380        cx.spawn(move |this, mut cx| async move {
 381            let new_file = save.await?;
 382            let mtime = new_file.disk_state().mtime();
 383            this.update(&mut cx, |this, cx| {
 384                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
 385                    if has_changed_file {
 386                        downstream_client
 387                            .send(proto::UpdateBufferFile {
 388                                project_id,
 389                                buffer_id: buffer_id.to_proto(),
 390                                file: Some(language::File::to_proto(&*new_file, cx)),
 391                            })
 392                            .log_err();
 393                    }
 394                    downstream_client
 395                        .send(proto::BufferSaved {
 396                            project_id,
 397                            buffer_id: buffer_id.to_proto(),
 398                            version: serialize_version(&version),
 399                            mtime: mtime.map(|time| time.into()),
 400                        })
 401                        .log_err();
 402                }
 403            })?;
 404            buffer_handle.update(&mut cx, |buffer, cx| {
 405                if has_changed_file {
 406                    buffer.file_updated(new_file, cx);
 407                }
 408                buffer.did_save(version.clone(), mtime, cx);
 409            })
 410        })
 411    }
 412
 413    fn subscribe_to_worktree(
 414        &mut self,
 415        worktree: &Model<Worktree>,
 416        cx: &mut ModelContext<BufferStore>,
 417    ) {
 418        cx.subscribe(worktree, |this, worktree, event, cx| {
 419            if worktree.read(cx).is_local() {
 420                match event {
 421                    worktree::Event::UpdatedEntries(changes) => {
 422                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
 423                    }
 424                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
 425                        Self::local_worktree_git_repos_changed(
 426                            this,
 427                            worktree.clone(),
 428                            updated_repos,
 429                            cx,
 430                        )
 431                    }
 432                    _ => {}
 433                }
 434            }
 435        })
 436        .detach();
 437    }
 438
 439    fn local_worktree_entries_changed(
 440        this: &mut BufferStore,
 441        worktree_handle: &Model<Worktree>,
 442        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 443        cx: &mut ModelContext<BufferStore>,
 444    ) {
 445        let snapshot = worktree_handle.read(cx).snapshot();
 446        for (path, entry_id, _) in changes {
 447            Self::local_worktree_entry_changed(
 448                this,
 449                *entry_id,
 450                path,
 451                worktree_handle,
 452                &snapshot,
 453                cx,
 454            );
 455        }
 456    }
 457
 458    fn local_worktree_git_repos_changed(
 459        this: &mut BufferStore,
 460        worktree_handle: Model<Worktree>,
 461        changed_repos: &UpdatedGitRepositoriesSet,
 462        cx: &mut ModelContext<BufferStore>,
 463    ) {
 464        debug_assert!(worktree_handle.read(cx).is_local());
 465
 466        // Identify the loading buffers whose containing repository that has changed.
 467        let future_buffers = this
 468            .loading_buffers()
 469            .filter_map(|(project_path, receiver)| {
 470                if project_path.worktree_id != worktree_handle.read(cx).id() {
 471                    return None;
 472                }
 473                let path = &project_path.path;
 474                changed_repos
 475                    .iter()
 476                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
 477                let path = path.clone();
 478                Some(async move {
 479                    BufferStore::wait_for_loading_buffer(receiver)
 480                        .await
 481                        .ok()
 482                        .map(|buffer| (buffer, path))
 483                })
 484            })
 485            .collect::<FuturesUnordered<_>>();
 486
 487        // Identify the current buffers whose containing repository has changed.
 488        let current_buffers = this
 489            .buffers()
 490            .filter_map(|buffer| {
 491                let file = File::from_dyn(buffer.read(cx).file())?;
 492                if file.worktree != worktree_handle {
 493                    return None;
 494                }
 495                changed_repos
 496                    .iter()
 497                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
 498                Some((buffer, file.path.clone()))
 499            })
 500            .collect::<Vec<_>>();
 501
 502        if future_buffers.len() + current_buffers.len() == 0 {
 503            return;
 504        }
 505
 506        cx.spawn(move |this, mut cx| async move {
 507            // Wait for all of the buffers to load.
 508            let future_buffers = future_buffers.collect::<Vec<_>>().await;
 509
 510            // Reload the diff base for every buffer whose containing git repository has changed.
 511            let snapshot =
 512                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
 513            let diff_bases_by_buffer = cx
 514                .background_executor()
 515                .spawn(async move {
 516                    let mut diff_base_tasks = future_buffers
 517                        .into_iter()
 518                        .flatten()
 519                        .chain(current_buffers)
 520                        .filter_map(|(buffer, path)| {
 521                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
 522                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
 523                            Some(async move {
 524                                let base_text =
 525                                    local_repo_entry.repo().load_index_text(&relative_path);
 526                                Some((buffer, base_text))
 527                            })
 528                        })
 529                        .collect::<FuturesUnordered<_>>();
 530
 531                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
 532                    while let Some(diff_base) = diff_base_tasks.next().await {
 533                        if let Some(diff_base) = diff_base {
 534                            diff_bases.push(diff_base);
 535                        }
 536                    }
 537                    diff_bases
 538                })
 539                .await;
 540
 541            this.update(&mut cx, |this, cx| {
 542                // Assign the new diff bases on all of the buffers.
 543                for (buffer, diff_base) in diff_bases_by_buffer {
 544                    let buffer_id = buffer.update(cx, |buffer, cx| {
 545                        buffer.set_diff_base(diff_base.clone(), cx);
 546                        buffer.remote_id().to_proto()
 547                    });
 548                    if let Some((client, project_id)) = &this.downstream_client.clone() {
 549                        client
 550                            .send(proto::UpdateDiffBase {
 551                                project_id: *project_id,
 552                                buffer_id,
 553                                diff_base,
 554                            })
 555                            .log_err();
 556                    }
 557                }
 558            })
 559        })
 560        .detach_and_log_err(cx);
 561    }
 562
 563    fn local_worktree_entry_changed(
 564        this: &mut BufferStore,
 565        entry_id: ProjectEntryId,
 566        path: &Arc<Path>,
 567        worktree: &Model<worktree::Worktree>,
 568        snapshot: &worktree::Snapshot,
 569        cx: &mut ModelContext<BufferStore>,
 570    ) -> Option<()> {
 571        let project_path = ProjectPath {
 572            worktree_id: snapshot.id(),
 573            path: path.clone(),
 574        };
 575
 576        let buffer_id = {
 577            let local = this.as_local_mut()?;
 578            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
 579                Some(&buffer_id) => buffer_id,
 580                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
 581            }
 582        };
 583
 584        let buffer = if let Some(buffer) = this.get(buffer_id) {
 585            Some(buffer)
 586        } else {
 587            this.opened_buffers.remove(&buffer_id);
 588            None
 589        };
 590
 591        let buffer = if let Some(buffer) = buffer {
 592            buffer
 593        } else {
 594            let this = this.as_local_mut()?;
 595            this.local_buffer_ids_by_path.remove(&project_path);
 596            this.local_buffer_ids_by_entry_id.remove(&entry_id);
 597            return None;
 598        };
 599
 600        let events = buffer.update(cx, |buffer, cx| {
 601            let local = this.as_local_mut()?;
 602            let file = buffer.file()?;
 603            let old_file = File::from_dyn(Some(file))?;
 604            if old_file.worktree != *worktree {
 605                return None;
 606            }
 607
 608            let snapshot_entry = old_file
 609                .entry_id
 610                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 611                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));
 612
 613            let new_file = if let Some(entry) = snapshot_entry {
 614                File {
 615                    disk_state: match entry.mtime {
 616                        Some(mtime) => DiskState::Present { mtime },
 617                        None => old_file.disk_state,
 618                    },
 619                    is_local: true,
 620                    entry_id: Some(entry.id),
 621                    path: entry.path.clone(),
 622                    worktree: worktree.clone(),
 623                    is_private: entry.is_private,
 624                }
 625            } else {
 626                File {
 627                    disk_state: DiskState::Deleted,
 628                    is_local: true,
 629                    entry_id: old_file.entry_id,
 630                    path: old_file.path.clone(),
 631                    worktree: worktree.clone(),
 632                    is_private: old_file.is_private,
 633                }
 634            };
 635
 636            if new_file == *old_file {
 637                return None;
 638            }
 639
 640            let mut events = Vec::new();
 641            if new_file.path != old_file.path {
 642                local.local_buffer_ids_by_path.remove(&ProjectPath {
 643                    path: old_file.path.clone(),
 644                    worktree_id: old_file.worktree_id(cx),
 645                });
 646                local.local_buffer_ids_by_path.insert(
 647                    ProjectPath {
 648                        worktree_id: new_file.worktree_id(cx),
 649                        path: new_file.path.clone(),
 650                    },
 651                    buffer_id,
 652                );
 653                events.push(BufferStoreEvent::BufferChangedFilePath {
 654                    buffer: cx.handle(),
 655                    old_file: buffer.file().cloned(),
 656                });
 657            }
 658
 659            if new_file.entry_id != old_file.entry_id {
 660                if let Some(entry_id) = old_file.entry_id {
 661                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
 662                }
 663                if let Some(entry_id) = new_file.entry_id {
 664                    local
 665                        .local_buffer_ids_by_entry_id
 666                        .insert(entry_id, buffer_id);
 667                }
 668            }
 669
 670            if let Some((client, project_id)) = &this.downstream_client {
 671                client
 672                    .send(proto::UpdateBufferFile {
 673                        project_id: *project_id,
 674                        buffer_id: buffer_id.to_proto(),
 675                        file: Some(new_file.to_proto(cx)),
 676                    })
 677                    .ok();
 678            }
 679
 680            buffer.file_updated(Arc::new(new_file), cx);
 681            Some(events)
 682        })?;
 683
 684        for event in events {
 685            cx.emit(event);
 686        }
 687
 688        None
 689    }
 690
 691    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
 692        let file = File::from_dyn(buffer.read(cx).file())?;
 693
 694        let remote_id = buffer.read(cx).remote_id();
 695        if let Some(entry_id) = file.entry_id {
 696            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 697                Some(_) => {
 698                    return None;
 699                }
 700                None => {
 701                    self.local_buffer_ids_by_entry_id
 702                        .insert(entry_id, remote_id);
 703                }
 704            }
 705        };
 706        self.local_buffer_ids_by_path.insert(
 707            ProjectPath {
 708                worktree_id: file.worktree_id(cx),
 709                path: file.path.clone(),
 710            },
 711            remote_id,
 712        );
 713
 714        Some(())
 715    }
 716
 717    fn save_buffer(
 718        &self,
 719        buffer: Model<Buffer>,
 720        cx: &mut ModelContext<BufferStore>,
 721    ) -> Task<Result<()>> {
 722        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 723            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 724        };
 725        let worktree = file.worktree.clone();
 726        self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
 727    }
 728
 729    fn save_buffer_as(
 730        &self,
 731        buffer: Model<Buffer>,
 732        path: ProjectPath,
 733        cx: &mut ModelContext<BufferStore>,
 734    ) -> Task<Result<()>> {
 735        let Some(worktree) = self
 736            .worktree_store
 737            .read(cx)
 738            .worktree_for_id(path.worktree_id, cx)
 739        else {
 740            return Task::ready(Err(anyhow!("no such worktree")));
 741        };
 742        self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
 743    }
 744
 745    fn open_buffer(
 746        &self,
 747        path: Arc<Path>,
 748        worktree: Model<Worktree>,
 749        cx: &mut ModelContext<BufferStore>,
 750    ) -> Task<Result<Model<Buffer>>> {
 751        let load_buffer = worktree.update(cx, |worktree, cx| {
 752            let load_file = worktree.load_file(path.as_ref(), cx);
 753            let reservation = cx.reserve_model();
 754            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
 755            cx.spawn(move |_, mut cx| async move {
 756                let loaded = load_file.await?;
 757                let text_buffer = cx
 758                    .background_executor()
 759                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
 760                    .await;
 761                cx.insert_model(reservation, |_| {
 762                    Buffer::build(
 763                        text_buffer,
 764                        loaded.diff_base,
 765                        Some(loaded.file),
 766                        Capability::ReadWrite,
 767                    )
 768                })
 769            })
 770        });
 771
 772        cx.spawn(move |this, mut cx| async move {
 773            let buffer = match load_buffer.await {
 774                Ok(buffer) => Ok(buffer),
 775                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
 776                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
 777                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
 778                    Buffer::build(
 779                        text_buffer,
 780                        None,
 781                        Some(Arc::new(File {
 782                            worktree,
 783                            path,
 784                            disk_state: DiskState::New,
 785                            entry_id: None,
 786                            is_local: true,
 787                            is_private: false,
 788                        })),
 789                        Capability::ReadWrite,
 790                    )
 791                }),
 792                Err(e) => Err(e),
 793            }?;
 794            this.update(&mut cx, |this, cx| {
 795                this.add_buffer(buffer.clone(), cx)?;
 796                let buffer_id = buffer.read(cx).remote_id();
 797                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 798                    let this = this.as_local_mut().unwrap();
 799                    this.local_buffer_ids_by_path.insert(
 800                        ProjectPath {
 801                            worktree_id: file.worktree_id(cx),
 802                            path: file.path.clone(),
 803                        },
 804                        buffer_id,
 805                    );
 806
 807                    if let Some(entry_id) = file.entry_id {
 808                        this.local_buffer_ids_by_entry_id
 809                            .insert(entry_id, buffer_id);
 810                    }
 811                }
 812
 813                anyhow::Ok(())
 814            })??;
 815
 816            Ok(buffer)
 817        })
 818    }
 819
 820    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
 821        cx.spawn(|buffer_store, mut cx| async move {
 822            let buffer = cx.new_model(|cx| {
 823                Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
 824            })?;
 825            buffer_store.update(&mut cx, |buffer_store, cx| {
 826                buffer_store.add_buffer(buffer.clone(), cx).log_err();
 827            })?;
 828            Ok(buffer)
 829        })
 830    }
 831
 832    fn reload_buffers(
 833        &self,
 834        buffers: HashSet<Model<Buffer>>,
 835        push_to_history: bool,
 836        cx: &mut ModelContext<BufferStore>,
 837    ) -> Task<Result<ProjectTransaction>> {
 838        cx.spawn(move |_, mut cx| async move {
 839            let mut project_transaction = ProjectTransaction::default();
 840            for buffer in buffers {
 841                let transaction = buffer
 842                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
 843                    .await?;
 844                buffer.update(&mut cx, |buffer, cx| {
 845                    if let Some(transaction) = transaction {
 846                        if !push_to_history {
 847                            buffer.forget_transaction(transaction.id);
 848                        }
 849                        project_transaction.0.insert(cx.handle(), transaction);
 850                    }
 851                })?;
 852            }
 853
 854            Ok(project_transaction)
 855        })
 856    }
 857}
 858
 859impl BufferStore {
    /// Registers the buffer store's RPC handlers on `client`.
    ///
    /// The first four handlers are registered as message handlers and the
    /// remaining four as request handlers.
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_message_handler(Self::handle_update_diff_base);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
    }
 870
 871    /// Creates a buffer store, optionally retaining its buffers.
 872    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
 873        Self {
 874            state: BufferStoreState::Local(LocalBufferStore {
 875                local_buffer_ids_by_path: Default::default(),
 876                local_buffer_ids_by_entry_id: Default::default(),
 877                worktree_store: worktree_store.clone(),
 878                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
 879                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
 880                        let this = this.as_local_mut().unwrap();
 881                        this.subscribe_to_worktree(worktree, cx);
 882                    }
 883                }),
 884            }),
 885            downstream_client: None,
 886            opened_buffers: Default::default(),
 887            shared_buffers: Default::default(),
 888            loading_buffers_by_path: Default::default(),
 889            worktree_store,
 890        }
 891    }
 892
 893    pub fn remote(
 894        worktree_store: Model<WorktreeStore>,
 895        upstream_client: AnyProtoClient,
 896        remote_id: u64,
 897        _cx: &mut ModelContext<Self>,
 898    ) -> Self {
 899        Self {
 900            state: BufferStoreState::Remote(RemoteBufferStore {
 901                shared_with_me: Default::default(),
 902                loading_remote_buffers_by_id: Default::default(),
 903                remote_buffer_listeners: Default::default(),
 904                project_id: remote_id,
 905                upstream_client,
 906                worktree_store: worktree_store.clone(),
 907            }),
 908            downstream_client: None,
 909            opened_buffers: Default::default(),
 910            loading_buffers_by_path: Default::default(),
 911            shared_buffers: Default::default(),
 912            worktree_store,
 913        }
 914    }
 915
 916    fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
 917        match &mut self.state {
 918            BufferStoreState::Local(state) => Some(state),
 919            _ => None,
 920        }
 921    }
 922
 923    fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
 924        match &mut self.state {
 925            BufferStoreState::Remote(state) => Some(state),
 926            _ => None,
 927        }
 928    }
 929
 930    fn as_remote(&self) -> Option<&RemoteBufferStore> {
 931        match &self.state {
 932            BufferStoreState::Remote(state) => Some(state),
 933            _ => None,
 934        }
 935    }
 936
    /// Opens the buffer for `project_path`, starting a load only if needed.
    ///
    /// Resolution order:
    /// 1. If a buffer for this path is already registered, it is returned
    ///    immediately.
    /// 2. If the path's worktree is not known to the worktree store, the task
    ///    fails with "no such worktree".
    /// 3. If a load for this path is already in flight, the returned task waits
    ///    on that load's watch channel instead of starting another one.
    /// 4. Otherwise a load is started through the local or remote state, and its
    ///    outcome is published on a new watch channel stored in
    ///    `loading_buffers_by_path`.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let path = project_path.path.clone();
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                // Detached task: awaits the load, then publishes the result to
                // every receiver cloned from `rx`. The error is wrapped in an
                // `Arc` so the stored result can be cloned for each waiter.
                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _cx| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        // Wait for the published result on the background executor.
        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }
 992
 993    pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
 994        match &self.state {
 995            BufferStoreState::Local(this) => this.create_buffer(cx),
 996            BufferStoreState::Remote(this) => this.create_buffer(cx),
 997        }
 998    }
 999
1000    pub fn save_buffer(
1001        &mut self,
1002        buffer: Model<Buffer>,
1003        cx: &mut ModelContext<Self>,
1004    ) -> Task<Result<()>> {
1005        match &mut self.state {
1006            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1007            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1008        }
1009    }
1010
1011    pub fn save_buffer_as(
1012        &mut self,
1013        buffer: Model<Buffer>,
1014        path: ProjectPath,
1015        cx: &mut ModelContext<Self>,
1016    ) -> Task<Result<()>> {
1017        let old_file = buffer.read(cx).file().cloned();
1018        let task = match &self.state {
1019            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1020            BufferStoreState::Remote(this) => {
1021                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1022            }
1023        };
1024        cx.spawn(|this, mut cx| async move {
1025            task.await?;
1026            this.update(&mut cx, |_, cx| {
1027                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1028            })
1029        })
1030    }
1031
1032    pub fn blame_buffer(
1033        &self,
1034        buffer: &Model<Buffer>,
1035        version: Option<clock::Global>,
1036        cx: &AppContext,
1037    ) -> Task<Result<Option<Blame>>> {
1038        let buffer = buffer.read(cx);
1039        let Some(file) = File::from_dyn(buffer.file()) else {
1040            return Task::ready(Err(anyhow!("buffer has no file")));
1041        };
1042
1043        match file.worktree.clone().read(cx) {
1044            Worktree::Local(worktree) => {
1045                let worktree = worktree.snapshot();
1046                let blame_params = maybe!({
1047                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
1048                        Some(repo_for_path) => repo_for_path,
1049                        None => return Ok(None),
1050                    };
1051
1052                    let relative_path = repo_entry
1053                        .relativize(&worktree, &file.path)
1054                        .context("failed to relativize buffer path")?;
1055
1056                    let repo = local_repo_entry.repo().clone();
1057
1058                    let content = match version {
1059                        Some(version) => buffer.rope_for_version(&version).clone(),
1060                        None => buffer.as_rope().clone(),
1061                    };
1062
1063                    anyhow::Ok(Some((repo, relative_path, content)))
1064                });
1065
1066                cx.background_executor().spawn(async move {
1067                    let Some((repo, relative_path, content)) = blame_params? else {
1068                        return Ok(None);
1069                    };
1070                    repo.blame(&relative_path, content)
1071                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
1072                        .map(Some)
1073                })
1074            }
1075            Worktree::Remote(worktree) => {
1076                let buffer_id = buffer.remote_id();
1077                let version = buffer.version();
1078                let project_id = worktree.project_id();
1079                let client = worktree.client();
1080                cx.spawn(|_| async move {
1081                    let response = client
1082                        .request(proto::BlameBuffer {
1083                            project_id,
1084                            buffer_id: buffer_id.into(),
1085                            version: serialize_version(&version),
1086                        })
1087                        .await?;
1088                    Ok(deserialize_blame_buffer_response(response))
1089                })
1090            }
1091        }
1092    }
1093
1094    pub fn get_permalink_to_line(
1095        &self,
1096        buffer: &Model<Buffer>,
1097        selection: Range<u32>,
1098        cx: &AppContext,
1099    ) -> Task<Result<url::Url>> {
1100        let buffer = buffer.read(cx);
1101        let Some(file) = File::from_dyn(buffer.file()) else {
1102            return Task::ready(Err(anyhow!("buffer has no file")));
1103        };
1104
1105        match file.worktree.clone().read(cx) {
1106            Worktree::Local(worktree) => {
1107                let Some(repo) = worktree.local_git_repo(file.path()) else {
1108                    return Task::ready(Err(anyhow!("no repository for buffer found")));
1109                };
1110
1111                let path = file.path().clone();
1112
1113                cx.spawn(|cx| async move {
1114                    const REMOTE_NAME: &str = "origin";
1115                    let origin_url = repo
1116                        .remote_url(REMOTE_NAME)
1117                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;
1118
1119                    let sha = repo
1120                        .head_sha()
1121                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;
1122
1123                    let provider_registry =
1124                        cx.update(GitHostingProviderRegistry::default_global)?;
1125
1126                    let (provider, remote) =
1127                        parse_git_remote_url(provider_registry, &origin_url)
1128                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;
1129
1130                    let path = path
1131                        .to_str()
1132                        .context("failed to convert buffer path to string")?;
1133
1134                    Ok(provider.build_permalink(
1135                        remote,
1136                        BuildPermalinkParams {
1137                            sha: &sha,
1138                            path,
1139                            selection: Some(selection),
1140                        },
1141                    ))
1142                })
1143            }
1144            Worktree::Remote(worktree) => {
1145                let buffer_id = buffer.remote_id();
1146                let project_id = worktree.project_id();
1147                let client = worktree.client();
1148                cx.spawn(|_| async move {
1149                    let response = client
1150                        .request(proto::GetPermalinkToLine {
1151                            project_id,
1152                            buffer_id: buffer_id.into(),
1153                            selection: Some(proto::Range {
1154                                start: selection.start as u64,
1155                                end: selection.end as u64,
1156                            }),
1157                        })
1158                        .await?;
1159
1160                    url::Url::parse(&response.permalink).context("failed to parse permalink")
1161                })
1162            }
1163        }
1164    }
1165
1166    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
1167        let remote_id = buffer.read(cx).remote_id();
1168        let is_remote = buffer.read(cx).replica_id() != 0;
1169        let open_buffer = OpenBuffer::Buffer(buffer.downgrade());
1170
1171        let handle = cx.handle().downgrade();
1172        buffer.update(cx, move |_, cx| {
1173            cx.on_release(move |buffer, cx| {
1174                handle
1175                    .update(cx, |_, cx| {
1176                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1177                    })
1178                    .ok();
1179            })
1180            .detach()
1181        });
1182
1183        match self.opened_buffers.entry(remote_id) {
1184            hash_map::Entry::Vacant(entry) => {
1185                entry.insert(open_buffer);
1186            }
1187            hash_map::Entry::Occupied(mut entry) => {
1188                if let OpenBuffer::Operations(operations) = entry.get_mut() {
1189                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1190                } else if entry.get().upgrade().is_some() {
1191                    if is_remote {
1192                        return Ok(());
1193                    } else {
1194                        debug_panic!("buffer {} was already registered", remote_id);
1195                        Err(anyhow!("buffer {} was already registered", remote_id))?;
1196                    }
1197                }
1198                entry.insert(open_buffer);
1199            }
1200        }
1201
1202        cx.subscribe(&buffer, Self::on_buffer_event).detach();
1203        cx.emit(BufferStoreEvent::BufferAdded(buffer));
1204        Ok(())
1205    }
1206
1207    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
1208        self.opened_buffers
1209            .values()
1210            .filter_map(|buffer| buffer.upgrade())
1211    }
1212
1213    pub fn loading_buffers(
1214        &self,
1215    ) -> impl Iterator<
1216        Item = (
1217            &ProjectPath,
1218            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
1219        ),
1220    > {
1221        self.loading_buffers_by_path
1222            .iter()
1223            .map(|(path, rx)| (path, rx.clone()))
1224    }
1225
1226    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
1227        self.buffers().find_map(|buffer| {
1228            let file = File::from_dyn(buffer.read(cx).file())?;
1229            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1230                Some(buffer)
1231            } else {
1232                None
1233            }
1234        })
1235    }
1236
1237    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1238        self.opened_buffers
1239            .get(&buffer_id)
1240            .and_then(|buffer| buffer.upgrade())
1241    }
1242
1243    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
1244        self.get(buffer_id)
1245            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1246    }
1247
1248    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1249        self.get(buffer_id).or_else(|| {
1250            self.as_remote()
1251                .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1252        })
1253    }
1254
1255    pub fn buffer_version_info(
1256        &self,
1257        cx: &AppContext,
1258    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1259        let buffers = self
1260            .buffers()
1261            .map(|buffer| {
1262                let buffer = buffer.read(cx);
1263                proto::BufferVersion {
1264                    id: buffer.remote_id().into(),
1265                    version: language::proto::serialize_version(&buffer.version),
1266                }
1267            })
1268            .collect();
1269        let incomplete_buffer_ids = self
1270            .as_remote()
1271            .map(|remote| remote.incomplete_buffer_ids())
1272            .unwrap_or_default();
1273        (buffers, incomplete_buffer_ids)
1274    }
1275
1276    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
1277        for open_buffer in self.opened_buffers.values_mut() {
1278            if let Some(buffer) = open_buffer.upgrade() {
1279                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1280            }
1281        }
1282
1283        for buffer in self.buffers() {
1284            buffer.update(cx, |buffer, cx| {
1285                buffer.set_capability(Capability::ReadOnly, cx)
1286            });
1287        }
1288
1289        if let Some(remote) = self.as_remote_mut() {
1290            // Wake up all futures currently waiting on a buffer to get opened,
1291            // to give them a chance to fail now that we've disconnected.
1292            remote.remote_buffer_listeners.clear()
1293        }
1294    }
1295
1296    pub fn shared(
1297        &mut self,
1298        remote_id: u64,
1299        downstream_client: AnyProtoClient,
1300        _cx: &mut AppContext,
1301    ) {
1302        self.downstream_client = Some((downstream_client, remote_id));
1303    }
1304
1305    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
1306        self.downstream_client.take();
1307        self.forget_shared_buffers();
1308    }
1309
1310    pub fn discard_incomplete(&mut self) {
1311        self.opened_buffers
1312            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1313    }
1314
    /// Streams buffers that are candidates for `query` over the returned
    /// channel.
    ///
    /// Open buffers without an entry id are sent first, each one reducing
    /// `limit` by one (saturating at zero). The entry ids of the other open
    /// buffers are passed to the worktree store together with the remaining
    /// limit; the project paths it yields are opened in chunks of at most
    /// `MAX_CONCURRENT_BUFFER_OPENS` and sent as they finish opening.
    ///
    /// The work runs in a detached task. Buffers that fail to open are logged
    /// and skipped, and the task ends early once the receiver has been dropped.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        // Partition the open buffers: those with an entry id are reported to the
        // worktree store by id, the rest are sent directly.
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            // Send errors are ignored here; a dropped receiver is detected by
            // the send in the loop below.
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                // Start every open in the chunk before awaiting any of them.
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A failed send means the receiver is gone; stop.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1368
1369    fn on_buffer_event(
1370        &mut self,
1371        buffer: Model<Buffer>,
1372        event: &BufferEvent,
1373        cx: &mut ModelContext<Self>,
1374    ) {
1375        match event {
1376            BufferEvent::FileHandleChanged => {
1377                if let Some(local) = self.as_local_mut() {
1378                    local.buffer_changed_file(buffer, cx);
1379                }
1380            }
1381            BufferEvent::Reloaded => {
1382                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
1383                    return;
1384                };
1385                let buffer = buffer.read(cx);
1386                downstream_client
1387                    .send(proto::BufferReloaded {
1388                        project_id: *project_id,
1389                        buffer_id: buffer.remote_id().to_proto(),
1390                        version: serialize_version(&buffer.version()),
1391                        mtime: buffer.saved_mtime().map(|t| t.into()),
1392                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
1393                    })
1394                    .log_err();
1395            }
1396            _ => {}
1397        }
1398    }
1399
1400    pub async fn handle_update_buffer(
1401        this: Model<Self>,
1402        envelope: TypedEnvelope<proto::UpdateBuffer>,
1403        mut cx: AsyncAppContext,
1404    ) -> Result<proto::Ack> {
1405        let payload = envelope.payload.clone();
1406        let buffer_id = BufferId::new(payload.buffer_id)?;
1407        let ops = payload
1408            .operations
1409            .into_iter()
1410            .map(language::proto::deserialize_operation)
1411            .collect::<Result<Vec<_>, _>>()?;
1412        this.update(&mut cx, |this, cx| {
1413            match this.opened_buffers.entry(buffer_id) {
1414                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1415                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1416                    OpenBuffer::Buffer(buffer) => {
1417                        if let Some(buffer) = buffer.upgrade() {
1418                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1419                        }
1420                    }
1421                },
1422                hash_map::Entry::Vacant(e) => {
1423                    e.insert(OpenBuffer::Operations(ops));
1424                }
1425            }
1426            Ok(proto::Ack {})
1427        })?
1428    }
1429
    /// Handles a guest's `SynchronizeBuffers` request.
    ///
    /// The guest's set of shared buffers is reset. Then, for every buffer in the
    /// request that this store knows, the buffer is re-added to that set, its
    /// current version is added to the response, and the following are sent to
    /// `client`: the buffer's file (if it has one), its diff base, its saved
    /// state as a `BufferReloaded` message, and — from a detached background
    /// task — the operations serialized relative to the guest's version.
    /// Buffers this store does not know are skipped.
    ///
    /// # Errors
    ///
    /// Fails if the envelope has no `original_sender_id` or if a buffer id in
    /// the request is invalid. Failures while sending are only logged.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from an empty set; it is rebuilt from the request below.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .insert(buffer.clone());

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // NOTE(review): presumably this serializes only the operations
                // not covered by `remote_version` — confirm in `serialize_ops`.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                client
                    .send(proto::UpdateDiffBase {
                        project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: buffer.diff_base().map(ToString::to_string),
                    })
                    .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Send the operations in chunks, awaiting each request before
                // sending the next; the first failure ends the task and is
                // logged.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
1513
1514    pub fn handle_create_buffer_for_peer(
1515        &mut self,
1516        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1517        replica_id: u16,
1518        capability: Capability,
1519        cx: &mut ModelContext<Self>,
1520    ) -> Result<()> {
1521        let Some(remote) = self.as_remote_mut() else {
1522            return Err(anyhow!("buffer store is not a remote"));
1523        };
1524
1525        if let Some(buffer) =
1526            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1527        {
1528            self.add_buffer(buffer, cx)?;
1529        }
1530
1531        Ok(())
1532    }
1533
    /// Handles an `UpdateBufferFile` message.
    ///
    /// If the buffer is known (including buffers the remote state is still
    /// loading), its file is replaced with the one from the message, and
    /// `BufferStoreEvent::BufferChangedFilePath` is emitted when the buffer had
    /// no file before or its path changed. The message is then forwarded to the
    /// downstream client, if one is set.
    ///
    /// # Errors
    ///
    /// Fails if the buffer id is invalid. For a known buffer it also fails —
    /// before anything is forwarded downstream — if the message has no file, if
    /// the file's worktree is unknown, or if the file cannot be deserialized.
    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            // Cloned because `envelope.payload.file` is moved into the
            // downstream message further below.
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                // Outer `Some` means "the path changed"; the inner value is the
                // previous file, which may itself be `None`.
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Forward the update even when the buffer is not known locally.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
1581
1582    pub async fn handle_update_diff_base(
1583        this: Model<Self>,
1584        envelope: TypedEnvelope<proto::UpdateDiffBase>,
1585        mut cx: AsyncAppContext,
1586    ) -> Result<()> {
1587        this.update(&mut cx, |this, cx| {
1588            let buffer_id = envelope.payload.buffer_id;
1589            let buffer_id = BufferId::new(buffer_id)?;
1590            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1591                buffer.update(cx, |buffer, cx| {
1592                    buffer.set_diff_base(envelope.payload.diff_base.clone(), cx)
1593                });
1594            }
1595            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1596                downstream_client
1597                    .send(proto::UpdateDiffBase {
1598                        project_id: *project_id,
1599                        buffer_id: buffer_id.into(),
1600                        diff_base: envelope.payload.diff_base,
1601                    })
1602                    .log_err();
1603            }
1604            Ok(())
1605        })?
1606    }
1607
1608    pub async fn handle_save_buffer(
1609        this: Model<Self>,
1610        envelope: TypedEnvelope<proto::SaveBuffer>,
1611        mut cx: AsyncAppContext,
1612    ) -> Result<proto::BufferSaved> {
1613        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1614        let (buffer, project_id) = this.update(&mut cx, |this, _| {
1615            anyhow::Ok((
1616                this.get_existing(buffer_id)?,
1617                this.downstream_client
1618                    .as_ref()
1619                    .map(|(_, project_id)| *project_id)
1620                    .context("project is not shared")?,
1621            ))
1622        })??;
1623        buffer
1624            .update(&mut cx, |buffer, _| {
1625                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1626            })?
1627            .await?;
1628        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
1629
1630        if let Some(new_path) = envelope.payload.new_path {
1631            let new_path = ProjectPath::from_proto(new_path);
1632            this.update(&mut cx, |this, cx| {
1633                this.save_buffer_as(buffer.clone(), new_path, cx)
1634            })?
1635            .await?;
1636        } else {
1637            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
1638                .await?;
1639        }
1640
1641        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
1642            project_id,
1643            buffer_id: buffer_id.into(),
1644            version: serialize_version(buffer.saved_version()),
1645            mtime: buffer.saved_mtime().map(|time| time.into()),
1646        })
1647    }
1648
1649    pub async fn handle_close_buffer(
1650        this: Model<Self>,
1651        envelope: TypedEnvelope<proto::CloseBuffer>,
1652        mut cx: AsyncAppContext,
1653    ) -> Result<()> {
1654        let peer_id = envelope.sender_id;
1655        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1656        this.update(&mut cx, |this, _| {
1657            if let Some(buffer) = this.get(buffer_id) {
1658                if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
1659                    if shared.remove(&buffer) {
1660                        if shared.is_empty() {
1661                            this.shared_buffers.remove(&peer_id);
1662                        }
1663                        return;
1664                    }
1665                }
1666            };
1667            debug_panic!(
1668                "peer_id {} closed buffer_id {} which was either not open or already closed",
1669                peer_id,
1670                buffer_id
1671            )
1672        })
1673    }
1674
1675    pub async fn handle_buffer_saved(
1676        this: Model<Self>,
1677        envelope: TypedEnvelope<proto::BufferSaved>,
1678        mut cx: AsyncAppContext,
1679    ) -> Result<()> {
1680        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1681        let version = deserialize_version(&envelope.payload.version);
1682        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1683        this.update(&mut cx, move |this, cx| {
1684            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1685                buffer.update(cx, |buffer, cx| {
1686                    buffer.did_save(version, mtime, cx);
1687                });
1688            }
1689
1690            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1691                downstream_client
1692                    .send(proto::BufferSaved {
1693                        project_id: *project_id,
1694                        buffer_id: buffer_id.into(),
1695                        mtime: envelope.payload.mtime,
1696                        version: envelope.payload.version,
1697                    })
1698                    .log_err();
1699            }
1700        })
1701    }
1702
1703    pub async fn handle_buffer_reloaded(
1704        this: Model<Self>,
1705        envelope: TypedEnvelope<proto::BufferReloaded>,
1706        mut cx: AsyncAppContext,
1707    ) -> Result<()> {
1708        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1709        let version = deserialize_version(&envelope.payload.version);
1710        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1711        let line_ending = deserialize_line_ending(
1712            proto::LineEnding::from_i32(envelope.payload.line_ending)
1713                .ok_or_else(|| anyhow!("missing line ending"))?,
1714        );
1715        this.update(&mut cx, |this, cx| {
1716            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1717                buffer.update(cx, |buffer, cx| {
1718                    buffer.did_reload(version, line_ending, mtime, cx);
1719                });
1720            }
1721
1722            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1723                downstream_client
1724                    .send(proto::BufferReloaded {
1725                        project_id: *project_id,
1726                        buffer_id: buffer_id.into(),
1727                        mtime: envelope.payload.mtime,
1728                        version: envelope.payload.version,
1729                        line_ending: envelope.payload.line_ending,
1730                    })
1731                    .log_err();
1732            }
1733        })
1734    }
1735
1736    pub async fn handle_blame_buffer(
1737        this: Model<Self>,
1738        envelope: TypedEnvelope<proto::BlameBuffer>,
1739        mut cx: AsyncAppContext,
1740    ) -> Result<proto::BlameBufferResponse> {
1741        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1742        let version = deserialize_version(&envelope.payload.version);
1743        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1744        buffer
1745            .update(&mut cx, |buffer, _| {
1746                buffer.wait_for_version(version.clone())
1747            })?
1748            .await?;
1749        let blame = this
1750            .update(&mut cx, |this, cx| {
1751                this.blame_buffer(&buffer, Some(version), cx)
1752            })?
1753            .await?;
1754        Ok(serialize_blame_buffer_response(blame))
1755    }
1756
1757    pub async fn handle_get_permalink_to_line(
1758        this: Model<Self>,
1759        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
1760        mut cx: AsyncAppContext,
1761    ) -> Result<proto::GetPermalinkToLineResponse> {
1762        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1763        // let version = deserialize_version(&envelope.payload.version);
1764        let selection = {
1765            let proto_selection = envelope
1766                .payload
1767                .selection
1768                .context("no selection to get permalink for defined")?;
1769            proto_selection.start as u32..proto_selection.end as u32
1770        };
1771        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1772        let permalink = this
1773            .update(&mut cx, |this, cx| {
1774                this.get_permalink_to_line(&buffer, selection, cx)
1775            })?
1776            .await?;
1777        Ok(proto::GetPermalinkToLineResponse {
1778            permalink: permalink.to_string(),
1779        })
1780    }
1781
1782    pub async fn wait_for_loading_buffer(
1783        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
1784    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
1785        loop {
1786            if let Some(result) = receiver.borrow().as_ref() {
1787                match result {
1788                    Ok(buffer) => return Ok(buffer.to_owned()),
1789                    Err(e) => return Err(e.to_owned()),
1790                }
1791            }
1792            receiver.next().await;
1793        }
1794    }
1795
1796    pub fn reload_buffers(
1797        &self,
1798        buffers: HashSet<Model<Buffer>>,
1799        push_to_history: bool,
1800        cx: &mut ModelContext<Self>,
1801    ) -> Task<Result<ProjectTransaction>> {
1802        if buffers.is_empty() {
1803            return Task::ready(Ok(ProjectTransaction::default()));
1804        }
1805        match &self.state {
1806            BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
1807            BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
1808        }
1809    }
1810
1811    async fn handle_reload_buffers(
1812        this: Model<Self>,
1813        envelope: TypedEnvelope<proto::ReloadBuffers>,
1814        mut cx: AsyncAppContext,
1815    ) -> Result<proto::ReloadBuffersResponse> {
1816        let sender_id = envelope.original_sender_id().unwrap_or_default();
1817        let reload = this.update(&mut cx, |this, cx| {
1818            let mut buffers = HashSet::default();
1819            for buffer_id in &envelope.payload.buffer_ids {
1820                let buffer_id = BufferId::new(*buffer_id)?;
1821                buffers.insert(this.get_existing(buffer_id)?);
1822            }
1823            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
1824        })??;
1825
1826        let project_transaction = reload.await?;
1827        let project_transaction = this.update(&mut cx, |this, cx| {
1828            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
1829        })?;
1830        Ok(proto::ReloadBuffersResponse {
1831            transaction: Some(project_transaction),
1832        })
1833    }
1834
1835    pub fn create_buffer_for_peer(
1836        &mut self,
1837        buffer: &Model<Buffer>,
1838        peer_id: proto::PeerId,
1839        cx: &mut ModelContext<Self>,
1840    ) -> Task<Result<()>> {
1841        let buffer_id = buffer.read(cx).remote_id();
1842        if !self
1843            .shared_buffers
1844            .entry(peer_id)
1845            .or_default()
1846            .insert(buffer.clone())
1847        {
1848            return Task::ready(Ok(()));
1849        }
1850
1851        let Some((client, project_id)) = self.downstream_client.clone() else {
1852            return Task::ready(Ok(()));
1853        };
1854
1855        cx.spawn(|this, mut cx| async move {
1856            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
1857                return anyhow::Ok(());
1858            };
1859
1860            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
1861            let operations = operations.await;
1862            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;
1863
1864            let initial_state = proto::CreateBufferForPeer {
1865                project_id,
1866                peer_id: Some(peer_id),
1867                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
1868            };
1869
1870            if client.send(initial_state).log_err().is_some() {
1871                let client = client.clone();
1872                cx.background_executor()
1873                    .spawn(async move {
1874                        let mut chunks = split_operations(operations).peekable();
1875                        while let Some(chunk) = chunks.next() {
1876                            let is_last = chunks.peek().is_none();
1877                            client.send(proto::CreateBufferForPeer {
1878                                project_id,
1879                                peer_id: Some(peer_id),
1880                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
1881                                    proto::BufferChunk {
1882                                        buffer_id: buffer_id.into(),
1883                                        operations: chunk,
1884                                        is_last,
1885                                    },
1886                                )),
1887                            })?;
1888                        }
1889                        anyhow::Ok(())
1890                    })
1891                    .await
1892                    .log_err();
1893            }
1894            Ok(())
1895        })
1896    }
1897
1898    pub fn forget_shared_buffers(&mut self) {
1899        self.shared_buffers.clear();
1900    }
1901
1902    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
1903        self.shared_buffers.remove(peer_id);
1904    }
1905
1906    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
1907        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
1908            self.shared_buffers.insert(new_peer_id, buffers);
1909        }
1910    }
1911
1912    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<Model<Buffer>>> {
1913        &self.shared_buffers
1914    }
1915
1916    pub fn create_local_buffer(
1917        &mut self,
1918        text: &str,
1919        language: Option<Arc<Language>>,
1920        cx: &mut ModelContext<Self>,
1921    ) -> Model<Buffer> {
1922        let buffer = cx.new_model(|cx| {
1923            Buffer::local(text, cx)
1924                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
1925        });
1926
1927        self.add_buffer(buffer.clone(), cx).log_err();
1928        let buffer_id = buffer.read(cx).remote_id();
1929
1930        let this = self
1931            .as_local_mut()
1932            .expect("local-only method called in a non-local context");
1933        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
1934            this.local_buffer_ids_by_path.insert(
1935                ProjectPath {
1936                    worktree_id: file.worktree_id(cx),
1937                    path: file.path.clone(),
1938                },
1939                buffer_id,
1940            );
1941
1942            if let Some(entry_id) = file.entry_id {
1943                this.local_buffer_ids_by_entry_id
1944                    .insert(entry_id, buffer_id);
1945            }
1946        }
1947        buffer
1948    }
1949
1950    pub fn deserialize_project_transaction(
1951        &mut self,
1952        message: proto::ProjectTransaction,
1953        push_to_history: bool,
1954        cx: &mut ModelContext<Self>,
1955    ) -> Task<Result<ProjectTransaction>> {
1956        if let Some(this) = self.as_remote_mut() {
1957            this.deserialize_project_transaction(message, push_to_history, cx)
1958        } else {
1959            debug_panic!("not a remote buffer store");
1960            Task::ready(Err(anyhow!("not a remote buffer store")))
1961        }
1962    }
1963
1964    pub fn wait_for_remote_buffer(
1965        &mut self,
1966        id: BufferId,
1967        cx: &mut ModelContext<BufferStore>,
1968    ) -> Task<Result<Model<Buffer>>> {
1969        if let Some(this) = self.as_remote_mut() {
1970            this.wait_for_remote_buffer(id, cx)
1971        } else {
1972            debug_panic!("not a remote buffer store");
1973            Task::ready(Err(anyhow!("not a remote buffer store")))
1974        }
1975    }
1976
1977    pub fn serialize_project_transaction_for_peer(
1978        &mut self,
1979        project_transaction: ProjectTransaction,
1980        peer_id: proto::PeerId,
1981        cx: &mut ModelContext<Self>,
1982    ) -> proto::ProjectTransaction {
1983        let mut serialized_transaction = proto::ProjectTransaction {
1984            buffer_ids: Default::default(),
1985            transactions: Default::default(),
1986        };
1987        for (buffer, transaction) in project_transaction.0 {
1988            self.create_buffer_for_peer(&buffer, peer_id, cx)
1989                .detach_and_log_err(cx);
1990            serialized_transaction
1991                .buffer_ids
1992                .push(buffer.read(cx).remote_id().into());
1993            serialized_transaction
1994                .transactions
1995                .push(language::proto::serialize_transaction(&transaction));
1996        }
1997        serialized_transaction
1998    }
1999}
2000
2001impl OpenBuffer {
2002    fn upgrade(&self) -> Option<Model<Buffer>> {
2003        match self {
2004            OpenBuffer::Buffer(handle) => handle.upgrade(),
2005            OpenBuffer::Operations(_) => None,
2006        }
2007    }
2008}
2009
2010fn is_not_found_error(error: &anyhow::Error) -> bool {
2011    error
2012        .root_cause()
2013        .downcast_ref::<io::Error>()
2014        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2015}
2016
2017fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2018    let Some(blame) = blame else {
2019        return proto::BlameBufferResponse {
2020            blame_response: None,
2021        };
2022    };
2023
2024    let entries = blame
2025        .entries
2026        .into_iter()
2027        .map(|entry| proto::BlameEntry {
2028            sha: entry.sha.as_bytes().into(),
2029            start_line: entry.range.start,
2030            end_line: entry.range.end,
2031            original_line_number: entry.original_line_number,
2032            author: entry.author.clone(),
2033            author_mail: entry.author_mail.clone(),
2034            author_time: entry.author_time,
2035            author_tz: entry.author_tz.clone(),
2036            committer: entry.committer.clone(),
2037            committer_mail: entry.committer_mail.clone(),
2038            committer_time: entry.committer_time,
2039            committer_tz: entry.committer_tz.clone(),
2040            summary: entry.summary.clone(),
2041            previous: entry.previous.clone(),
2042            filename: entry.filename.clone(),
2043        })
2044        .collect::<Vec<_>>();
2045
2046    let messages = blame
2047        .messages
2048        .into_iter()
2049        .map(|(oid, message)| proto::CommitMessage {
2050            oid: oid.as_bytes().into(),
2051            message,
2052        })
2053        .collect::<Vec<_>>();
2054
2055    let permalinks = blame
2056        .permalinks
2057        .into_iter()
2058        .map(|(oid, url)| proto::CommitPermalink {
2059            oid: oid.as_bytes().into(),
2060            permalink: url.to_string(),
2061        })
2062        .collect::<Vec<_>>();
2063
2064    proto::BlameBufferResponse {
2065        blame_response: Some(proto::blame_buffer_response::BlameResponse {
2066            entries,
2067            messages,
2068            permalinks,
2069            remote_url: blame.remote_url,
2070        }),
2071    }
2072}
2073
2074fn deserialize_blame_buffer_response(
2075    response: proto::BlameBufferResponse,
2076) -> Option<git::blame::Blame> {
2077    let response = response.blame_response?;
2078    let entries = response
2079        .entries
2080        .into_iter()
2081        .filter_map(|entry| {
2082            Some(git::blame::BlameEntry {
2083                sha: git::Oid::from_bytes(&entry.sha).ok()?,
2084                range: entry.start_line..entry.end_line,
2085                original_line_number: entry.original_line_number,
2086                committer: entry.committer,
2087                committer_time: entry.committer_time,
2088                committer_tz: entry.committer_tz,
2089                committer_mail: entry.committer_mail,
2090                author: entry.author,
2091                author_mail: entry.author_mail,
2092                author_time: entry.author_time,
2093                author_tz: entry.author_tz,
2094                summary: entry.summary,
2095                previous: entry.previous,
2096                filename: entry.filename,
2097            })
2098        })
2099        .collect::<Vec<_>>();
2100
2101    let messages = response
2102        .messages
2103        .into_iter()
2104        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2105        .collect::<HashMap<_, _>>();
2106
2107    let permalinks = response
2108        .permalinks
2109        .into_iter()
2110        .filter_map(|permalink| {
2111            Some((
2112                git::Oid::from_bytes(&permalink.oid).ok()?,
2113                Url::from_str(&permalink.permalink).ok()?,
2114            ))
2115        })
2116        .collect::<HashMap<_, _>>();
2117
2118    Some(Blame {
2119        entries,
2120        permalinks,
2121        messages,
2122        remote_url: response.remote_url,
2123    })
2124}